iOS MQTT 3 - 发送订阅消息以及发送过程

目录:



该系列文章预计包括:



前言:

这里的代码是从MQTTSessionManager为切入点进入的,所以下面的方法如果没有特殊描述都是从这个类然后进入内部。 主题 == topic文章中可能出现多次,大家自己转化一下。


正文:

当你连接成功之后,就要开始着手发布消息和订阅消息。在发布和订阅之前呢,首先你得要有一个主题(topic),不然你对谁发,一个topic可以被多次订阅,比如iOS设备订阅了,Android设备也订阅了,这个时候用任意一端设备发送消息,其他一端都可以收到。所以这里你首先要知道主题(一般问服务器要)。

上一个文章中,假设你没有在创建的时候就传入topic,那么在发送数据之前你可以先订阅一下这个topic,如果发送成功了,会收到一样的消息(这里笔者之前是调试用的时候用,当然你也可以不用)。

主题的格式:标记:级别

啥意思呢?

比如你的主题叫: "ijk-dr-rec" 级别(qos)是:1

那么你给的参数就是:

 OC:  @{ @"ijk-dr-rec": @(1) }

Swift: ["ijk-dr-rec", NSNumber(value: 1)]

一般来说我们最少会有两个主题:一个主题用户发布数据,一个用于订阅信息

订阅主题:

通过对这个属性的赋值subscriptions,然后监听set方法,在set方法内部处理

- (void)setSubscriptions:(NSDictionary<NSString *, NSNumber *> *)newSubscriptions {
    // 判断连接上了没,没连接上就不订阅和取消订阅的操作
    if (self.state == MQTTSessionManagerStateConnected) {
        NSDictionary *currentSubscriptions = [self.effectiveSubscriptions copy];

        // 拿到已有的topic,然后取消订阅
        for (NSString *topicFilter in currentSubscriptions) {
            if (!newSubscriptions[topicFilter]) {
                __weak MQTTSessionManager *weakSelf = self;
                /// 取消订阅
                [self.session unsubscribeTopic:topicFilter unsubscribeHandler:^(NSError *error) {
                    // 如果取消成功了,就移除掉
                    MQTTSessionManager *strongSelf = weakSelf;
                    if (!error) {
                        /// 因为这里是异步的,所以锁起来了
                        [strongSelf.subscriptionLock lock];
                        NSMutableDictionary *newEffectiveSubscriptions = [strongSelf.subscriptions mutableCopy];
                        [newEffectiveSubscriptions removeObjectForKey:topicFilter];
                        strongSelf.effectiveSubscriptions = newEffectiveSubscriptions;
                        [strongSelf.subscriptionLock unlock];
                    }
                }];
            }
        }

        // 订阅新的主题,流程和上面一样,只不过一个是移除,一个是添加
        for (NSString *topicFilter in newSubscriptions) {
            if (!currentSubscriptions[topicFilter]) {
                NSNumber *number = newSubscriptions[topicFilter];
                MQTTQosLevel qos = number.unsignedIntValue;
                __weak MQTTSessionManager *weakSelf = self;
                [self.session subscribeToTopic:topicFilter atLevel:qos subscribeHandler:^(NSError *error, NSArray<NSNumber *> *gQoss) {
                    MQTTSessionManager *strongSelf = weakSelf;
                    if (!error) {
                        NSNumber *gQos = gQoss[0];
                        [strongSelf.subscriptionLock lock];
                        NSMutableDictionary *newEffectiveSubscriptions = [strongSelf.subscriptions mutableCopy];
                        newEffectiveSubscriptions[topicFilter] = gQos;
                        strongSelf.effectiveSubscriptions = newEffectiveSubscriptions;
                        [strongSelf.subscriptionLock unlock];
                    }
                }];
            }
        }
    }
    // 把新的主题赋值给 internalSubscriptions
    self.internalSubscriptions = newSubscriptions;
    DDLogVerbose(@"MQTTSessionManager internalSubscriptions: %@", self.internalSubscriptions);
}

代码解释:

上面的方法中主要做的就是,取消订阅之前旧的主题,然后订阅新的主题,在把新的主题保存起来, 如果你的程序员中不能一开始就确定好主题数量,建议自己维护主题

发送消息:

终于到了这个令人愉快的时候, 调用- (UInt16)sendData:(NSData *)data topic:(NSString *)topic qos:(MQTTQosLevel)qos retain:(BOOL)retainFlag

