// // MQTTSessionManager.m // MQTTClient // // Created by Christoph Krey on 09.07.14. // Copyright © 2013-2017 Christoph Krey. All rights reserved. // #import "MQTTSessionManager.h" #import "MQTTCoreDataPersistence.h" #import "MQTTLog.h" #import "ReconnectTimer.h" #import "ForegroundReconnection.h" #import "MQTTSSLSecurityPolicyTransport.h" @interface MQTTSessionManager() @property (nonatomic, readwrite) MQTTSessionManagerState state; @property (nonatomic, readwrite) NSError *lastErrorCode; @property (strong, nonatomic) ReconnectTimer *reconnectTimer; @property (nonatomic) BOOL reconnectFlag; @property (strong, nonatomic) MQTTSession *session; @property (strong, nonatomic) NSString *host; @property (nonatomic) UInt32 port; @property (nonatomic) BOOL tls; @property (nonatomic) NSInteger keepalive; @property (nonatomic) BOOL clean; @property (nonatomic) BOOL auth; @property (nonatomic) BOOL will; @property (strong, nonatomic) NSString *user; @property (strong, nonatomic) NSString *pass; @property (strong, nonatomic) NSString *willTopic; @property (strong, nonatomic) NSData *willMsg; @property (nonatomic) NSInteger willQos; @property (nonatomic) BOOL willRetainFlag; @property (strong, nonatomic) NSString *clientId; @property (strong, nonatomic) dispatch_queue_t queue; @property (strong, nonatomic) MQTTSSLSecurityPolicy *securityPolicy; @property (strong, nonatomic) NSArray *certificates; @property (nonatomic) MQTTProtocolVersion protocolLevel; #if TARGET_OS_IPHONE == 1 @property (strong, nonatomic) ForegroundReconnection *foregroundReconnection; #endif @property (nonatomic) BOOL persistent; @property (nonatomic) NSUInteger maxWindowSize; @property (nonatomic) NSUInteger maxSize; @property (nonatomic) NSUInteger maxMessages; @property (strong, nonatomic) NSString *streamSSLLevel; @property (strong, nonatomic) NSDictionary *internalSubscriptions; @property (strong, nonatomic) NSDictionary *effectiveSubscriptions; @property (strong, nonatomic) NSLock *subscriptionLock; @end #define RECONNECT_TIMER 1.0 #define RECONNECT_TIMER_MAX_DEFAULT 64.0 @implementation MQTTSessionManager - (instancetype)init { self = [self initWithPersistence:MQTT_PERSISTENT maxWindowSize:MQTT_MAX_WINDOW_SIZE maxMessages:MQTT_MAX_MESSAGES maxSize:MQTT_MAX_SIZE maxConnectionRetryInterval:RECONNECT_TIMER_MAX_DEFAULT connectInForeground:YES streamSSLLevel:(NSString *)kCFStreamSocketSecurityLevelNegotiatedSSL queue:dispatch_get_main_queue()]; return self; } - (MQTTSessionManager *)initWithPersistence:(BOOL)persistent maxWindowSize:(NSUInteger)maxWindowSize maxMessages:(NSUInteger)maxMessages maxSize:(NSUInteger)maxSize maxConnectionRetryInterval:(NSTimeInterval)maxRetryInterval connectInForeground:(BOOL)connectInForeground streamSSLLevel:(NSString *)streamSSLLevel queue:(dispatch_queue_t)queue { self = [super init]; self.streamSSLLevel = streamSSLLevel; self.queue = queue; [self updateState:MQTTSessionManagerStateStarting]; self.internalSubscriptions = [[NSMutableDictionary alloc] init]; self.effectiveSubscriptions = [[NSMutableDictionary alloc] init]; self.persistent = persistent; self.maxWindowSize = maxWindowSize; self.maxSize = maxSize; self.maxMessages = maxMessages; __weak MQTTSessionManager *weakSelf = self; self.reconnectTimer = [[ReconnectTimer alloc] initWithRetryInterval:RECONNECT_TIMER maxRetryInterval:maxRetryInterval queue:self.queue reconnectBlock:^{ [weakSelf reconnect:nil]; }]; #if TARGET_OS_IPHONE == 1 if (connectInForeground) { self.foregroundReconnection = [[ForegroundReconnection alloc] initWithMQTTSessionManager:self]; } #endif self.subscriptionLock = [[NSLock alloc] init]; return self; } - (void)connectTo:(NSString *)host port:(NSInteger)port tls:(BOOL)tls keepalive:(NSInteger)keepalive clean:(BOOL)clean auth:(BOOL)auth user:(NSString *)user pass:(NSString *)pass will:(BOOL)will willTopic:(NSString *)willTopic willMsg:(NSData *)willMsg willQos:(MQTTQosLevel)willQos willRetainFlag:(BOOL)willRetainFlag withClientId:(NSString *)clientId securityPolicy:(MQTTSSLSecurityPolicy *)securityPolicy certificates:(NSArray *)certificates protocolLevel:(MQTTProtocolVersion)protocolLevel connectHandler:(MQTTConnectHandler)connectHandler { DDLogVerbose(@"MQTTSessionManager connectTo:%@", host); BOOL shouldReconnect = self.session != nil; if (!self.session || ![host isEqualToString:self.host] || port != self.port || tls != self.tls || keepalive != self.keepalive || clean != self.clean || auth != self.auth || ![user isEqualToString:self.user] || ![pass isEqualToString:self.pass] || ![willTopic isEqualToString:self.willTopic] || ![willMsg isEqualToData:self.willMsg] || willQos != self.willQos || willRetainFlag != self.willRetainFlag || ![clientId isEqualToString:self.clientId] || securityPolicy != self.securityPolicy || certificates != self.certificates) { self.host = host; self.port = (int)port; self.tls = tls; self.keepalive = keepalive; self.clean = clean; self.auth = auth; self.user = user; self.pass = pass; self.will = will; self.willTopic = willTopic; self.willMsg = willMsg; self.willQos = willQos; self.willRetainFlag = willRetainFlag; self.clientId = clientId; self.securityPolicy = securityPolicy; self.certificates = certificates; self.protocolLevel = protocolLevel; self.session = [[MQTTSession alloc] initWithClientId:clientId userName:auth ? user : nil password:auth ? pass : nil keepAlive:keepalive connectMessage:nil cleanSession:clean will:will willTopic:willTopic willMsg:willMsg willQoS:willQos willRetainFlag:willRetainFlag protocolLevel:protocolLevel queue:self.queue securityPolicy:securityPolicy certificates:certificates]; self.session.streamSSLLevel = self.streamSSLLevel; MQTTCoreDataPersistence *persistence = [[MQTTCoreDataPersistence alloc] init]; persistence.persistent = self.persistent; persistence.maxWindowSize = self.maxWindowSize; persistence.maxSize = self.maxSize; persistence.maxMessages = self.maxMessages; self.session.persistence = persistence; self.session.delegate = self; self.reconnectFlag = FALSE; } if (shouldReconnect) { DDLogVerbose(@"[MQTTSessionManager] reconnecting"); [self disconnectWithDisconnectHandler:nil]; [self reconnect:connectHandler]; } else { DDLogVerbose(@"[MQTTSessionManager] connecting"); [self connectToInternal:connectHandler]; } } - (UInt16)sendData:(NSData *)data topic:(NSString *)topic qos:(MQTTQosLevel)qos retain:(BOOL)retainFlag { if (self.state != MQTTSessionManagerStateConnected) { [self connectToLast:nil]; } UInt16 msgId = [self.session publishData:data onTopic:topic retain:retainFlag qos:qos]; return msgId; } - (void)disconnectWithDisconnectHandler:(MQTTDisconnectHandler)disconnectHandler { [self updateState:MQTTSessionManagerStateClosing]; [self.session closeWithDisconnectHandler:disconnectHandler]; [self.reconnectTimer stop]; } - (BOOL)requiresTearDown { return (self.state != MQTTSessionManagerStateClosed && self.state != MQTTSessionManagerStateStarting); } - (void)updateState:(MQTTSessionManagerState)newState { self.state = newState; if ([self.delegate respondsToSelector:@selector(sessionManager:didChangeState:)]) { [self.delegate sessionManager:self didChangeState:newState]; } } #pragma mark - MQTT Callback methods - (void)handleEvent:(MQTTSession *)session event:(MQTTSessionEvent)eventCode error:(NSError *)error { #ifdef DEBUG __unused const NSDictionary *events = @{ @(MQTTSessionEventConnected): @"connected", @(MQTTSessionEventConnectionRefused): @"connection refused", @(MQTTSessionEventConnectionClosed): @"connection closed", @(MQTTSessionEventConnectionError): @"connection error", @(MQTTSessionEventProtocolError): @"protocoll error", @(MQTTSessionEventConnectionClosedByBroker): @"connection closed by broker" }; DDLogVerbose(@"[MQTTSessionManager] eventCode: %@ (%ld) %@", events[@(eventCode)], (long)eventCode, error); #endif switch (eventCode) { case MQTTSessionEventConnected: self.lastErrorCode = nil; [self updateState:MQTTSessionManagerStateConnected]; [self.reconnectTimer resetRetryInterval]; break; case MQTTSessionEventConnectionClosed: [self updateState:MQTTSessionManagerStateClosed]; break; case MQTTSessionEventConnectionClosedByBroker: if (self.state != MQTTSessionManagerStateClosing) { [self triggerDelayedReconnect]; } [self updateState:MQTTSessionManagerStateClosed]; break; case MQTTSessionEventProtocolError: case MQTTSessionEventConnectionRefused: case MQTTSessionEventConnectionError: [self triggerDelayedReconnect]; self.lastErrorCode = error; [self updateState:MQTTSessionManagerStateError]; break; default: break; } } - (void)newMessage:(MQTTSession *)session data:(NSData *)data onTopic:(NSString *)topic qos:(MQTTQosLevel)qos retained:(BOOL)retained mid:(unsigned int)mid { if (self.delegate) { if ([self.delegate respondsToSelector:@selector(sessionManager:didReceiveMessage:onTopic:retained:)]) { [self.delegate sessionManager:self didReceiveMessage:data onTopic:topic retained:retained]; } if ([self.delegate respondsToSelector:@selector(handleMessage:onTopic:retained:)]) { [self.delegate handleMessage:data onTopic:topic retained:retained]; } } } - (void)connected:(MQTTSession *)session sessionPresent:(BOOL)sessionPresent { if (self.clean || !self.reconnectFlag || !sessionPresent) { NSDictionary *subscriptions = [self.internalSubscriptions copy]; [self.subscriptionLock lock]; self.effectiveSubscriptions = [[NSMutableDictionary alloc] init]; [self.subscriptionLock unlock]; if (subscriptions.count) { __weak MQTTSessionManager *weakSelf = self; [self.session subscribeToTopics:subscriptions subscribeHandler:^(NSError *error, NSArray *gQoss) { MQTTSessionManager *strongSelf = weakSelf; if (!error) { NSArray *allTopics = subscriptions.allKeys; for (int i = 0; i < allTopics.count; i++) { NSString *topic = allTopics[i]; NSNumber *gQos = gQoss[i]; [strongSelf.subscriptionLock lock]; NSMutableDictionary *newEffectiveSubscriptions = [strongSelf.subscriptions mutableCopy]; newEffectiveSubscriptions[topic] = gQos; strongSelf.effectiveSubscriptions = newEffectiveSubscriptions; [strongSelf.subscriptionLock unlock]; } } }]; } self.reconnectFlag = TRUE; } } - (void)messageDelivered:(MQTTSession *)session msgID:(UInt16)msgID { if (self.delegate) { if ([self.delegate respondsToSelector:@selector(sessionManager:didDeliverMessage:)]) { [self.delegate sessionManager:self didDeliverMessage:msgID]; } if ([self.delegate respondsToSelector:@selector(messageDelivered:)]) { [self.delegate messageDelivered:msgID]; } } } - (void)connectToInternal:(MQTTConnectHandler)connectHandler { if (self.session && self.state == MQTTSessionManagerStateStarting) { [self updateState:MQTTSessionManagerStateConnecting]; MQTTCFSocketTransport *transport; if (self.securityPolicy) { transport = [[MQTTSSLSecurityPolicyTransport alloc] init]; ((MQTTSSLSecurityPolicyTransport *)transport).securityPolicy = self.securityPolicy; } else { transport = [[MQTTCFSocketTransport alloc] init]; } transport.host = self.host; transport.port = self.port; transport.tls = self.tls; transport.certificates = self.certificates; transport.voip = self.session.voip; transport.queue = self.queue; transport.streamSSLLevel = self.streamSSLLevel; self.session.transport = transport; [self.session connectWithConnectHandler:connectHandler]; } } - (void)reconnect:(MQTTConnectHandler)connectHandler { [self updateState:MQTTSessionManagerStateStarting]; [self connectToInternal:connectHandler]; } - (void)connectToLast:(MQTTConnectHandler)connectHandler { if (self.state == MQTTSessionManagerStateConnected) { return; } [self.reconnectTimer resetRetryInterval]; [self reconnect:connectHandler]; } - (void)triggerDelayedReconnect { [self.reconnectTimer schedule]; } - (NSDictionary *)subscriptions { return self.internalSubscriptions; } - (void)setSubscriptions:(NSDictionary *)newSubscriptions { if (self.state == MQTTSessionManagerStateConnected) { NSDictionary *currentSubscriptions = [self.effectiveSubscriptions copy]; for (NSString *topicFilter in currentSubscriptions) { if (!newSubscriptions[topicFilter]) { __weak MQTTSessionManager *weakSelf = self; [self.session unsubscribeTopic:topicFilter unsubscribeHandler:^(NSError *error) { MQTTSessionManager *strongSelf = weakSelf; if (!error) { [strongSelf.subscriptionLock lock]; NSMutableDictionary *newEffectiveSubscriptions = [strongSelf.subscriptions mutableCopy]; [newEffectiveSubscriptions removeObjectForKey:topicFilter]; strongSelf.effectiveSubscriptions = newEffectiveSubscriptions; [strongSelf.subscriptionLock unlock]; } }]; } } for (NSString *topicFilter in newSubscriptions) { if (!currentSubscriptions[topicFilter]) { NSNumber *number = newSubscriptions[topicFilter]; MQTTQosLevel qos = number.unsignedIntValue; __weak MQTTSessionManager *weakSelf = self; [self.session subscribeToTopic:topicFilter atLevel:qos subscribeHandler:^(NSError *error, NSArray *gQoss) { MQTTSessionManager *strongSelf = weakSelf; if (!error) { NSNumber *gQos = gQoss[0]; [strongSelf.subscriptionLock lock]; NSMutableDictionary *newEffectiveSubscriptions = [strongSelf.subscriptions mutableCopy]; newEffectiveSubscriptions[topicFilter] = gQos; strongSelf.effectiveSubscriptions = newEffectiveSubscriptions; [strongSelf.subscriptionLock unlock]; } }]; } } } self.internalSubscriptions = newSubscriptions; DDLogVerbose(@"MQTTSessionManager internalSubscriptions: %@", self.internalSubscriptions); } @end