1
wei
2021-01-21 62d098cb78296feaa6f786a20748921338db838c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
//
//  MQTTInMemoryPersistence.m
//  MQTTClient
//
//  Created by Christoph Krey on 22.03.15.
//  Copyright © 2015-2017 Christoph Krey. All rights reserved.
//
 
#import "MQTTInMemoryPersistence.h"
 
#import "MQTTLog.h"
 
/**
 * Plain value object describing one stored MQTT message ("flow") held in memory.
 *
 * The class has no behaviour of its own; it only provides storage for its
 * properties. Each property is synthesized explicitly, which gives it a backing
 * instance variable with the same name as the property (no leading underscore).
 * NOTE(review): the properties are presumably declared by a protocol adopted in
 * the header, which is why explicit synthesis is needed; confirm against the header.
 */
@implementation MQTTInMemoryFlow

// Identity of the flow: owning client, direction and message identifier.
@synthesize clientId, incomingFlag, messageId;

// Protocol-level attributes of the message.
@synthesize retainedFlag, commandType, qosLevel;

// Payload and bookkeeping.
@synthesize topic, data, deadline;

@end
 
// Class extension for MQTTInMemoryPersistence; it currently declares no private members.
@interface MQTTInMemoryPersistence()
@end
 
// File-level store shared by every MQTTInMemoryPersistence instance; it is also the
// object all methods below pass to @synchronized.
// Layout: clientId -> @(incomingFlag) -> messageId (NSNumber) -> MQTTInMemoryFlow.
static NSMutableDictionary *clientIds;
 
@implementation MQTTInMemoryPersistence
@synthesize maxSize;
@synthesize persistent;
@synthesize maxMessages;
@synthesize maxWindowSize;
 
/**
 * Initializes the persistence object with the default limits and makes sure the
 * shared, file-level flow store exists.
 *
 * The store is created with dispatch_once rather than @synchronized(clientIds):
 * while clientIds is still nil, @synchronized(nil) performs no locking at all, so
 * two threads initializing concurrently could each create a dictionary and one of
 * them could replace a store that is already in use.
 *
 * @return The initialized instance, or nil if the superclass initializer failed.
 */
- (MQTTInMemoryPersistence *)init {
    // Create the shared store exactly once, regardless of how many instances are made.
    static dispatch_once_t storeOnce;
    dispatch_once(&storeOnce, ^{
        clientIds = [[NSMutableDictionary alloc] init];
    });

    self = [super init];
    if (self) {
        self.maxMessages = MQTT_MAX_MESSAGES;
        self.maxWindowSize = MQTT_MAX_WINDOW_SIZE;
    }
    return self;
}
 
/**
 * Counts the outgoing flows of a client whose command type is not MQTT_None.
 *
 * @param clientId The client whose outgoing flows are inspected.
 * @return The number of matching flows; 0 when the client has no outgoing flows.
 */
- (NSUInteger)windowSize:(NSString *)clientId {
    NSArray *outgoingFlows = [self allFlowsforClientId:clientId incomingFlag:NO];

    NSUInteger pendingCount = 0;
    for (MQTTInMemoryFlow *candidate in outgoingFlows) {
        // Flows whose command type equals MQTT_None are not counted.
        if (candidate.commandType.intValue == MQTT_None) {
            continue;
        }
        pendingCount += 1;
    }
    return pendingCount;
}
 
/**
 * Stores a message as a new flow for the given client and direction.
 *
 * The message is rejected when the client already holds maxMessages flows in the
 * requested direction. The limit is enforced with a strict bound so that at most
 * maxMessages flows are ever stored (the previous `<=` comparison admitted one
 * extra flow).
 *
 * @param clientId     Client the flow belongs to.
 * @param topic        Topic of the message.
 * @param data         Payload of the message.
 * @param retainFlag   Retain flag of the message.
 * @param qos          QoS level of the message.
 * @param msgId        Message identifier; used as the key of the flow.
 * @param incomingFlag YES for an incoming flow, NO for an outgoing one.
 * @param commandType  Command type recorded on the flow.
 * @param deadline     Deadline recorded on the flow.
 * @return The stored flow, or nil when the per-direction limit has been reached.
 */
- (MQTTInMemoryFlow *)storeMessageForClientId:(NSString *)clientId
                                        topic:(NSString *)topic
                                         data:(NSData *)data
                                   retainFlag:(BOOL)retainFlag
                                          qos:(MQTTQosLevel)qos
                                        msgId:(UInt16)msgId
                                 incomingFlag:(BOOL)incomingFlag
                                  commandType:(UInt8)commandType
                                     deadline:(NSDate *)deadline {
    @synchronized(clientIds) {
        NSUInteger storedCount = [self allFlowsforClientId:clientId
                                              incomingFlag:incomingFlag].count;
        // Already at capacity: storing one more would exceed maxMessages.
        if (storedCount >= self.maxMessages) {
            return nil;
        }

        MQTTInMemoryFlow *flow = [self createFlowforClientId:clientId
                                                incomingFlag:incomingFlag
                                                   messageId:msgId];
        flow.topic = topic;
        flow.data = data;
        flow.retainedFlag = @(retainFlag);
        flow.qosLevel = @(qos);
        flow.commandType = [NSNumber numberWithUnsignedInteger:commandType];
        flow.deadline = deadline;
        return flow;
    }
}
 