- (UInt16)sendData:(NSData *)data topic:(NSString *)topic qos:(MQTTQosLevel)qos retain:(BOOL)retainFlag {
    if (self.state != MQTTSessionManagerStateConnected) {
        [self connectToLast:nil];
    }
    UInt16 msgId = [self.session publishData:data
                                     onTopic:topic
                                      retain:retainFlag
                                         qos:qos];
    return msgId;
}

    data:表示发送数据
    topic:表示主题
    qos:传输的方式(三个级别)
    retainFlag: 是否存到MQTT的消息队列上

可以看到内部做了如下处理:

  • 1、判断是否在线,如果不在线,就先连接
  • 2、调用session 发送数据,并生产msgId
  • 3、返回msgId

接收消息(订阅):

1、首先要设置代理

manager.delegate = self;

2、实现代理方法

- (void)handleMessage:(NSData *)data onTopic:(NSString *)topic retained:(BOOL)retained {
    NSLog(@"data - %@ topic - %@", data, topic);
}

可以看到在这个方法中,一共返回了三个参数

data: 表示收到的数据

topic: 表示主题

retained:表示数据是否是服务器重新传输

到这里,我们就完成了发送和订阅消息,也了解了主题。

如果你只是需要发送和订阅,到这里就完成了(左/右上角X掉)

发送过程

首先manager内部会只有一个对象,刚才发送、订阅、监听消息等一系列操作都是这个对象在做,这个对象就是session

回到发送的方法,我们会看到里面发送是这样子调用的(隐藏了一些不必要的代码)

- (UInt16)sendData:(NSData *)data topic:(NSString *)topic qos:(MQTTQosLevel)qos retain:(BOOL)retainFlag {
    ....
    UInt16 msgId = [self.session publishData:data
                                     onTopic:topic
                                      retain:retainFlag
                                         qos:qos];
    ...
}

然后点击进去看看,究竟发生了什么!

宝宝不开心,发现他也只是调用其他的方法进行发送

- (UInt16)publishData:(NSData*)data
              onTopic:(NSString*)topic
               retain:(BOOL)retainFlag
                  qos:(MQTTQosLevel)qos {
    return [self publishData:data onTopic:topic retain:retainFlag qos:qos publishHandler:nil];
}

在点击进入看看(这个代码就有点长,稍微有点耐心,我们来看看, 我会在代码中打上注释)

