//
|
// MQTTCFSocketEncoder.m
|
// MQTTClient.framework
|
//
|
// Copyright © 2013-2017, Christoph Krey. All rights reserved.
|
//
|
|
#import "MQTTCFSocketEncoder.h"
|
|
#import "MQTTLog.h"
|
|
/// Private class extension holding state that is internal to the encoder.
@interface MQTTCFSocketEncoder ()

/// Bytes handed to -send: that have not yet been written to the stream.
@property (nonatomic, strong) NSMutableData *buffer;

@end
|
|
@implementation MQTTCFSocketEncoder
|
|
/// Creates an encoder in the initializing state with an empty write buffer
/// and no stream attached.
- (instancetype)init {
    self = [super init];
    if (self) {
        // The state is assigned through its accessor, so the logging setter
        // defined in this class runs for the initial value as well.
        self.state = MQTTCFSocketEncoderStateInitializing;
        self.buffer = [[NSMutableData alloc] init];
        self.stream = nil;
    }
    return self;
}
|
|
/// Runs -close before the encoder is destroyed, which closes the stream and
/// removes this object as the stream's delegate.
- (void)dealloc {
    [self close];
}
|
|
/// Installs this encoder as the stream's delegate and opens the stream.
/// Events reported by the stream are handled in -stream:handleEvent:.
- (void)open {
    [self.stream setDelegate:self];
    [self.stream open];
}
|
|
/// Closes the stream and then detaches this encoder as its delegate.
/// The encoder's state and pending buffer are left untouched.
- (void)close {
    [self.stream close];
    self.stream.delegate = nil;
}
|
|
/// Custom setter for `state` that logs the transition (old/new raw values)
/// before storing the new value.
///
/// @param state The state the encoder is moving to.
- (void)setState:(MQTTCFSocketEncoderState)state {
    long previousValue = (long)_state;
    long requestedValue = (long)state;
    DDLogVerbose(@"[MQTTCFSocketEncoder] setState %ld/%ld", previousValue, requestedValue);
    _state = state;
}
|
|
/// NSStreamDelegate callback: reacts to events reported by the stream.
///
/// - Open completed / bytes available: logged only.
/// - Space available: the first occurrence while initializing moves the
///   encoder to the ready state and notifies the delegate; while ready, any
///   pending buffer content is flushed through -send:.
/// - End encountered: state returns to initializing, the error is cleared and
///   the delegate is told the encoder closed.
/// - Error occurred: state becomes error, the stream's error is stored and
///   forwarded to the delegate.
///
/// @param sender    The stream reporting the event (not inspected here).
/// @param eventCode Bit mask of NSStreamEvent flags; each flag is checked
///                  independently, in the order listed above.
- (void)stream:(NSStream *)sender handleEvent:(NSStreamEvent)eventCode {
    BOOL openCompleted = (eventCode & NSStreamEventOpenCompleted) != 0;
    BOOL bytesAvailable = (eventCode & NSStreamEventHasBytesAvailable) != 0;
    BOOL spaceAvailable = (eventCode & NSStreamEventHasSpaceAvailable) != 0;
    BOOL endEncountered = (eventCode & NSStreamEventEndEncountered) != 0;
    BOOL errorOccurred = (eventCode & NSStreamEventErrorOccurred) != 0;

    if (openCompleted) {
        DDLogVerbose(@"[MQTTCFSocketEncoder] NSStreamEventOpenCompleted");
    }

    if (bytesAvailable) {
        DDLogVerbose(@"[MQTTCFSocketEncoder] NSStreamEventHasBytesAvailable");
    }

    if (spaceAvailable) {
        DDLogVerbose(@"[MQTTCFSocketEncoder] NSStreamEventHasSpaceAvailable");

        if (self.state == MQTTCFSocketEncoderStateInitializing) {
            self.state = MQTTCFSocketEncoderStateReady;
            [self.delegate encoderDidOpen:self];
        }

        // The state is read again on purpose: it was either just set to ready
        // above or may have been changed by the delegate callback.
        if (self.state == MQTTCFSocketEncoderStateReady && self.buffer.length) {
            [self send:nil];
        }
    }

    if (endEncountered) {
        DDLogVerbose(@"[MQTTCFSocketEncoder] NSStreamEventEndEncountered");
        self.state = MQTTCFSocketEncoderStateInitializing;
        self.error = nil;
        [self.delegate encoderdidClose:self];
    }

    if (errorOccurred) {
        DDLogVerbose(@"[MQTTCFSocketEncoder] NSStreamEventErrorOccurred");
        self.state = MQTTCFSocketEncoderStateError;
        self.error = self.stream.streamError;
        [self.delegate encoder:self didFailWithError:self.error];
    }
}
|
|
/// Queues `data` (if any) behind the bytes that are still pending and tries to
/// write the pending buffer to the stream. Bytes the stream accepts are
/// removed from the front of the buffer; the rest stays queued.
///
/// The whole operation runs under @synchronized(self).
///
/// @param data Bytes to append to the pending buffer, or nil to only flush
///             what is already queued.
/// @return YES if the encoder is in the ready state and the write did not
///         fail (after a partial write the buffer still holds the remainder);
///         NO if the encoder is not ready (in which case `data` is not
///         queued) or the stream reported a write error.
- (BOOL)send:(NSData *)data {
    @synchronized(self) {
        if (self.state != MQTTCFSocketEncoderStateReady) {
            DDLogInfo(@"[MQTTCFSocketEncoder] not MQTTCFSocketEncoderStateReady");
            return NO;
        }

        if (data) {
            [self.buffer appendData:data];
        }

        if (self.buffer.length) {
            // Only the first 256 bytes are logged to keep the output bounded.
            DDLogVerbose(@"[MQTTCFSocketEncoder] buffer to write (%lu)=%@...",
                         (unsigned long)self.buffer.length,
                         [self.buffer subdataWithRange:NSMakeRange(0, MIN((NSUInteger)256, self.buffer.length))]);

            NSInteger n = [self.stream write:self.buffer.bytes maxLength:self.buffer.length];

            // Any negative result is treated as a failure so that it can never
            // reach the unsigned length arithmetic below.
            if (n < 0) {
                self.state = MQTTCFSocketEncoderStateError;
                self.error = self.stream.streamError;
                // Logged after the assignment so the message shows the error
                // of this failure rather than a stale (or nil) value.
                DDLogVerbose(@"[MQTTCFSocketEncoder] streamError: %@", self.error);
                return NO;
            }

            NSUInteger written = (NSUInteger)n;
            if (written < self.buffer.length) {
                DDLogVerbose(@"[MQTTCFSocketEncoder] buffer partially written: %ld", (long)n);
            }

            // Drop the bytes the stream accepted; the remainder stays queued.
            [self.buffer replaceBytesInRange:NSMakeRange(0, written) withBytes:NULL length:0];
        }

        return YES;
    }
}
|
|
@end
|