/**
 * Removes a flow from the shared store.
 *
 * The flow is located through its own clientId, incomingFlag and messageId.
 * Nothing happens when the store has no entry for the flow's client or direction.
 *
 * @param flow The flow to remove.
 */
- (void)deleteFlow:(MQTTInMemoryFlow *)flow {
    @synchronized(clientIds) {
        NSMutableDictionary *flowsByDirection = clientIds[flow.clientId];
        if (!flowsByDirection) {
            return;
        }

        NSMutableDictionary *flowsByMessageId = flowsByDirection[flow.incomingFlag];
        if (!flowsByMessageId) {
            return;
        }

        [flowsByMessageId removeObjectForKey:flow.messageId];
    }
}
 
/**
 * Removes every stored flow of a client, in both directions, by dropping the
 * client's entry from the shared store.
 *
 * @param clientId The client whose flows are discarded.
 */
- (void)deleteAllFlowsForClientId:(NSString *)clientId {
    @synchronized(clientIds) {
        
        DDLogInfo(@"[MQTTInMemoryPersistence] deleteAllFlowsForClientId %@", clientId);
        // Dropping the top-level entry discards the incoming and outgoing dictionaries together.
        [clientIds removeObjectForKey:clientId];
    }
}
 
/**
 * Does nothing: the method body is empty.
 * NOTE(review): presumably there is nothing to flush because the store lives only in memory; confirm.
 */
- (void)sync {
    // Intentionally empty.
}
 
/**
 * Returns the flows stored for a client in one direction, ordered by ascending
 * message identifier (the dictionary keys are sorted with a "self" sort descriptor).
 *
 * @param clientId     The client whose flows are requested.
 * @param incomingFlag YES for incoming flows, NO for outgoing flows.
 * @return The ordered flows, or nil when the store has no dictionary for that
 *         client or direction. The array is empty when the dictionary exists but
 *         holds no flows.
 */
- (NSArray *)allFlowsforClientId:(NSString *)clientId
                    incomingFlag:(BOOL)incomingFlag {
    @synchronized(clientIds) {
        NSMutableDictionary *flowsByDirection = clientIds[clientId];
        if (!flowsByDirection) {
            return nil;
        }

        NSMutableDictionary *flowsByMessageId = flowsByDirection[@(incomingFlag)];
        if (!flowsByMessageId) {
            return nil;
        }

        NSSortDescriptor *ascendingBySelf = [NSSortDescriptor sortDescriptorWithKey:@"self"
                                                                          ascending:YES];
        NSArray *orderedIds = [flowsByMessageId.allKeys sortedArrayUsingDescriptors:@[ascendingBySelf]];

        NSMutableArray *orderedFlows = [NSMutableArray array];
        for (id messageKey in orderedIds) {
            [orderedFlows addObject:flowsByMessageId[messageKey]];
        }
        return orderedFlows;
    }
}
 
/**
 * Looks up a single stored flow.
 *
 * @param clientId     The client the flow belongs to.
 * @param incomingFlag YES for an incoming flow, NO for an outgoing one.
 * @param messageId    The message identifier of the flow.
 * @return The matching flow, or nil when no such client, direction or message
 *         identifier is stored.
 */
- (MQTTInMemoryFlow *)flowforClientId:(NSString *)clientId
                         incomingFlag:(BOOL)incomingFlag
                            messageId:(UInt16)messageId {
    @synchronized(clientIds) {
        // Each step yields nil when the previous level is missing, because
        // messaging nil returns nil; no explicit checks are needed.
        NSMutableDictionary *flowsByDirection = clientIds[clientId];
        NSMutableDictionary *flowsByMessageId = flowsByDirection[@(incomingFlag)];
        NSNumber *messageKey = [NSNumber numberWithUnsignedInteger:messageId];
        return flowsByMessageId[messageKey];
    }
}
 
/**
 * Creates a new, otherwise empty flow and registers it in the shared store.
 *
 * Missing intermediate dictionaries (per client, per direction) are created on
 * demand. A flow already stored under the same client, direction and message
 * identifier is replaced by the new one.
 *
 * @param clientId     The client the flow belongs to.
 * @param incomingFlag YES for an incoming flow, NO for an outgoing one.
 * @param messageId    The message identifier used as the flow's key.
 * @return The newly created flow with clientId, incomingFlag and messageId set.
 */
- (MQTTInMemoryFlow *)createFlowforClientId:(NSString *)clientId
                               incomingFlag:(BOOL)incomingFlag
                                  messageId:(UInt16)messageId {
    @synchronized(clientIds) {
        NSNumber *directionKey = @(incomingFlag);
        NSNumber *messageKey = [NSNumber numberWithUnsignedInteger:messageId];

        // Level 1: per-client dictionary.
        NSMutableDictionary *flowsByDirection = clientIds[clientId];
        if (flowsByDirection == nil) {
            flowsByDirection = [NSMutableDictionary dictionary];
            clientIds[clientId] = flowsByDirection;
        }

        // Level 2: per-direction dictionary.
        NSMutableDictionary *flowsByMessageId = flowsByDirection[directionKey];
        if (flowsByMessageId == nil) {
            flowsByMessageId = [NSMutableDictionary dictionary];
            flowsByDirection[directionKey] = flowsByMessageId;
        }

        MQTTInMemoryFlow *created = [[MQTTInMemoryFlow alloc] init];
        created.clientId = clientId;
        created.incomingFlag = directionKey;
        created.messageId = messageKey;

        flowsByMessageId[messageKey] = created;
        return created;
    }
}
 
@end