- (UInt16)publishData:(NSData *)data
              onTopic:(NSString *)topic
               retain:(BOOL)retainFlag
                  qos:(MQTTQosLevel)qos
       publishHandler:(MQTTPublishHandler)publishHandler
{
           /// 这个就是打印信息,目的就是告诉小伙伴们,我现在在发送数据了
    DDLogVerbose(@"[MQTTSession] publishData:%@... onTopic:%@ retain:%d qos:%ld publishHandler:%p",
                 [data subdataWithRange:NSMakeRange(0, MIN(256, data.length))],
                 [topic substringWithRange:NSMakeRange(0, MIN(256, topic.length))],
                 retainFlag,
                 (long)qos,
                 publishHandler);

           /// 判断主题是否为空,为空抛出异常
    if (MQTTStrict.strict &&
        !topic) {
        NSException* myException = [NSException
                                    exceptionWithName:@"topic must not be nil"
                                    reason:[NSString stringWithFormat:@"%@", topic]
                                    userInfo:nil];
        @throw myException;
    }

           /// 判断主题长度是不是小于1,为了防止这样子的字符串 @"", 否则抛出异常
    if (MQTTStrict.strict &&
        topic &&
        topic.length < 1) {
        NSException* myException = [NSException
                                    exceptionWithName:@"topic must not at least 1 character long"
                                    reason:[NSString stringWithFormat:@"%@", topic]
                                    userInfo:nil];
        @throw myException;
    }

           /// 判断主题是不是大于65535 否则抛出异常
    if (MQTTStrict.strict &&
        topic &&
        [topic dataUsingEncoding:NSUTF8StringEncoding].length > 65535L) {
        NSException* myException = [NSException
                                    exceptionWithName:@"topic may not be longer than 65535 bytes in UTF8 representation"
                                    reason:[NSString stringWithFormat:@"topic length = %lu",
                                            (unsigned long)[topic dataUsingEncoding:NSUTF8StringEncoding].length]
                                    userInfo:nil];
        @throw myException;
    }
    
           /// 判断主题编码是不是可以使用 utf8编码进行编码, 否则抛出异常
    if (MQTTStrict.strict &&
        topic &&
        ![topic dataUsingEncoding:NSUTF8StringEncoding]) {
        NSException* myException = [NSException
                                    exceptionWithName:@"topic must not contain non-UTF8 characters"
                                    reason:[NSString stringWithFormat:@"topic = %@", topic]
                                    userInfo:nil];
        @throw myException;
    }

           /// 判断主题是否有通配符 有抛出异常
    if (MQTTStrict.strict &&
        self.willTopic &&
        ([self.willTopic containsString:@"+"] ||
         [self.willTopic containsString:@"#"])
        ) {
        NSException* myException = [NSException
                                    exceptionWithName:@"willTopic must not contain wildcards"
                                    reason:[NSString stringWithFormat:@"willTopic = %@", self.willTopic]
                                    userInfo:nil];
        @throw myException;
    }

           /// 判断输入的QOS级别是不是这个三个级别中的其中一个,不是就抛出异常
    if (MQTTStrict.strict &&
        qos != MQTTQosLevelAtMostOnce &&
        qos != MQTTQosLevelAtLeastOnce &&
        qos != MQTTQosLevelExactlyOnce) {
        NSException* myException = [NSException
                                    exceptionWithName:@"Illegal QoS level"
                                    reason:[NSString stringWithFormat:@"%d is not 0, 1, or 2", qos]
                                    userInfo:nil];
        @throw myException;
    }

           /// 先创建一个msgId,等下要返回回去的
    UInt16 msgId = 0;
    if (!qos) { /// 如果qos级别等于0
        /// 新建msg对象
        MQTTMessage *msg = [MQTTMessage publishMessageWithData:data
                                                       onTopic:topic
                                                           qos:qos
                                                         msgId:msgId
                                                    retainFlag:retainFlag
                                                       dupFlag:FALSE
                                                 protocolLevel:self.protocolLevel
                                        payloadFormatIndicator:nil
                                     publicationExpiryInterval:nil
                                                    topicAlias:nil
                                                 responseTopic:nil
                                               correlationData:nil
                                                  userProperty:nil
                                                   contentType:nil];
        NSError *error = nil;
        /// 编码msg对象,如果失败了,就给error对象赋值
        if (![self encode:msg]) {
            error = [NSError errorWithDomain:MQTTSessionErrorDomain
                                        code:MQTTSessionErrorEncoderNotReady
                                    userInfo:@{NSLocalizedDescriptionKey : @"Encoder not ready"}];
        }
        /// 如果实现了这个block,就把error回调出去,这个block通常我们用来监听是否发送成功
        if (publishHandler) {
            [self onPublish:publishHandler error:error];
        }
    } else { // 当qos不是0的时候
        msgId = [self nextMsgId]; // 得到msgId
        MQTTMessage *msg = nil;

        /// 缓存数据用的
        id<MQTTFlow> flow;
        /// 判断连接状态
        if (self.status == MQTTSessionStatusConnected) {
            /// 拿到所有数据
            NSArray *flows = [self.persistence allFlowsforClientId:self.clientId
                                                      incomingFlag:NO];

            /// 计算窗口数量 和 不存在未处理的消息
            BOOL unprocessedMessageNotExists = TRUE;
            NSUInteger windowSize = 0;
            for (id<MQTTFlow> flow in flows) {
                if ((flow.commandType).intValue != MQTT_None) {
                    windowSize++;
                } else {
                    unprocessedMessageNotExists = FALSE;
                }
            }
            /// 默认maxWindowsSize = 16, 如果上面计算完了,然后判断成功,就初始化msg 和 flow对象
            if (unprocessedMessageNotExists && windowSize <= self.persistence.maxWindowSize) {
                msg = [MQTTMessage publishMessageWithData:data
                                                  onTopic:topic
                                                      qos:qos
                                                    msgId:msgId
                                               retainFlag:retainFlag
                                                  dupFlag:FALSE
                                            protocolLevel:self.protocolLevel
                                   payloadFormatIndicator:nil
                                publicationExpiryInterval:nil
                                               topicAlias:nil
                                            responseTopic:nil
                                          correlationData:nil
                                             userProperty:nil
                                              contentType:nil];
                flow = [self.persistence storeMessageForClientId:self.clientId
                                                           topic:topic
                                                            data:data
                                                      retainFlag:retainFlag
                                                             qos:qos
                                                           msgId:msgId
                                                    incomingFlag:NO
                                                     commandType:MQTTPublish
                                                        deadline:[NSDate dateWithTimeIntervalSinceNow:self.dupTimeout]];
            }
        }
        /// 如果msg不存在,就初始化一个msg对象
        if (!msg) {
            flow = [self.persistence storeMessageForClientId:self.clientId
                                                       topic:topic
                                                        data:data
                                                  retainFlag:retainFlag
                                                         qos:qos
                                                       msgId:msgId
                                                incomingFlag:NO
                                                 commandType:MQTT_None
                                                    deadline:[NSDate date]];
        }
        /// 如果flow不存在就抛出异常
        if (!flow) {
            DDLogWarn(@"[MQTTSession] dropping outgoing message %d", msgId);
            NSError *error = [NSError errorWithDomain:MQTTSessionErrorDomain
                                                 code:MQTTSessionErrorDroppingOutgoingMessage
                                             userInfo:@{NSLocalizedDescriptionKey : @"Dropping outgoing Message"}];
            
            if (publishHandler) {
                [self onPublish:publishHandler error:error];
            }
            msgId = 0;
        } else {
            [self.persistence sync];
            /// 判断是否实现了block,如果实现了就保存起来,
            if (publishHandler) {
                (self.publishHandlers)[@(msgId)] = [publishHandler copy];
            } else { // 否则就删除这个msg的block
                [self.publishHandlers removeObjectForKey:@(msgId)];
            }

            // 修改flow数据,然后存起来
            if ((flow.commandType).intValue == MQTTPublish) {
                DDLogVerbose(@"[MQTTSession] PUBLISH %d", msgId);
                // 编码失败了,修改数据
                if (![self encode:msg]) {
                    DDLogInfo(@"[MQTTSession] queueing message %d after unsuccessfull attempt", msgId);
                    flow.commandType = [NSNumber numberWithUnsignedInt:MQTT_None];
                    flow.deadline = [NSDate date];
                    [self.persistence sync];
                }
            } else {
                DDLogInfo(@"[MQTTSession] queueing message %d", msgId);
            }
        }
    }
    /// 调用tell方法
    [self tell];
    /// 返回msgId
    return msgId;
}

