// // MQTTSession.m // MQTTClient.framework // // Copyright © 2013-2017, Christoph Krey. All rights reserved. // #import "MQTTSession.h" #import "MQTTDecoder.h" #import "MQTTStrict.h" #import "MQTTProperties.h" #import "MQTTMessage.h" #import "MQTTCoreDataPersistence.h" #import "GCDTimer.h" @class MQTTSSLSecurityPolicy; #import "MQTTLog.h" NSString * const MQTTSessionErrorDomain = @"MQTT"; @interface MQTTSession() @property (nonatomic, readwrite) MQTTSessionStatus status; @property (nonatomic, readwrite) BOOL sessionPresent; @property (strong, nonatomic) GCDTimer *keepAliveTimer; @property (strong, nonatomic) NSNumber *serverKeepAlive; @property (nonatomic) UInt16 effectiveKeepAlive; @property (strong, nonatomic) GCDTimer *checkDupTimer; @property (strong, nonatomic) MQTTDecoder *decoder; @property (copy, nonatomic) MQTTDisconnectHandler disconnectHandler; @property (nonatomic, strong) NSMutableDictionary *subscribeHandlers; @property (nonatomic, strong) NSMutableDictionary *unsubscribeHandlers; @property (nonatomic, strong) NSMutableDictionary *publishHandlers; @property (nonatomic) UInt16 txMsgId; @property (nonatomic) BOOL synchronPub; @property (nonatomic) UInt16 synchronPubMid; @property (nonatomic) BOOL synchronUnsub; @property (nonatomic) UInt16 synchronUnsubMid; @property (nonatomic) BOOL synchronSub; @property (nonatomic) UInt16 synchronSubMid; @property (nonatomic) BOOL synchronConnect; @property (nonatomic) BOOL synchronDisconnect; @property (strong, nonatomic) MQTTSSLSecurityPolicy *securityPolicy; @end #define DUPLOOP 1.0 @implementation MQTTSession @synthesize certificates; - (void)setCertificates:(NSArray *)newCertificates { certificates = newCertificates; if (self.transport) { if ([self.transport respondsToSelector:@selector(setCertificates:)]) { [self.transport performSelector:@selector(setCertificates:) withObject:certificates]; } } } - (instancetype)init { DDLogVerbose(@"[MQTTSession] init"); self = [super init]; self.txMsgId = 1; self.persistence = [[MQTTCoreDataPersistence alloc] init]; self.subscribeHandlers = [[NSMutableDictionary alloc] init]; self.unsubscribeHandlers = [[NSMutableDictionary alloc] init]; self.publishHandlers = [[NSMutableDictionary alloc] init]; self.clientId = nil; self.userName = nil; self.password = nil; self.keepAliveInterval = 60; self.dupTimeout = 20.0; self.cleanSessionFlag = true; self.willFlag = false; self.willTopic = nil; self.willMsg = nil; self.willQoS = MQTTQosLevelAtMostOnce; self.willRetainFlag = false; self.protocolLevel = MQTTProtocolVersion311; self.queue = dispatch_get_main_queue(); self.status = MQTTSessionStatusCreated; self.streamSSLLevel = (NSString *)kCFStreamSocketSecurityLevelNegotiatedSSL; return self; } - (void)dealloc { [self.keepAliveTimer invalidate]; [self.checkDupTimer invalidate]; } - (NSString *)host { return _transport.host; } - (UInt32)port { return _transport.port; } - (void)setClientId:(NSString *)clientId { if (!clientId) { clientId = [NSString stringWithFormat:@"MQTTClient%.0f",fmod([NSDate date].timeIntervalSince1970, 1.0) * 1000000.0]; } _clientId = clientId; } - (void)setStreamSSLLevel:(NSString *)streamSSLLevel { _streamSSLLevel = streamSSLLevel; self.transport.streamSSLLevel = self.streamSSLLevel; } - (UInt16)subscribeToTopic:(NSString *)topic atLevel:(MQTTQosLevel)qosLevel { return [self subscribeToTopic:topic atLevel:qosLevel subscribeHandler:nil]; } - (UInt16)subscribeToTopic:(NSString *)topic atLevel:(MQTTQosLevel)qosLevel subscribeHandler:(MQTTSubscribeHandler)subscribeHandler { return [self subscribeToTopics:topic ? @{topic: @(qosLevel)} : @{} subscribeHandler:subscribeHandler]; } - (UInt16)subscribeToTopics:(NSDictionary *)topics { return [self subscribeToTopics:topics subscribeHandler:nil]; } - (void)checkTopicFilters:(NSArray *)topicFilters { if (MQTTStrict.strict && topicFilters.count == 0) { NSException* myException = [NSException exceptionWithName:@"topicFilter array in SUBSCRIBE or UNSUBSRIBE must not be empty" reason:[NSString stringWithFormat:@"%@", topicFilters] userInfo:nil]; @throw myException; } for (NSString *topicFilter in topicFilters) { if (MQTTStrict.strict && topicFilter.length < 1) { NSException* myException = [NSException exceptionWithName:@"topicFilter must be at least 1 characters long" reason:[NSString stringWithFormat:@"%@", topicFilter] userInfo:nil]; @throw myException; } if (MQTTStrict.strict && [topicFilter dataUsingEncoding:NSUTF8StringEncoding].length > 65535L) { NSException* myException = [NSException exceptionWithName:@"topicFilter may not be longer than 65535 bytes in UTF8 representation" reason:[NSString stringWithFormat:@"topicFilter length = %lu", (unsigned long)[topicFilter dataUsingEncoding:NSUTF8StringEncoding].length] userInfo:nil]; @throw myException; } if (MQTTStrict.strict && ![topicFilter dataUsingEncoding:NSUTF8StringEncoding]) { NSException* myException = [NSException exceptionWithName:@"topicFilter must not contain non-UTF8 characters" reason:[NSString stringWithFormat:@"topicFilter = %@", topicFilter] userInfo:nil]; @throw myException; } if (MQTTStrict.strict) { NSArray *components = [topicFilter componentsSeparatedByString:@"/"]; for (int level = 0; level < components.count; level++) { if ([components[level] rangeOfString:@"+"].location != NSNotFound && components[level].length > 1) { NSException* myException = [NSException exceptionWithName:@"singlelevel wildcard must be alone on a level of a topic filter" reason:[NSString stringWithFormat:@"topicFilter = %@", topicFilter] userInfo:nil]; @throw myException; } } for (int level = 0; level < components.count - 1; level++) { if ([components[level] rangeOfString:@"#"].location != NSNotFound) { NSException* myException = [NSException exceptionWithName:@"multilevel wildcard must be on the last level of a topic filter" reason:[NSString stringWithFormat:@"topicFilter = %@", topicFilter] userInfo:nil]; @throw myException; } } if ([components[components.count - 1] rangeOfString:@"#"].location != NSNotFound && components[components.count - 1].length > 1) { NSException* myException = [NSException exceptionWithName:@"multilevel wildcard must be alone on a level of a topic filter" reason:[NSString stringWithFormat:@"topicFilter = %@", topicFilter] userInfo:nil]; @throw myException; } } if (MQTTStrict.strict && [topicFilter rangeOfString:@"#"].location != NSNotFound && [topicFilter rangeOfString:@"#"].location != topicFilter.length && (topicFilter.length == 1 || [[topicFilter substringWithRange:NSMakeRange(topicFilter.length - 2, 1)] isEqualToString:@"/"]) ) { NSException* myException = [NSException exceptionWithName:@"multilevel wildcard must alone on the last level of a topic filter" reason:[NSString stringWithFormat:@"topicFilter = %@", topicFilter] userInfo:nil]; @throw myException; } } } - (UInt16)subscribeToTopics:(NSDictionary *)topics subscribeHandler:(MQTTSubscribeHandler)subscribeHandler { DDLogVerbose(@"[MQTTSession] subscribeToTopics:%@]", topics); [self checkTopicFilters:topics.allKeys]; for (NSNumber *qos in topics.allValues) { if (MQTTStrict.strict && qos.intValue != MQTTQosLevelAtMostOnce && qos.intValue != MQTTQosLevelAtLeastOnce && qos.intValue != MQTTQosLevelExactlyOnce) { NSException* myException = [NSException exceptionWithName:@"Illegal QoS level" reason:[NSString stringWithFormat:@"%d is not 0, 1, or 2", qos.intValue] userInfo:nil]; @throw myException; } } UInt16 mid = [self nextMsgId]; if (subscribeHandler) { (self.subscribeHandlers)[@(mid)] = [subscribeHandler copy]; } else { [self.subscribeHandlers removeObjectForKey:@(mid)]; } (void)[self encode:[MQTTMessage subscribeMessageWithMessageId:mid topics:topics protocolLevel:self.protocolLevel subscriptionIdentifier:nil]]; return mid; } - (UInt16)unsubscribeTopic:(NSString*)topic { return [self unsubscribeTopic:topic unsubscribeHandler:nil]; } - (UInt16)unsubscribeTopic:(NSString *)topic unsubscribeHandler:(MQTTUnsubscribeHandler)unsubscribeHandler { return [self unsubscribeTopics:topic ? @[topic] : @[] unsubscribeHandler:unsubscribeHandler]; } - (UInt16)unsubscribeTopics:(NSArray *)topics { return [self unsubscribeTopics:topics unsubscribeHandler:nil]; } - (UInt16)unsubscribeTopics:(NSArray *)topics unsubscribeHandler:(MQTTUnsubscribeHandler)unsubscribeHandler { DDLogVerbose(@"[MQTTSession] unsubscribeTopics:%@", topics); [self checkTopicFilters:topics]; UInt16 mid = [self nextMsgId]; if (unsubscribeHandler) { (self.unsubscribeHandlers)[@(mid)] = [unsubscribeHandler copy]; } else { [self.unsubscribeHandlers removeObjectForKey:@(mid)]; } (void)[self encode:[MQTTMessage unsubscribeMessageWithMessageId:mid topics:topics protocolLevel:self.protocolLevel]]; return mid; } - (UInt16)publishData:(NSData*)data onTopic:(NSString*)topic retain:(BOOL)retainFlag qos:(MQTTQosLevel)qos { return [self publishData:data onTopic:topic retain:retainFlag qos:qos publishHandler:nil]; } - (UInt16)publishData:(NSData *)data onTopic:(NSString *)topic retain:(BOOL)retainFlag qos:(MQTTQosLevel)qos publishHandler:(MQTTPublishHandler)publishHandler { DDLogVerbose(@"[MQTTSession] publishData:%@... onTopic:%@ retain:%d qos:%ld publishHandler:%p", [data subdataWithRange:NSMakeRange(0, MIN(256, data.length))], [topic substringWithRange:NSMakeRange(0, MIN(256, topic.length))], retainFlag, (long)qos, publishHandler); if (MQTTStrict.strict && !topic) { NSException* myException = [NSException exceptionWithName:@"topic must not be nil" reason:[NSString stringWithFormat:@"%@", topic] userInfo:nil]; @throw myException; } if (MQTTStrict.strict && topic && topic.length < 1) { NSException* myException = [NSException exceptionWithName:@"topic must not at least 1 character long" reason:[NSString stringWithFormat:@"%@", topic] userInfo:nil]; @throw myException; } if (MQTTStrict.strict && topic && [topic dataUsingEncoding:NSUTF8StringEncoding].length > 65535L) { NSException* myException = [NSException exceptionWithName:@"topic may not be longer than 65535 bytes in UTF8 representation" reason:[NSString stringWithFormat:@"topic length = %lu", (unsigned long)[topic dataUsingEncoding:NSUTF8StringEncoding].length] userInfo:nil]; @throw myException; } if (MQTTStrict.strict && topic && ![topic dataUsingEncoding:NSUTF8StringEncoding]) { NSException* myException = [NSException exceptionWithName:@"topic must not contain non-UTF8 characters" reason:[NSString stringWithFormat:@"topic = %@", topic] userInfo:nil]; @throw myException; } if (MQTTStrict.strict && self.willTopic && ([self.willTopic containsString:@"+"] || [self.willTopic containsString:@"#"]) ) { NSException* myException = [NSException exceptionWithName:@"willTopic must not contain wildcards" reason:[NSString stringWithFormat:@"willTopic = %@", self.willTopic] userInfo:nil]; @throw myException; } if (MQTTStrict.strict && qos != MQTTQosLevelAtMostOnce && qos != MQTTQosLevelAtLeastOnce && qos != MQTTQosLevelExactlyOnce) { NSException* myException = [NSException exceptionWithName:@"Illegal QoS level" reason:[NSString stringWithFormat:@"%d is not 0, 1, or 2", qos] userInfo:nil]; @throw myException; } UInt16 msgId = 0; if (!qos) { MQTTMessage *msg = [MQTTMessage publishMessageWithData:data onTopic:topic qos:qos msgId:msgId retainFlag:retainFlag dupFlag:FALSE protocolLevel:self.protocolLevel payloadFormatIndicator:nil publicationExpiryInterval:nil topicAlias:nil responseTopic:nil correlationData:nil userProperty:nil contentType:nil]; NSError *error = nil; if (![self encode:msg]) { error = [NSError errorWithDomain:MQTTSessionErrorDomain code:MQTTSessionErrorEncoderNotReady userInfo:@{NSLocalizedDescriptionKey : @"Encoder not ready"}]; } if (publishHandler) { [self onPublish:publishHandler error:error]; } } else { msgId = [self nextMsgId]; MQTTMessage *msg = nil; id flow; if (self.status == MQTTSessionStatusConnected) { NSArray *flows = [self.persistence allFlowsforClientId:self.clientId incomingFlag:NO]; BOOL unprocessedMessageNotExists = TRUE; NSUInteger windowSize = 0; for (id flow in flows) { if ((flow.commandType).intValue != MQTT_None) { windowSize++; } else { unprocessedMessageNotExists = FALSE; } } if (unprocessedMessageNotExists && windowSize <= self.persistence.maxWindowSize) { msg = [MQTTMessage publishMessageWithData:data onTopic:topic qos:qos msgId:msgId retainFlag:retainFlag dupFlag:FALSE protocolLevel:self.protocolLevel payloadFormatIndicator:nil publicationExpiryInterval:nil topicAlias:nil responseTopic:nil correlationData:nil userProperty:nil contentType:nil]; flow = [self.persistence storeMessageForClientId:self.clientId topic:topic data:data retainFlag:retainFlag qos:qos msgId:msgId incomingFlag:NO commandType:MQTTPublish deadline:[NSDate dateWithTimeIntervalSinceNow:self.dupTimeout]]; } } if (!msg) { flow = [self.persistence storeMessageForClientId:self.clientId topic:topic data:data retainFlag:retainFlag qos:qos msgId:msgId incomingFlag:NO commandType:MQTT_None deadline:[NSDate date]]; } if (!flow) { DDLogWarn(@"[MQTTSession] dropping outgoing message %d", msgId); NSError *error = [NSError errorWithDomain:MQTTSessionErrorDomain code:MQTTSessionErrorDroppingOutgoingMessage userInfo:@{NSLocalizedDescriptionKey : @"Dropping outgoing Message"}]; if (publishHandler) { [self onPublish:publishHandler error:error]; } msgId = 0; } else { [self.persistence sync]; if (publishHandler) { (self.publishHandlers)[@(msgId)] = [publishHandler copy]; } else { [self.publishHandlers removeObjectForKey:@(msgId)]; } if ((flow.commandType).intValue == MQTTPublish) { DDLogVerbose(@"[MQTTSession] PUBLISH %d", msgId); if (![self encode:msg]) { DDLogInfo(@"[MQTTSession] queueing message %d after unsuccessfull attempt", msgId); flow.commandType = [NSNumber numberWithUnsignedInt:MQTT_None]; flow.deadline = [NSDate date]; [self.persistence sync]; } } else { DDLogInfo(@"[MQTTSession] queueing message %d", msgId); } } } [self tell]; return msgId; } - (void)closeWithDisconnectHandler:(MQTTDisconnectHandler)disconnectHandler { [self closeWithReturnCode:MQTTSuccess sessionExpiryInterval:nil reasonString:nil userProperty:nil disconnectHandler:disconnectHandler]; } - (void)closeWithReturnCode:(MQTTReturnCode)returnCode sessionExpiryInterval:(NSNumber *)sessionExpiryInterval reasonString:(NSString *)reasonString userProperty:(NSDictionary *)userProperty disconnectHandler:(MQTTDisconnectHandler)disconnectHandler { DDLogVerbose(@"[MQTTSession] closeWithDisconnectHandler:%p ", disconnectHandler); self.disconnectHandler = disconnectHandler; if (self.status == MQTTSessionStatusConnected) { [self disconnectWithReturnCode:returnCode sessionExpiryInterval:sessionExpiryInterval reasonString:reasonString userProperty:userProperty]; } else { [self closeInternal]; } } - (void)disconnect { [self disconnectWithReturnCode:MQTTSuccess sessionExpiryInterval:nil reasonString:nil userProperty:nil]; } - (void)disconnectWithReturnCode:(MQTTReturnCode)returnCode sessionExpiryInterval:(NSNumber *)sessionExpiryInterval reasonString:(NSString *)reasonString userProperty:(NSDictionary *)userProperty { DDLogVerbose(@"[MQTTSession] sending DISCONNECT"); self.status = MQTTSessionStatusDisconnecting; [self encode:[MQTTMessage disconnectMessage:self.protocolLevel returnCode:returnCode sessionExpiryInterval:sessionExpiryInterval reasonString:reasonString userProperty:userProperty]]; [self closeInternal]; } - (void)closeInternal { DDLogVerbose(@"[MQTTSession] closeInternal"); if (self.checkDupTimer) { [self.checkDupTimer invalidate]; self.checkDupTimer = nil; } if (self.keepAliveTimer) { [self.keepAliveTimer invalidate]; self.keepAliveTimer = nil; } if (self.transport) { [self.transport close]; self.transport.delegate = nil; } if(self.decoder){ [self.decoder close]; self.decoder.delegate = nil; } NSArray *flows = [self.persistence allFlowsforClientId:self.clientId incomingFlag:NO]; for (id flow in flows) { switch ((flow.commandType).intValue) { case MQTTPublish: case MQTTPubrel: flow.deadline = [flow.deadline dateByAddingTimeInterval:-self.dupTimeout]; [self.persistence sync]; break; } } self.status = MQTTSessionStatusClosed; if ([self.delegate respondsToSelector:@selector(handleEvent:event:error:)]) { [self.delegate handleEvent:self event:MQTTSessionEventConnectionClosed error:nil]; } if ([self.delegate respondsToSelector:@selector(connectionClosed:)]) { [self.delegate connectionClosed:self]; } NSError *error = [NSError errorWithDomain:MQTTSessionErrorDomain code:MQTTSessionErrorNoResponse userInfo:@{NSLocalizedDescriptionKey : @"No response"}]; NSArray *allSubscribeHandlers = self.subscribeHandlers.allValues; [self.subscribeHandlers removeAllObjects]; for (MQTTSubscribeHandler subscribeHandler in allSubscribeHandlers) { subscribeHandler(error, nil); } NSArray *allUnsubscribeHandlers = self.unsubscribeHandlers.allValues; [self.unsubscribeHandlers removeAllObjects]; for (MQTTUnsubscribeHandler unsubscribeHandler in allUnsubscribeHandlers) { unsubscribeHandler(error); } MQTTDisconnectHandler disconnectHandler = self.disconnectHandler; if (disconnectHandler) { self.disconnectHandler = nil; disconnectHandler(nil); } [self tell]; self.synchronPub = FALSE; self.synchronPubMid = 0; self.synchronSub = FALSE; self.synchronSubMid = 0; self.synchronUnsub = FALSE; self.synchronUnsubMid = 0; } - (void)keepAlive { DDLogVerbose(@"[MQTTSession] keepAlive %@ @%.0f", self.clientId, [[NSDate date] timeIntervalSince1970]); (void)[self encode:[MQTTMessage pingreqMessage]]; } - (void)checkDup { DDLogVerbose(@"[MQTTSession] checkDup %@ @%.0f", self.clientId, [[NSDate date] timeIntervalSince1970]); [self checkTxFlows]; } - (void)checkTxFlows { NSUInteger windowSize; MQTTMessage *message; if (self.status != MQTTSessionStatusConnected) { return; } NSArray *flows = [self.persistence allFlowsforClientId:self.clientId incomingFlag:NO]; windowSize = 0; message = nil; for (id flow in flows) { if ((flow.commandType).intValue != MQTT_None) { windowSize++; } } for (id flow in flows) { DDLogVerbose(@"[MQTTSession] %@ flow %@ %@ %@", self.clientId, flow.deadline, flow.commandType, flow.messageId); if ([flow.deadline compare:[NSDate date]] == NSOrderedAscending) { switch ((flow.commandType).intValue) { case 0: if (windowSize <= self.persistence.maxWindowSize) { DDLogVerbose(@"[MQTTSession] PUBLISH queued message %@", flow.messageId); message = [MQTTMessage publishMessageWithData:flow.data onTopic:flow.topic qos:(flow.qosLevel).intValue msgId:(flow.messageId).intValue retainFlag:(flow.retainedFlag).boolValue dupFlag:NO protocolLevel:self.protocolLevel payloadFormatIndicator:nil publicationExpiryInterval:nil topicAlias:nil responseTopic:nil correlationData:nil userProperty:nil contentType:nil]; if ([self encode:message]) { flow.commandType = @(MQTTPublish); flow.deadline = [NSDate dateWithTimeIntervalSinceNow:self.dupTimeout]; [self.persistence sync]; windowSize++; } } break; case MQTTPublish: DDLogInfo(@"[MQTTSession] resend PUBLISH %@", flow.messageId); message = [MQTTMessage publishMessageWithData:flow.data onTopic:flow.topic qos:(flow.qosLevel).intValue msgId:(flow.messageId).intValue retainFlag:(flow.retainedFlag).boolValue dupFlag:YES protocolLevel:self.protocolLevel payloadFormatIndicator:nil publicationExpiryInterval:nil topicAlias:nil responseTopic:nil correlationData:nil userProperty:nil contentType:nil]; if ([self encode:message]) { flow.deadline = [NSDate dateWithTimeIntervalSinceNow:self.dupTimeout]; [self.persistence sync]; } break; case MQTTPubrel: DDLogInfo(@"[MQTTSession] resend PUBREL %@", flow.messageId); message = [MQTTMessage pubrelMessageWithMessageId:(flow.messageId).intValue protocolLevel:self.protocolLevel returnCode:MQTTSuccess reasonString:nil userProperty:nil]; if ([self encode:message]) { flow.deadline = [NSDate dateWithTimeIntervalSinceNow:self.dupTimeout]; [self.persistence sync]; } break; default: break; } } } } - (void)decoder:(MQTTDecoder *)sender handleEvent:(MQTTDecoderEvent)eventCode error:(NSError *)error { __unused NSArray *events = @[ @"MQTTDecoderEventProtocolError", @"MQTTDecoderEventConnectionClosed", @"MQTTDecoderEventConnectionError" ]; DDLogVerbose(@"[MQTTSession] decoder handleEvent: %@ (%d) %@", events[eventCode % [events count]], eventCode, [error description]); switch (eventCode) { case MQTTDecoderEventConnectionClosed: [self error:MQTTSessionEventConnectionClosedByBroker error:error]; break; case MQTTDecoderEventConnectionError: [self connectionError:error]; break; case MQTTDecoderEventProtocolError: [self protocolError:error]; break; } MQTTConnectHandler connectHandler = self.connectHandler; if (connectHandler) { self.connectHandler = nil; [self onConnect:connectHandler error:error]; } } - (void)decoder:(MQTTDecoder *)sender didReceiveMessage:(NSData *)data { MQTTMessage *message = [MQTTMessage messageFromData:data protocolLevel:self.protocolLevel]; if (!message) { DDLogError(@"[MQTTSession] MQTT illegal message received"); NSError * error = [NSError errorWithDomain:MQTTSessionErrorDomain code:MQTTSessionErrorIllegalMessageReceived userInfo:@{NSLocalizedDescriptionKey : @"MQTT illegal message received"}]; [self protocolError:error]; return; } @synchronized(sender) { if ([self.delegate respondsToSelector:@selector(received:type:qos:retained:duped:mid:data:)]) { [self.delegate received:self type:message.type qos:message.qos retained:message.retainFlag duped:message.dupFlag mid:message.mid data:message.data]; } if ([self.delegate respondsToSelector:@selector(ignoreReceived:type:qos:retained:duped:mid:data:)]) { if ([self.delegate ignoreReceived:self type:message.type qos:message.qos retained:message.retainFlag duped:message.dupFlag mid:message.mid data:message.data]) { return; } } switch (self.status) { case MQTTSessionStatusConnecting: switch (message.type) { case MQTTConnack: if ((self.protocolLevel == MQTTProtocolVersion50 && message.data.length < 3) || (self.protocolLevel != MQTTProtocolVersion50 && message.data.length != 2)) { NSError *error = [NSError errorWithDomain:MQTTSessionErrorDomain code:MQTTSessionErrorInvalidConnackReceived userInfo:@{NSLocalizedDescriptionKey : @"MQTT protocol CONNACK expected"}]; [self protocolError:error]; MQTTConnectHandler connectHandler = self.connectHandler; if (connectHandler) { self.connectHandler = nil; [self onConnect:connectHandler error:error]; } } else { if (message.returnCode && (message.returnCode).intValue == MQTTSuccess) { self.status = MQTTSessionStatusConnected; if (message.connectAcknowledgeFlags && ((message.connectAcknowledgeFlags).unsignedIntValue & 0x01) == 0x01) { self.sessionPresent = true; } else { self.sessionPresent = false; } __weak typeof(self) weakSelf = self; self.checkDupTimer = [GCDTimer scheduledTimerWithTimeInterval:DUPLOOP repeats:YES queue:self.queue block:^{ [weakSelf checkDup]; }]; [self checkDup]; if (message.properties) { self.serverKeepAlive = message.properties.serverKeepAlive; } if (self.serverKeepAlive) { self.effectiveKeepAlive = (self.serverKeepAlive).unsignedShortValue; } else { self.effectiveKeepAlive = self.keepAliveInterval; } if (self.effectiveKeepAlive > 0) { self.keepAliveTimer = [GCDTimer scheduledTimerWithTimeInterval:self.effectiveKeepAlive repeats:YES queue: self.queue block:^() { [weakSelf keepAlive]; }]; } if ([self.delegate respondsToSelector:@selector(handleEvent:event:error:)]) { [self.delegate handleEvent:self event:MQTTSessionEventConnected error:nil]; } if ([self.delegate respondsToSelector:@selector(connected:)]) { [self.delegate connected:self]; } if ([self.delegate respondsToSelector:@selector(connected:sessionPresent:)]) { [self.delegate connected:self sessionPresent:self.sessionPresent]; } if (self.connectionHandler) { self.connectionHandler(MQTTSessionEventConnected); } MQTTConnectHandler connectHandler = self.connectHandler; if (connectHandler) { self.connectHandler = nil; [self onConnect:connectHandler error:nil]; } } else { NSString *errorDescription = @"unknown"; NSInteger errorCode = 0; if (message.returnCode) { switch ((message.returnCode).intValue) { case 1: errorDescription = @"MQTT CONNACK: unacceptable protocol version"; errorCode = MQTTSessionErrorConnackUnacceptableProtocolVersion; break; case 2: errorDescription = @"MQTT CONNACK: identifier rejected"; errorCode = MQTTSessionErrorConnackIdentifierRejected; break; case 3: errorDescription = @"MQTT CONNACK: server unavailable"; errorCode = MQTTSessionErrorConnackServeUnavailable; break; case 4: errorDescription = @"MQTT CONNACK: bad user name or password"; errorCode = MQTTSessionErrorConnackBadUsernameOrPassword; break; case 5: errorDescription = @"MQTT CONNACK: not authorized"; errorCode = MQTTSessionErrorConnackNotAuthorized; break; default: errorDescription = @"MQTT CONNACK: reserved for future use"; errorCode = MQTTSessionErrorConnackReserved; break; } } NSError *error = [NSError errorWithDomain:MQTTSessionErrorDomain code:errorCode userInfo:@{NSLocalizedDescriptionKey : errorDescription}]; [self error:MQTTSessionEventConnectionRefused error:error]; if ([self.delegate respondsToSelector:@selector(connectionRefused:error:)]) { [self.delegate connectionRefused:self error:error]; } MQTTConnectHandler connectHandler = self.connectHandler; if (connectHandler) { self.connectHandler = nil; [self onConnect:connectHandler error:error]; } } self.synchronConnect = FALSE; } break; case MQTTDisconnect: { NSError *error = [NSError errorWithDomain:MQTTSessionErrorDomain code:(message.returnCode).intValue userInfo:@{NSLocalizedDescriptionKey : @"MQTT protocol DISCONNECT instead of CONNACK"}]; [self protocolError:error]; MQTTConnectHandler connectHandler = self.connectHandler; if (connectHandler) { self.connectHandler = nil; [self onConnect:connectHandler error:error]; } break; } default: { NSError * error = [NSError errorWithDomain:MQTTSessionErrorDomain code:MQTTSessionErrorNoConnackReceived userInfo:@{NSLocalizedDescriptionKey : @"MQTT protocol no CONNACK"}]; [self protocolError:error]; MQTTConnectHandler connectHandler = self.connectHandler; if (connectHandler) { self.connectHandler = nil; [self onConnect:connectHandler error:error]; } break; } } break; case MQTTSessionStatusConnected: switch (message.type) { case MQTTPublish: [self handlePublish:message]; break; case MQTTPuback: [self handlePuback:message]; break; case MQTTPubrec: [self handlePubrec:message]; break; case MQTTPubrel: [self handlePubrel:message]; break; case MQTTPubcomp: [self handlePubcomp:message]; break; case MQTTSuback: [self handleSuback:message]; break; case MQTTUnsuback: [self handleUnsuback:message]; break; case MQTTDisconnect: { NSError *error = [NSError errorWithDomain:MQTTSessionErrorDomain code:(message.returnCode).intValue userInfo:@{NSLocalizedDescriptionKey : @"MQTT protocol DISCONNECT received"}]; [self protocolError:error]; } default: break; } break; default: DDLogError(@"[MQTTSession] other state"); break; } } } - (void)handlePublish:(MQTTMessage*)msg { NSData *data = msg.data; if (data.length < 2) { return; } UInt8 const *bytes = data.bytes; UInt16 topicLength = 256 * bytes[0] + bytes[1]; if (data.length < 2 + topicLength) { return; } NSData *topicData = [data subdataWithRange:NSMakeRange(2, topicLength)]; NSString *topic = [[NSString alloc] initWithData:topicData encoding:NSUTF8StringEncoding]; if (!topic) { topic = [[NSString alloc] initWithData:topicData encoding:NSISOLatin1StringEncoding]; DDLogError(@"non UTF8 topic %@", topic); } NSRange range = NSMakeRange(2 + topicLength, data.length - topicLength - 2); data = [data subdataWithRange:range]; if (msg.qos == 0) { if (self.protocolLevel == MQTTProtocolVersion50) { int propertiesLength = [MQTTProperties getVariableLength:data]; int variableLength = [MQTTProperties variableIntLength:propertiesLength]; msg.properties = [[MQTTProperties alloc] initFromData:data]; NSRange range = NSMakeRange(variableLength + propertiesLength, data.length - variableLength - propertiesLength); data = [data subdataWithRange:range]; } if ([self.delegate respondsToSelector:@selector(newMessage:data:onTopic:qos:retained:mid:)]) { [self.delegate newMessage:self data:data onTopic:topic qos:msg.qos retained:msg.retainFlag mid:0]; } if ([self.delegate respondsToSelector:@selector(newMessageWithFeedback:data:onTopic:qos:retained:mid:)]) { [self.delegate newMessageWithFeedback:self data:data onTopic:topic qos:msg.qos retained:msg.retainFlag mid:0]; } if (self.messageHandler) { self.messageHandler(data, topic); } } else { if (data.length >= 2) { bytes = data.bytes; UInt16 msgId = 256 * bytes[0] + bytes[1]; msg.mid = msgId; data = [data subdataWithRange:NSMakeRange(2, data.length - 2)]; if (msg.qos == 1) { if (self.protocolLevel == MQTTProtocolVersion50) { int propertiesLength = [MQTTProperties getVariableLength:data]; int variableLength = [MQTTProperties variableIntLength:propertiesLength]; msg.properties = [[MQTTProperties alloc] initFromData:data]; NSRange range = NSMakeRange(variableLength + propertiesLength, data.length - variableLength - propertiesLength); data = [data subdataWithRange:range]; } BOOL processed = true; if ([self.delegate respondsToSelector:@selector(newMessage:data:onTopic:qos:retained:mid:)]) { [self.delegate newMessage:self data:data onTopic:topic qos:msg.qos retained:msg.retainFlag mid:msgId]; } if ([self.delegate respondsToSelector:@selector(newMessageWithFeedback:data:onTopic:qos:retained:mid:)]) { processed = [self.delegate newMessageWithFeedback:self data:data onTopic:topic qos:msg.qos retained:msg.retainFlag mid:msgId]; } if (self.messageHandler) { self.messageHandler(data, topic); } if (processed) { (void)[self encode:[MQTTMessage pubackMessageWithMessageId:msgId protocolLevel:self.protocolLevel returnCode:MQTTSuccess reasonString:nil userProperty:nil]]; } return; } else { if (![self.persistence storeMessageForClientId:self.clientId topic:topic data:data retainFlag:msg.retainFlag qos:msg.qos msgId:msgId incomingFlag:YES commandType:MQTTPubrec deadline:[NSDate dateWithTimeIntervalSinceNow:self.dupTimeout]]) { DDLogWarn(@"[MQTTSession] dropping incoming messages"); } else { [self.persistence sync]; [self tell]; (void)[self encode:[MQTTMessage pubrecMessageWithMessageId:msgId protocolLevel:self.protocolLevel returnCode:MQTTSuccess reasonString:nil userProperty:nil]]; } } } } } - (void)handlePuback:(MQTTMessage*)msg { id flow = [self.persistence flowforClientId:self.clientId incomingFlag:NO messageId:msg.mid]; if (flow) { if ((flow.commandType).intValue == MQTTPublish && (flow.qosLevel).intValue == MQTTQosLevelAtLeastOnce) { if ([self.delegate respondsToSelector:@selector(messageDelivered:msgID:)]) { [self.delegate messageDelivered:self msgID:msg.mid]; } if ([self.delegate respondsToSelector:@selector(messageDelivered:msgID:topic:data:qos:retainFlag:)]) { [self.delegate messageDelivered:self msgID:msg.mid topic:flow.topic data:flow.data qos:(flow.qosLevel).intValue retainFlag:(flow.retainedFlag).boolValue]; } if (self.synchronPub && self.synchronPubMid == msg.mid) { self.synchronPub = FALSE; } MQTTPublishHandler publishHandler = (self.publishHandlers)[@(msg.mid)]; if (publishHandler) { [self.publishHandlers removeObjectForKey:@(msg.mid)]; [self onPublish:publishHandler error:nil]; } [self.persistence deleteFlow:flow]; [self.persistence sync]; [self tell]; } } } - (void)handleSuback:(MQTTMessage*)msg { if (msg.data.length >= 3) { UInt8 const *bytes = msg.data.bytes; UInt16 messageId = (256 * bytes[0] + bytes[1]); msg.mid = messageId; NSMutableArray *qoss = [[NSMutableArray alloc] init]; for (int i = 2; i < msg.data.length; i++) { [qoss addObject:@(bytes[i])]; } if ([self.delegate respondsToSelector:@selector(subAckReceived:msgID:grantedQoss:)]) { [self.delegate subAckReceived:self msgID:msg.mid grantedQoss:qoss]; } if (self.synchronSub && self.synchronSubMid == msg.mid) { self.synchronSub = FALSE; } MQTTSubscribeHandler subscribeHandler = (self.subscribeHandlers)[@(msg.mid)]; if (subscribeHandler) { [self.subscribeHandlers removeObjectForKey:@(msg.mid)]; [self onSubscribe:subscribeHandler error:nil gQoss:qoss]; } } } - (void)handleUnsuback:(MQTTMessage *)message { if ([self.delegate respondsToSelector:@selector(unsubAckReceived:msgID:)]) { [self.delegate unsubAckReceived:self msgID:message.mid]; } if (self.synchronUnsub && self.synchronUnsubMid == message.mid) { self.synchronUnsub = FALSE; } MQTTUnsubscribeHandler unsubscribeHandler = (self.unsubscribeHandlers)[@(message.mid)]; if (unsubscribeHandler) { [self.unsubscribeHandlers removeObjectForKey:@(message.mid)]; [self onUnsubscribe:unsubscribeHandler error:nil]; } } - (void)handlePubrec:(MQTTMessage *)message { MQTTMessage *pubrelmessage = [MQTTMessage pubrelMessageWithMessageId:message.mid protocolLevel:self.protocolLevel returnCode:MQTTSuccess reasonString:nil userProperty:nil]; id flow = [self.persistence flowforClientId:self.clientId incomingFlag:NO messageId:message.mid]; if (flow) { if ((flow.commandType).intValue == MQTTPublish && (flow.qosLevel).intValue == MQTTQosLevelExactlyOnce) { flow.commandType = @(MQTTPubrel); flow.deadline = [NSDate dateWithTimeIntervalSinceNow:self.dupTimeout]; [self.persistence sync]; } } (void)[self encode:pubrelmessage]; } - (void)handlePubrel:(MQTTMessage *)message { id flow = [self.persistence flowforClientId:self.clientId incomingFlag:YES messageId:message.mid]; if (flow) { BOOL processed = true; NSData *data = flow.data; if (self.protocolLevel == MQTTProtocolVersion50) { int propertiesLength = [MQTTProperties getVariableLength:data]; int variableLength = [MQTTProperties variableIntLength:propertiesLength]; NSRange range = NSMakeRange(variableLength + propertiesLength, data.length - variableLength - propertiesLength); data = [data subdataWithRange:range]; } if ([self.delegate respondsToSelector:@selector(newMessage:data:onTopic:qos:retained:mid:)]) { [self.delegate newMessage:self data:data onTopic:flow.topic qos:(flow.qosLevel).intValue retained:(flow.retainedFlag).boolValue mid:(flow.messageId).intValue ]; } if ([self.delegate respondsToSelector:@selector(newMessageWithFeedback:data:onTopic:qos:retained:mid:)]) { processed = [self.delegate newMessageWithFeedback:self data:data onTopic:flow.topic qos:(flow.qosLevel).intValue retained:(flow.retainedFlag).boolValue mid:(flow.messageId).intValue ]; } if(self.messageHandler){ self.messageHandler(flow.data, flow.topic); } if (processed) { [self.persistence deleteFlow:flow]; [self.persistence sync]; [self tell]; (void)[self encode:[MQTTMessage pubcompMessageWithMessageId:message.mid protocolLevel:self.protocolLevel returnCode:MQTTSuccess reasonString:nil userProperty:nil]]; } } } - (void)handlePubcomp:(MQTTMessage *)message { id flow = [self.persistence flowforClientId:self.clientId incomingFlag:NO messageId:message.mid]; if (flow && (flow.commandType).intValue == MQTTPubrel) { if ([self.delegate respondsToSelector:@selector(messageDelivered:msgID:)]) { [self.delegate messageDelivered:self msgID:message.mid]; } if ([self.delegate respondsToSelector:@selector(messageDelivered:msgID:topic:data:qos:retainFlag:)]) { [self.delegate messageDelivered:self msgID:message.mid topic:flow.topic data:flow.data qos:(flow.qosLevel).intValue retainFlag:(flow.retainedFlag).boolValue]; } if (self.synchronPub && self.synchronPubMid == message.mid) { self.synchronPub = FALSE; } MQTTPublishHandler publishHandler = (self.publishHandlers)[@(message.mid)]; if (publishHandler) { [self.publishHandlers removeObjectForKey:@(message.mid)]; [self onPublish:publishHandler error:nil]; } [self.persistence deleteFlow:flow]; [self.persistence sync]; [self tell]; } } - (void)connectionError:(NSError *)error { [self error:MQTTSessionEventConnectionError error:error]; if ([self.delegate respondsToSelector:@selector(connectionError:error:)]) { [self.delegate connectionError:self error:error]; } if (self.connectHandler) { MQTTConnectHandler connectHandler = self.connectHandler; self.connectHandler = nil; [self onConnect:connectHandler error:error]; } } - (void)protocolError:(NSError *)error { [self error:MQTTSessionEventProtocolError error:error]; if ([self.delegate respondsToSelector:@selector(protocolError:error:)]) { [self.delegate protocolError:self error:error]; } } - (void)error:(MQTTSessionEvent)eventCode error:(NSError *)error { self.status = MQTTSessionStatusError; if ([self.delegate respondsToSelector:@selector(handleEvent:event:error:)]) { [self.delegate handleEvent:self event:eventCode error:error]; } [self closeInternal]; if(self.connectionHandler){ self.connectionHandler(eventCode); } if (eventCode == MQTTSessionEventConnectionClosedByBroker && self.connectHandler) { error = [NSError errorWithDomain:MQTTSessionErrorDomain code:MQTTSessionErrorConnectionRefused userInfo:@{NSLocalizedDescriptionKey : @"Server has closed connection without connack."}]; MQTTConnectHandler connectHandler = self.connectHandler; self.connectHandler = nil; [self onConnect:connectHandler error:error]; } self.synchronPub = FALSE; self.synchronPubMid = 0; self.synchronSub = FALSE; self.synchronSubMid = 0; self.synchronUnsub = FALSE; self.synchronUnsubMid = 0; self.synchronConnect = FALSE; self.synchronDisconnect = FALSE; } - (UInt16)nextMsgId { DDLogVerbose(@"nextMsgId synchronizing"); @synchronized(self) { DDLogVerbose(@"nextMsgId synchronized"); self.txMsgId++; while (self.txMsgId == 0 || [self.persistence flowforClientId:self.clientId incomingFlag:NO messageId:self.txMsgId] != nil) { self.txMsgId++; } DDLogVerbose(@"nextMsgId synchronized done"); return self.txMsgId; } } - (void)tell { NSUInteger incoming = [self.persistence allFlowsforClientId:self.clientId incomingFlag:YES].count; NSUInteger outflowing = [self.persistence allFlowsforClientId:self.clientId incomingFlag:NO].count; if ([self.delegate respondsToSelector:@selector(buffered:flowingIn:flowingOut:)]) { [self.delegate buffered:self flowingIn:incoming flowingOut:outflowing]; } if ([self.delegate respondsToSelector:@selector(buffered:queued:flowingIn:flowingOut:)]) { [self.delegate buffered:self queued:0 flowingIn:incoming flowingOut:outflowing]; } } - (void)onConnect:(MQTTConnectHandler)connectHandler error:(NSError *)error { connectHandler(error); } - (void)onDisconnect:(MQTTDisconnectHandler)disconnectHandler error:(NSError *)error { disconnectHandler(error); } - (void)onSubscribe:(MQTTSubscribeHandler)subscribeHandler error:(NSError *)error gQoss:(NSArray *)gqoss { subscribeHandler(error, gqoss); } - (void)onUnsubscribe:(MQTTUnsubscribeHandler)unsubscribeHandler error:(NSError *)error { unsubscribeHandler(error); } - (void)onPublish:(MQTTPublishHandler)publishHandler error:(NSError *)error { publishHandler(error); } #pragma mark - MQTTTransport interface - (void)connect { if (MQTTStrict.strict && self.clientId && self.clientId.length < 1 && !self.cleanSessionFlag) { NSException* myException = [NSException exceptionWithName:@"clientId must be at least 1 character long if cleanSessionFlag is off" reason:[NSString stringWithFormat:@"clientId length = %lu", (unsigned long)[self.clientId dataUsingEncoding:NSUTF8StringEncoding].length] userInfo:nil]; @throw myException; } if (MQTTStrict.strict && !self.clientId) { NSException* myException = [NSException exceptionWithName:@"clientId must not be nil" reason:[NSString stringWithFormat:@"clientId length = %lu", (unsigned long)[self.clientId dataUsingEncoding:NSUTF8StringEncoding].length] userInfo:nil]; @throw myException; } if (MQTTStrict.strict && [self.clientId dataUsingEncoding:NSUTF8StringEncoding].length > 65535L) { NSException* myException = [NSException exceptionWithName:@"clientId may not be longer than 65535 bytes in UTF8 representation" reason:[NSString stringWithFormat:@"clientId length = %lu", (unsigned long)[self.clientId dataUsingEncoding:NSUTF8StringEncoding].length] userInfo:nil]; @throw myException; } if (MQTTStrict.strict && ![self.clientId dataUsingEncoding:NSUTF8StringEncoding]) { NSException* myException = [NSException exceptionWithName:@"clientId must not contain non-UTF8 characters" reason:[NSString stringWithFormat:@"clientId = %@", self.clientId] userInfo:nil]; @throw myException; } if (MQTTStrict.strict && [self.userName dataUsingEncoding:NSUTF8StringEncoding].length > 65535L) { NSException* myException = [NSException exceptionWithName:@"userName may not be longer than 65535 bytes in UTF8 representation" reason:[NSString stringWithFormat:@"userName length = %lu", (unsigned long)[self.userName dataUsingEncoding:NSUTF8StringEncoding].length] userInfo:nil]; @throw myException; } if (MQTTStrict.strict && ![self.userName dataUsingEncoding:NSUTF8StringEncoding]) { NSException* myException = [NSException exceptionWithName:@"userName must not contain non-UTF8 characters" reason:[NSString stringWithFormat:@"userName = %@", self.userName] userInfo:nil]; @throw myException; } if (MQTTStrict.strict && !self.userName) { NSException* myException = [NSException exceptionWithName:@"password specified without userName" reason:[NSString stringWithFormat:@"password = %@", self.password] userInfo:nil]; @throw myException; } if (MQTTStrict.strict && self.protocolLevel != MQTTProtocolVersion31 && self.protocolLevel != MQTTProtocolVersion311 && self.protocolLevel != MQTTProtocolVersion50) { NSException* myException = [NSException exceptionWithName:@"Illegal protocolLevel" reason:[NSString stringWithFormat:@"%d is not 3, 4, or 5", self.protocolLevel] userInfo:nil]; @throw myException; } if (MQTTStrict.strict && !self.willFlag && self.willTopic) { NSException* myException = [NSException exceptionWithName:@"Will topic must be nil if willFlag is false" reason:[NSString stringWithFormat:@"%@", self.willTopic] userInfo:nil]; @throw myException; } if (MQTTStrict.strict && !self.willFlag && self.willMsg) { NSException* myException = [NSException exceptionWithName:@"Will message must be nil if willFlag is false" reason:[NSString stringWithFormat:@"%@", self.willMsg] userInfo:nil]; @throw myException; } if (MQTTStrict.strict && !self.willFlag && self.willRetainFlag) { NSException* myException = [NSException exceptionWithName:@"Will retain must be false if willFlag is false" reason:[NSString stringWithFormat:@"%d", self.willRetainFlag] userInfo:nil]; @throw myException; } if (MQTTStrict.strict && !self.willFlag && self.willQoS != MQTTQosLevelAtMostOnce) { NSException* myException = [NSException exceptionWithName:@"Will QoS Level must be 0 if willFlag is false" reason:[NSString stringWithFormat:@"%d", self.willQoS] userInfo:nil]; @throw myException; } if (MQTTStrict.strict && self.willQoS != MQTTQosLevelAtMostOnce && self.willQoS != MQTTQosLevelAtLeastOnce && self.willQoS != MQTTQosLevelExactlyOnce) { NSException* myException = [NSException exceptionWithName:@"Illegal will QoS level" reason:[NSString stringWithFormat:@"%d is not 0, 1, or 2", self.willQoS] userInfo:nil]; @throw myException; } if (MQTTStrict.strict && self.willFlag && !self.willTopic) { NSException* myException = [NSException exceptionWithName:@"Will topic must not be nil if willFlag is true" reason:[NSString stringWithFormat:@"%@", self.willTopic] userInfo:nil]; @throw myException; } if (MQTTStrict.strict && self.willTopic && self.willTopic.length < 1) { NSException* myException = [NSException exceptionWithName:@"Will topic must be at least 1 character long" reason:[NSString stringWithFormat:@"%@", self.willTopic] userInfo:nil]; @throw myException; } if (MQTTStrict.strict && self.willTopic && [self.willTopic dataUsingEncoding:NSUTF8StringEncoding].length > 65535L) { NSException* myException = [NSException exceptionWithName:@"willTopic may not be longer than 65535 bytes in UTF8 representation" reason:[NSString stringWithFormat:@"willTopic length = %lu", (unsigned long)[self.willTopic dataUsingEncoding:NSUTF8StringEncoding].length] userInfo:nil]; @throw myException; } if (MQTTStrict.strict && self.willTopic && ![self.willTopic dataUsingEncoding:NSUTF8StringEncoding]) { NSException* myException = [NSException exceptionWithName:@"willTopic must not contain non-UTF8 characters" reason:[NSString stringWithFormat:@"willTopic = %@", self.willTopic] userInfo:nil]; @throw myException; } if (MQTTStrict.strict && self.willTopic && ([self.willTopic containsString:@"+"] || [self.willTopic containsString:@"#"]) ) { NSException* myException = [NSException exceptionWithName:@"willTopic must not contain wildcards" reason:[NSString stringWithFormat:@"willTopic = %@", self.self.willTopic] userInfo:nil]; @throw myException; } if (MQTTStrict.strict && self.willFlag && !self.willMsg) { NSException* myException = [NSException exceptionWithName:@"Will message must not be nil if willFlag is true" reason:[NSString stringWithFormat:@"%@", self.willMsg] userInfo:nil]; @throw myException; } DDLogVerbose(@"[MQTTSession] connecting"); if (self.cleanSessionFlag) { [self.persistence deleteAllFlowsForClientId:self.clientId]; [self.subscribeHandlers removeAllObjects]; [self.unsubscribeHandlers removeAllObjects]; [self.publishHandlers removeAllObjects]; } [self tell]; self.status = MQTTSessionStatusConnecting; self.decoder = [[MQTTDecoder alloc] init]; self.decoder.queue = self.queue; self.decoder.delegate = self; [self.decoder open]; self.transport.delegate = self; [self.transport open]; } - (void)connectWithConnectHandler:(MQTTConnectHandler)connectHandler { DDLogVerbose(@"[MQTTSession] connectWithConnectHandler:%p", connectHandler); self.connectHandler = connectHandler; [self connect]; } - (BOOL)encode:(MQTTMessage *)message { if (message) { NSData *wireFormat = message.wireFormat; if (wireFormat) { if (self.delegate) { if ([self.delegate respondsToSelector:@selector(sending:type:qos:retained:duped:mid:data:)]) { [self.delegate sending:self type:message.type qos:message.qos retained:message.retainFlag duped:message.dupFlag mid:message.mid data:message.data]; } } DDLogVerbose(@"[MQTTSession] mqttTransport send"); return [self.transport send:wireFormat]; } else { DDLogError(@"[MQTTSession] trying to send message without wire format"); return false; } } else { DDLogError(@"[MQTTSession] trying to send nil message"); return false; } } #pragma mark - MQTTTransport delegate - (void)mqttTransport:(id)mqttTransport didReceiveMessage:(NSData *)message { DDLogVerbose(@"[MQTTSession] mqttTransport didReceiveMessage"); [self.decoder decodeMessage:message]; } - (void)mqttTransportDidClose:(id)mqttTransport { DDLogVerbose(@"[MQTTSession] mqttTransport mqttTransportDidClose"); [self error:MQTTSessionEventConnectionClosedByBroker error:nil]; } - (void)mqttTransportDidOpen:(id)mqttTransport { DDLogVerbose(@"[MQTTSession] mqttTransportDidOpen"); DDLogVerbose(@"[MQTTSession] sending CONNECT"); if (!self.connectMessage) { (void)[self encode:[MQTTMessage connectMessageWithClientId:self.clientId userName:self.userName password:self.password keepAlive:self.keepAliveInterval cleanSession:self.cleanSessionFlag will:self.willFlag willTopic:self.willTopic willMsg:self.willMsg willQoS:self.willQoS willRetain:self.willRetainFlag protocolLevel:self.protocolLevel sessionExpiryInterval:self.sessionExpiryInterval authMethod:self.authMethod authData:self.authData requestProblemInformation:self.requestProblemInformation willDelayInterval:self.willDelayInterval requestResponseInformation:self.requestResponseInformation receiveMaximum:self.receiveMaximum topicAliasMaximum:self.topicAliasMaximum userProperty:self.userProperty maximumPacketSize:self.maximumPacketSize]]; } else { (void)[self encode:self.connectMessage]; } } - (void)mqttTransport:(id)mqttTransport didFailWithError:(NSError *)error { DDLogWarn(@"[MQTTSession] mqttTransport didFailWithError %@", error); [self connectionError:error]; } @end