目录:
该系列文章预计包括:
- 1 - 简介
- 2 - 连接以及连接过程解析
- 3 - 发送订阅消息以及发送过程
- 4 - 各个类的解析
前言:
这里的代码是从MQTTSessionManager
为切入点进入的,所以下面的方法如果没有特殊描述都是从这个类然后进入内部。 主题 == topic
文章中可能出现多次,大家自己转化一下。
正文:
当你连接成功之后,就要开始着手发布消息和订阅消息。在发布和订阅之前呢,首先你得要有一个主题(topic
),不然你对谁发,一个topic
可以被多次订阅,比如iOS
设备订阅了,Android
设备也订阅了,这个时候用任意一端设备发送消息,其他一端都可以收到。所以这里你首先要知道主题(一般问服务器要)。
在上一个文章中,假设你没有在创建的时候就传入topic
,那么在发送数据之前你可以先订阅一下这个topic
,如果发送成功了,会收到一样的消息(这里笔者之前是调试用的时候用,当然你也可以不用)。
主题的格式:标记:级别
啥意思呢?
比如你的主题叫: "ijk-dr-rec" 级别(qos
)是:1
那么你给的参数就是:
OC: @{ @"ijk-dr-rec": @(1) }
Swift: ["ijk-dr-rec", NSNumber(value: 1)]
一般来说我们最少会有两个主题:一个主题用户发布数据,一个用于订阅信息
订阅主题:
通过对这个属性的赋值subscriptions
,然后监听set
方法,在set
方法内部处理
- (void)setSubscriptions:(NSDictionary<NSString *, NSNumber *> *)newSubscriptions {
// 判断连接上了没,没连接上就不订阅和取消订阅的操作
if (self.state == MQTTSessionManagerStateConnected) {
NSDictionary *currentSubscriptions = [self.effectiveSubscriptions copy];
// 拿到已有的topic,然后取消订阅
for (NSString *topicFilter in currentSubscriptions) {
if (!newSubscriptions[topicFilter]) {
__weak MQTTSessionManager *weakSelf = self;
/// 取消订阅
[self.session unsubscribeTopic:topicFilter unsubscribeHandler:^(NSError *error) {
// 如果取消成功了,就移除掉
MQTTSessionManager *strongSelf = weakSelf;
if (!error) {
/// 因为这里是异步的,所以锁起来了
[strongSelf.subscriptionLock lock];
NSMutableDictionary *newEffectiveSubscriptions = [strongSelf.subscriptions mutableCopy];
[newEffectiveSubscriptions removeObjectForKey:topicFilter];
strongSelf.effectiveSubscriptions = newEffectiveSubscriptions;
[strongSelf.subscriptionLock unlock];
}
}];
}
}
// 订阅新的主题,流程和上面一样,只不过一个是移除,一个是添加
for (NSString *topicFilter in newSubscriptions) {
if (!currentSubscriptions[topicFilter]) {
NSNumber *number = newSubscriptions[topicFilter];
MQTTQosLevel qos = number.unsignedIntValue;
__weak MQTTSessionManager *weakSelf = self;
[self.session subscribeToTopic:topicFilter atLevel:qos subscribeHandler:^(NSError *error, NSArray<NSNumber *> *gQoss) {
MQTTSessionManager *strongSelf = weakSelf;
if (!error) {
NSNumber *gQos = gQoss[0];
[strongSelf.subscriptionLock lock];
NSMutableDictionary *newEffectiveSubscriptions = [strongSelf.subscriptions mutableCopy];
newEffectiveSubscriptions[topicFilter] = gQos;
strongSelf.effectiveSubscriptions = newEffectiveSubscriptions;
[strongSelf.subscriptionLock unlock];
}
}];
}
}
}
// 把新的主题赋值给 internalSubscriptions
self.internalSubscriptions = newSubscriptions;
DDLogVerbose(@"MQTTSessionManager internalSubscriptions: %@", self.internalSubscriptions);
}
代码解释:
上面的方法中主要做的就是,取消订阅之前旧的主题,然后订阅新的主题,在把新的主题保存起来, 如果你的程序员中不能一开始就确定好主题数量,建议自己维护主题
发送消息:
终于到了这个令人愉快的时候, 调用- (UInt16)sendData:(NSData *)data topic:(NSString *)topic qos:(MQTTQosLevel)qos retain:(BOOL)retainFlag
- (UInt16)sendData:(NSData *)data topic:(NSString *)topic qos:(MQTTQosLevel)qos retain:(BOOL)retainFlag {
if (self.state != MQTTSessionManagerStateConnected) {
[self connectToLast:nil];
}
UInt16 msgId = [self.session publishData:data
onTopic:topic
retain:retainFlag
qos:qos];
return msgId;
}
data:表示发送数据
topic:表示主题
qos:传输的方式(三个级别)
retainFlag: 是否存到MQTT的消息队列上
可以看到内部做了如下处理:
- 1、判断是否在线,如果不在线,就先连接
- 2、调用
session
发送数据,并生产msgId
- 3、返回
msgId
接收消息(订阅):
1、首先要设置代理
manager.delegate = self;
2、实现代理方法
- (void)handleMessage:(NSData *)data onTopic:(NSString *)topic retained:(BOOL)retained {
NSLog(@"data - %@ topic - %@", data, topic);
}
可以看到在这个方法中,一共返回了三个参数
data
: 表示收到的数据
topic
: 表示主题
retained
:表示数据是否是服务器重新传输
到这里,我们就完成了发送和订阅消息,也了解了主题。
如果你只是需要发送和订阅,到这里就完成了(左/右上角X掉)
发送过程
首先manager
内部会只有一个对象,刚才发送、订阅、监听消息等一系列操作都是这个对象在做,这个对象就是session
回到发送的方法,我们会看到里面发送是这样子调用的(隐藏了一些不必要的代码
)
- (UInt16)sendData:(NSData *)data topic:(NSString *)topic qos:(MQTTQosLevel)qos retain:(BOOL)retainFlag {
....
UInt16 msgId = [self.session publishData:data
onTopic:topic
retain:retainFlag
qos:qos];
...
}
然后点击进去看看,究竟发生了什么!
宝宝不开心,发现他也只是调用其他的方法进行发送
- (UInt16)publishData:(NSData*)data
onTopic:(NSString*)topic
retain:(BOOL)retainFlag
qos:(MQTTQosLevel)qos {
return [self publishData:data onTopic:topic retain:retainFlag qos:qos publishHandler:nil];
}
在点击进入看看(这个代码就有点长,稍微有点耐心,我们来看看, 我会在代码中打上注释)
- (UInt16)publishData:(NSData *)data
onTopic:(NSString *)topic
retain:(BOOL)retainFlag
qos:(MQTTQosLevel)qos
publishHandler:(MQTTPublishHandler)publishHandler
{
/// 这个就是打印信息,目的就是告诉小伙伴们,我现在在发送数据了
DDLogVerbose(@"[MQTTSession] publishData:%@... onTopic:%@ retain:%d qos:%ld publishHandler:%p",
[data subdataWithRange:NSMakeRange(0, MIN(256, data.length))],
[topic substringWithRange:NSMakeRange(0, MIN(256, topic.length))],
retainFlag,
(long)qos,
publishHandler);
/// 判断主题是否为空,为空抛出异常
if (MQTTStrict.strict &&
!topic) {
NSException* myException = [NSException
exceptionWithName:@"topic must not be nil"
reason:[NSString stringWithFormat:@"%@", topic]
userInfo:nil];
@throw myException;
}
/// 判断主题长度是不是小于1,为了防止这样子的字符串 @"", 否则抛出异常
if (MQTTStrict.strict &&
topic &&
topic.length < 1) {
NSException* myException = [NSException
exceptionWithName:@"topic must not at least 1 character long"
reason:[NSString stringWithFormat:@"%@", topic]
userInfo:nil];
@throw myException;
}
/// 判断主题是不是大于65535 否则抛出异常
if (MQTTStrict.strict &&
topic &&
[topic dataUsingEncoding:NSUTF8StringEncoding].length > 65535L) {
NSException* myException = [NSException
exceptionWithName:@"topic may not be longer than 65535 bytes in UTF8 representation"
reason:[NSString stringWithFormat:@"topic length = %lu",
(unsigned long)[topic dataUsingEncoding:NSUTF8StringEncoding].length]
userInfo:nil];
@throw myException;
}
/// 判断主题编码是不是可以使用 utf8编码进行编码, 否则抛出异常
if (MQTTStrict.strict &&
topic &&
![topic dataUsingEncoding:NSUTF8StringEncoding]) {
NSException* myException = [NSException
exceptionWithName:@"topic must not contain non-UTF8 characters"
reason:[NSString stringWithFormat:@"topic = %@", topic]
userInfo:nil];
@throw myException;
}
/// 判断主题是否有通配符 有抛出异常
if (MQTTStrict.strict &&
self.willTopic &&
([self.willTopic containsString:@"+"] ||
[self.willTopic containsString:@"#"])
) {
NSException* myException = [NSException
exceptionWithName:@"willTopic must not contain wildcards"
reason:[NSString stringWithFormat:@"willTopic = %@", self.willTopic]
userInfo:nil];
@throw myException;
}
/// 判断输入的QOS级别是不是这个三个级别中的其中一个,不是就抛出异常
if (MQTTStrict.strict &&
qos != MQTTQosLevelAtMostOnce &&
qos != MQTTQosLevelAtLeastOnce &&
qos != MQTTQosLevelExactlyOnce) {
NSException* myException = [NSException
exceptionWithName:@"Illegal QoS level"
reason:[NSString stringWithFormat:@"%d is not 0, 1, or 2", qos]
userInfo:nil];
@throw myException;
}
/// 先创建一个msgId,等下要返回回去的
UInt16 msgId = 0;
if (!qos) { /// 如果qos级别等于0
/// 新建msg对象
MQTTMessage *msg = [MQTTMessage publishMessageWithData:data
onTopic:topic
qos:qos
msgId:msgId
retainFlag:retainFlag
dupFlag:FALSE
protocolLevel:self.protocolLevel
payloadFormatIndicator:nil
publicationExpiryInterval:nil
topicAlias:nil
responseTopic:nil
correlationData:nil
userProperty:nil
contentType:nil];
NSError *error = nil;
/// 编码msg对象,如果失败了,就给error对象赋值
if (![self encode:msg]) {
error = [NSError errorWithDomain:MQTTSessionErrorDomain
code:MQTTSessionErrorEncoderNotReady
userInfo:@{NSLocalizedDescriptionKey : @"Encoder not ready"}];
}
/// 如果实现了这个block,就把error回调出去,这个block通常我们用来监听是否发送成功
if (publishHandler) {
[self onPublish:publishHandler error:error];
}
} else { // 当qos不是0的时候
msgId = [self nextMsgId]; // 得到msgId
MQTTMessage *msg = nil;
/// 缓存数据用的
id<MQTTFlow> flow;
/// 判断连接状态
if (self.status == MQTTSessionStatusConnected) {
/// 拿到所有数据
NSArray *flows = [self.persistence allFlowsforClientId:self.clientId
incomingFlag:NO];
/// 计算窗口数量 和 不存在未处理的消息
BOOL unprocessedMessageNotExists = TRUE;
NSUInteger windowSize = 0;
for (id<MQTTFlow> flow in flows) {
if ((flow.commandType).intValue != MQTT_None) {
windowSize++;
} else {
unprocessedMessageNotExists = FALSE;
}
}
/// 默认maxWindowsSize = 16, 如果上面计算完了,然后判断成功,就初始化msg 和 flow对象
if (unprocessedMessageNotExists && windowSize <= self.persistence.maxWindowSize) {
msg = [MQTTMessage publishMessageWithData:data
onTopic:topic
qos:qos
msgId:msgId
retainFlag:retainFlag
dupFlag:FALSE
protocolLevel:self.protocolLevel
payloadFormatIndicator:nil
publicationExpiryInterval:nil
topicAlias:nil
responseTopic:nil
correlationData:nil
userProperty:nil
contentType:nil];
flow = [self.persistence storeMessageForClientId:self.clientId
topic:topic
data:data
retainFlag:retainFlag
qos:qos
msgId:msgId
incomingFlag:NO
commandType:MQTTPublish
deadline:[NSDate dateWithTimeIntervalSinceNow:self.dupTimeout]];
}
}
/// 如果msg不存在,就初始化一个msg对象
if (!msg) {
flow = [self.persistence storeMessageForClientId:self.clientId
topic:topic
data:data
retainFlag:retainFlag
qos:qos
msgId:msgId
incomingFlag:NO
commandType:MQTT_None
deadline:[NSDate date]];
}
/// 如果flow不存在就抛出异常
if (!flow) {
DDLogWarn(@"[MQTTSession] dropping outgoing message %d", msgId);
NSError *error = [NSError errorWithDomain:MQTTSessionErrorDomain
code:MQTTSessionErrorDroppingOutgoingMessage
userInfo:@{NSLocalizedDescriptionKey : @"Dropping outgoing Message"}];
if (publishHandler) {
[self onPublish:publishHandler error:error];
}
msgId = 0;
} else {
[self.persistence sync];
/// 判断是否实现了block,如果实现了就保存起来,
if (publishHandler) {
(self.publishHandlers)[@(msgId)] = [publishHandler copy];
} else { // 否则就删除这个msg的block
[self.publishHandlers removeObjectForKey:@(msgId)];
}
// 修改flow数据,然后存起来
if ((flow.commandType).intValue == MQTTPublish) {
DDLogVerbose(@"[MQTTSession] PUBLISH %d", msgId);
// 编码失败了,修改数据
if (![self encode:msg]) {
DDLogInfo(@"[MQTTSession] queueing message %d after unsuccessfull attempt", msgId);
flow.commandType = [NSNumber numberWithUnsignedInt:MQTT_None];
flow.deadline = [NSDate date];
[self.persistence sync];
}
} else {
DDLogInfo(@"[MQTTSession] queueing message %d", msgId);
}
}
}
/// 调用tell方法
[self tell];
/// 返回msgId
return msgId;
}
总结:上面这个方法就是先判断一些先行条件是否成功,然后判断QOS
的级别,根据不同的级别分别处理数据,然后调用tell
方法,最后返回msgId
,所以这里面tell
方法和encode
方法是我们需要关注的。
按照先后顺序,我们要先看看encode
方法,做了啥子。
- (BOOL)encode:(MQTTMessage *)message {
if (message) {
NSData *wireFormat = message.wireFormat;
if (wireFormat) {
if (self.delegate) {
if ([self.delegate respondsToSelector:@selector(sending:type:qos:retained:duped:mid:data:)]) {
[self.delegate sending:self
type:message.type
qos:message.qos
retained:message.retainFlag
duped:message.dupFlag
mid:message.mid
data:message.data];
}
}
DDLogVerbose(@"[MQTTSession] mqttTransport send");
return [self.transport send:wireFormat];
} else {
DDLogError(@"[MQTTSession] trying to send message without wire format");
return false;
}
} else {
DDLogError(@"[MQTTSession] trying to send nil message");
return false;
}
}
这个方法就比较简单了,判断代理是否存在,存在就调用,然后使用transport
发送数据
在上篇博客中我们提到过,有三种transport
,这里我们就不做介绍了。
刚才我们新见到一个类,MQTTMessage
,这个类就是负责把传入的参数生成Data
,然后提供给transport
发送出去。
然后我们在看看tell
方法,这个方法就没啥子好说的了, 就是调用代理方法(如果实现)。
- (void)tell {
NSUInteger incoming = [self.persistence allFlowsforClientId:self.clientId
incomingFlag:YES].count;
NSUInteger outflowing = [self.persistence allFlowsforClientId:self.clientId
incomingFlag:NO].count;
if ([self.delegate respondsToSelector:@selector(buffered:flowingIn:flowingOut:)]) {
[self.delegate buffered:self
flowingIn:incoming
flowingOut:outflowing];
}
if ([self.delegate respondsToSelector:@selector(buffered:queued:flowingIn:flowingOut:)]) {
[self.delegate buffered:self
queued:0
flowingIn:incoming
flowingOut:outflowing];
}
}
Transport 发送过程
以(MQTTCFSocketEncoder)
为例:
- (BOOL)send:(NSData *)data {
// 上来就是一把锁🔐 😏
@synchronized(self) {
/// 检测encoder是不是准备好了
if (self.state != MQTTCFSocketEncoderStateReady) {
DDLogInfo(@"[MQTTCFSocketEncoder] not MQTTCFSocketEncoderStateReady");
return NO;
}
/// 如果data有数据,就添加到buffer里面去
if (data) {
[self.buffer appendData:data];
}
/// 判断buffer的长度
if (self.buffer.length) {
/// 打印log
DDLogVerbose(@"[MQTTCFSocketEncoder] buffer to write (%lu)=%@...",
(unsigned long)self.buffer.length,
[self.buffer subdataWithRange:NSMakeRange(0, MIN(256, self.buffer.length))]);
/// 写数据, 这里stream就是 NSOutputStream ,所以上面一顿操作猛如虎,最后就是用NSOutputStream发送数据的 然后通过设置stream代理,判断是不是发送成功了
NSInteger n = [self.stream write:self.buffer.bytes maxLength:self.buffer.length];
/// 写失败了
if (n == -1) {
DDLogVerbose(@"[MQTTCFSocketEncoder] streamError: %@", self.error);
self.state = MQTTCFSocketEncoderStateError;
self.error = self.stream.streamError;
return NO;
} else {
/// 打印数据
if (n < self.buffer.length) {
DDLogVerbose(@"[MQTTCFSocketEncoder] buffer partially written: %ld", (long)n);
}
/// 清空buffer
[self.buffer replaceBytesInRange:NSMakeRange(0, n) withBytes:NULL length:0];
}
}
return YES;
}
}
总结 : emmm,都写在注释里面了,底层都是通过NSOutputStream
发送数据,NSInputStream
接收数据 。
ps:
如果你和作者一样,用的是MQTTWebsocketTransport
,那你可能需要手动去修改一下manager这个类才可以使用了,现在manager
这个类里面用的是MQTTCFSocketTransport
在这里改可以
- (void)connectToInternal:(MQTTConnectHandler)connectHandler {
if (self.session && self.state == MQTTSessionManagerStateStarting) {
[self updateState:MQTTSessionManagerStateConnecting];
[self.session connectToHost:self.host
port:self.port
usingSSL:self.tls
connectHandler:connectHandler];
}
}
在这里新增一个方法或者直接修改这个方法也可以(不建议用后者)
- (void)connectToHost:(NSString *)host
port:(UInt32)port
usingSSL:(BOOL)usingSSL
connectHandler:(MQTTConnectHandler)connectHandler {
DDLogVerbose(@"MQTTSessionLegacy connectToHost:%@ port:%d usingSSL:%d connectHandler:%p",
host, (unsigned int)port, usingSSL, connectHandler);
/// 在这里替换成为你需要用到的transport
MQTTCFSocketTransport *transport;
if (self.securityPolicy) {
transport = [[MQTTSSLSecurityPolicyTransport alloc] init];
((MQTTSSLSecurityPolicyTransport *)transport).securityPolicy = self.securityPolicy;
} else {
transport = [[MQTTCFSocketTransport alloc] init];
}
transport.host = host;
transport.port = port;
transport.tls = usingSSL;
transport.certificates = self.certificates;
transport.voip = self.voip;
transport.queue = self.queue;
transport.streamSSLLevel = self.streamSSLLevel;
self.transport = transport;
[self connectWithConnectHandler:connectHandler];
}