总结:上面这个方法就是先判断一些先行条件是否成功,然后判断QOS的级别,根据不同的级别分别处理数据,然后调用tell方法,最后返回msgId,所以这里面tell方法和encode方法是我们需要关注的。

按照先后顺序,我们要先看看encode方法,做了啥子。

- (BOOL)encode:(MQTTMessage *)message {
    if (message) {
        NSData *wireFormat = message.wireFormat;
        if (wireFormat) {
            if (self.delegate) {
                if ([self.delegate respondsToSelector:@selector(sending:type:qos:retained:duped:mid:data:)]) {
                    [self.delegate sending:self
                                      type:message.type
                                       qos:message.qos
                                  retained:message.retainFlag
                                     duped:message.dupFlag
                                       mid:message.mid
                                      data:message.data];
                }
            }
            DDLogVerbose(@"[MQTTSession] mqttTransport send");
            return [self.transport send:wireFormat];
        } else {
            DDLogError(@"[MQTTSession] trying to send message without wire format");
            return false;
        }
    } else {
        DDLogError(@"[MQTTSession] trying to send nil message");
        return false;
    }
}

这个方法就比较简单了,判断代理是否存在,存在就调用,然后使用transport发送数据

上篇博客中我们提到过,有三种transport,这里我们就不做介绍了。

刚才我们新见到一个类,MQTTMessage,这个类就是负责把传入的参数生成Data,然后提供给transport发送出去。

然后我们在看看tell方法,这个方法就没啥子好说的了, 就是调用代理方法(如果实现)。

- (void)tell {
    NSUInteger incoming = [self.persistence allFlowsforClientId:self.clientId
                                                   incomingFlag:YES].count;
    NSUInteger outflowing = [self.persistence allFlowsforClientId:self.clientId
                                                     incomingFlag:NO].count;
    if ([self.delegate respondsToSelector:@selector(buffered:flowingIn:flowingOut:)]) {
        [self.delegate buffered:self
                      flowingIn:incoming
                     flowingOut:outflowing];
    }
    if ([self.delegate respondsToSelector:@selector(buffered:queued:flowingIn:flowingOut:)]) {
        [self.delegate buffered:self
                         queued:0
                      flowingIn:incoming
                     flowingOut:outflowing];
    }
}

Transport 发送过程

(MQTTCFSocketEncoder)为例:

- (BOOL)send:(NSData *)data {
    // 上来就是一把锁🔐 😏
    @synchronized(self) {
        
        /// 检测encoder是不是准备好了
        if (self.state != MQTTCFSocketEncoderStateReady) {
            DDLogInfo(@"[MQTTCFSocketEncoder] not MQTTCFSocketEncoderStateReady");
            return NO;
        }
        /// 如果data有数据,就添加到buffer里面去
        if (data) {
            [self.buffer appendData:data];
        }
        
        /// 判断buffer的长度
        if (self.buffer.length) {
            /// 打印log
            DDLogVerbose(@"[MQTTCFSocketEncoder] buffer to write (%lu)=%@...",
                         (unsigned long)self.buffer.length,
                         [self.buffer subdataWithRange:NSMakeRange(0, MIN(256, self.buffer.length))]);
            
            /// 写数据, 这里stream就是 NSOutputStream ,所以上面一顿操作猛如虎,最后就是用NSOutputStream发送数据的 然后通过设置stream代理,判断是不是发送成功了
            NSInteger n = [self.stream write:self.buffer.bytes maxLength:self.buffer.length];
            
            /// 写失败了
            if (n == -1) {
                DDLogVerbose(@"[MQTTCFSocketEncoder] streamError: %@", self.error);
                self.state = MQTTCFSocketEncoderStateError;
                self.error = self.stream.streamError;
                return NO;
            } else {
                /// 打印数据
                if (n < self.buffer.length) {
                    DDLogVerbose(@"[MQTTCFSocketEncoder] buffer partially written: %ld", (long)n);
                }
                /// 清空buffer
                [self.buffer replaceBytesInRange:NSMakeRange(0, n) withBytes:NULL length:0];
            }
        }
        return YES;
    }
}

总结 : emmm,都写在注释里面了,底层都是通过NSOutputStream发送数据,NSInputStream接收数据 。

ps:

如果你和作者一样,用的是MQTTWebsocketTransport,那你可能需要手动去修改一下manager这个类才可以使用了,现在manager这个类里面用的是MQTTCFSocketTransport

在这里改可以

- (void)connectToInternal:(MQTTConnectHandler)connectHandler {
    if (self.session && self.state == MQTTSessionManagerStateStarting) {
        [self updateState:MQTTSessionManagerStateConnecting];
        [self.session connectToHost:self.host
                               port:self.port
                           usingSSL:self.tls
                     connectHandler:connectHandler];
    }
}

在这里新增一个方法或者直接修改这个方法也可以(不建议用后者)

- (void)connectToHost:(NSString *)host
                 port:(UInt32)port
             usingSSL:(BOOL)usingSSL
       connectHandler:(MQTTConnectHandler)connectHandler {
    DDLogVerbose(@"MQTTSessionLegacy connectToHost:%@ port:%d usingSSL:%d connectHandler:%p",
                 host, (unsigned int)port, usingSSL, connectHandler);
    
           /// 在这里替换成为你需要用到的transport
    MQTTCFSocketTransport *transport;
    if (self.securityPolicy) {
        transport = [[MQTTSSLSecurityPolicyTransport alloc] init];
        ((MQTTSSLSecurityPolicyTransport *)transport).securityPolicy = self.securityPolicy;
    } else {
        transport = [[MQTTCFSocketTransport alloc] init];
    }
    transport.host = host;
    transport.port = port;
    transport.tls = usingSSL;
    transport.certificates = self.certificates;
    transport.voip = self.voip;
    transport.queue = self.queue;
    transport.streamSSLLevel = self.streamSSLLevel;
    self.transport = transport;
    
    [self connectWithConnectHandler:connectHandler];
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,529评论 5 475
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,015评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,409评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,385评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,387评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,466评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,880评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,528评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,727评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,528评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,602评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,302评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,873评论 3 306
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,890评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,132评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,777评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,310评论 2 342