MQTT客户端代码分析

MQTT客户端实现

MQTT是个轻量级的消息订阅/发布协议,基于TCP协议,在物联网中应用较广,当然也有的公司拿MQTT协议来做Push或IM。MQTT协议有很多客户端/服务端的实现,如Eclipse Paho就是其中一个。本文不对MQTT协议本身做介绍,而是主要分析下一个Paho MQTT客户端的代码实现。

消息基类

所有消息的基类就是MqttWireMessage,核心的方法无非是封包/拆包,创建包头,读取playload等等。各个消息子类如MqttSubscribe等,继承自MqttWireMessage,需要实现getMessageInfo、getVariableHeader(构造包头),getPayload(构造body)等方法

重要接口

IMqttAsyncClient 声明了与MQTT Server交互时的重要方法,如connect、publish、subscribe等,这些方法是异步的,有两种调用方式

//方式一 
       IMqttToken conToken;
        conToken = asyncClient.client.connect(conToken);
      ... do some work...
        conToken.waitForCompletion();
        
        //或者这样就可以把一个异步任务转化成同步调用
        IMqttToken token;
      token = asyncClient.method(parms).waitForCompletion();
 


//方式二,传入一个callback MqttAsyncActionListener,实现onSuccess及onFailure方法

        IMqttToken conToken;
        conToken = asyncClient.connect("some context",new new MqttAsyncActionListener() {
            public void onSuccess(IMqttToken asyncActionToken) {
                log("Connected");
            }
 
            public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                log ("connect failed" +exception);
            }
          });

我们可以把此处的IMqttToken理解为对异步的任务信息、操作的封装,里面包含了getTopics、setActionCallback、waitForCompletion、isComplete、getMessageId、getResponse等方法

MqttAsyncClient是对IMqttAsyncClient接口的具体实现,里面包含两个重要的类:ClientComms,用来和服务器交互的类,封装了底层的网络调用;MqttClientPersistence,按照协议的QoS规定用来做消息的持久化。

public IMqttToken connect(MqttConnectOptions options, Object userContext, IMqttActionListener callback)
            throws MqttException, MqttSecurityException {
        final String methodName = "connect";
        if (comms.isConnected()) {
            throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_CONNECTED);
        }
        ...

        //设置网络模块,NetworkModule是一个接口,包含start、stop、getInputStream、getOutputStream四个方法,
        //有TCPNetworkModule、SSLNetworkModule、LocalNetworkModule等不同的实现
        comms.setNetworkModules(createNetworkModules(serverURI, options));

        // Insert our own callback to iterate through the URIs till the connect succeeds
        //ConnectActionListener实现了IMqttActionListener接口,并把失败重试等逻辑都封装在该类里面;userToken是返回给上层调用者使用的对异步任务操作的封装类
        MqttToken userToken = new MqttToken(getClientId());
        ConnectActionListener connectActionListener = new ConnectActionListener(this, persistence, comms, options, userToken, userContext, callback);
        userToken.setActionCallback(connectActionListener);
        userToken.setUserContext(this);

        comms.setNetworkModuleIndex(0);
        connectActionListener.connect();

        return userToken;
    }

网络管理类

ClientComms是网络层重要的管理类,包含几个主要的类:

  • NetworkModule 底层网络实现 CommsSender 和 CommsReceiver里的输入输出流来自于NetworkModule层的Socket
  • CommsReceiver 接收消息 起一个线程,通过MqttInputStream解析出消息
  • CommsSender 发送消息 起一个线程消费ClientState里的发送队列,通过MqttOutputStream往外写消息
  • ClientState 管理消息的发送,里面有Vector pendingMessages pendingFlows 待发送的消息,结合不同的Qos进行处理
  • CommsCallback 收到消息后的回调处理,是Receiver和外部API调用之间的桥梁

ClientComms连接服务器时,会在一个异步线程ConnectBG中执行,包括启动网络模块,初始化CommsReceiver、CommsSender等

// Connect to the server at the network level e.g. TCP socket and then
                // start the background processing threads before sending the connect
                // packet.
                NetworkModule networkModule = networkModules[networkModuleIndex];
                networkModule.start();
                receiver = new CommsReceiver(clientComms, clientState, tokenStore, networkModule.getInputStream());
                receiver.start("MQTT Rec: "+getClient().getClientId());
                sender = new CommsSender(clientComms, clientState, tokenStore, networkModule.getOutputStream());
                sender.start("MQTT Snd: "+getClient().getClientId());
                callback.start("MQTT Call: "+getClient().getClientId());                
                internalSend(conPacket, conToken);
                

通过internalSend方法,将要发送的消息插入到ClientState中的pendingMessages队列中,CommsSender会去消费这个队列,把消息取出来,写到MqttOutputStream里,并根据Qos设置做一些持久化操作。写成功后会有回调通知外部API调用。其他消息的发送也是这个流程。

//CommsSender中的run方法
while (running && (out != null)) {
            try {
                message = clientState.get();
                if (message != null) {

                    if (message instanceof MqttAck) {
                        out.write(message);
                        out.flush();
                    } else {
                        MqttToken token = tokenStore.getToken(message);

                        if (token != null) {
                            synchronized (token) {
                                out.write(message);
                                try {
                                    out.flush();
                                } catch (IOException ex) {
                                    if (!(message instanceof MqttDisconnect)) {
                                        throw ex;
                                    }
                                }
                                clientState.notifySent(message);
                            }
                        }
                    }
                } 
            } 

CommsReceiver负责从InputStream读出消息,通过ClientState进行分发

public void run() {
        final String methodName = "run";
        MqttToken token = null;
        
        while (running && (in != null)) {
            try {
                //@TRACE 852=network read message
                log.fine(CLASS_NAME,methodName,"852");
                receiving = in.available() > 0;
                MqttWireMessage message = in.readMqttWireMessage();
                receiving = false;
                
                if (message instanceof MqttAck) {
                    token = tokenStore.getToken(message);
                    if (token!=null) {
                        synchronized (token) {
                            clientState.notifyReceivedAck((MqttAck)message);
                        }
                    } else {
                        throw new MqttException(MqttException.REASON_CODE_UNEXPECTED_ERROR);
                    }
                } else {
                    // A new message has arrived
                    clientState.notifyReceivedMsg(message);
                }
            }
            catch (MqttException ex) {
                running = false;
                // Token maybe null but that is handled in shutdown
                clientComms.shutdownConnection(token, ex);
            } 
            catch (IOException ioe) {
                //@TRACE 853=Stopping due to IOException
                log.fine(CLASS_NAME,methodName,"853");

                running = false;
                if (!clientComms.isDisconnecting()) {
                    clientComms.shutdownConnection(token, new MqttException(MqttException.REASON_CODE_CONNECTION_LOST, ioe));
                }
            }
            finally {
                receiving = false;
            }
        }

而在ClientState中,通过notifyReceivedMsg方法接收到消息,根据不同的Qos做持久化操作,并最终调用了CommsCallback的messageArrived方法,将消息加入到一个messageQueue队列中。

CommsCallback里也起了一个线程消费这个队列,将取出的消息在handleMessage方法中通过MqttCallback接口回调出去。上文提到的发送完毕时,会将token插入到CommsCallback的completeQueue方法里进行消费,也是在这个run方法里

//CommsCallback里的run方法
while (running) {
            try {

                if (running) {
                    // Check for deliveryComplete callbacks...
                    MqttToken token = null;
                    synchronized (completeQueue) {
                        if (!completeQueue.isEmpty()) {
                            // First call the delivery arrived callback if needed
                            token = (MqttToken) completeQueue.elementAt(0);
                            completeQueue.removeElementAt(0);
                        }
                    }
                    if (null != token) {
                        handleActionComplete(token);
                    }
                    
                    // Check for messageArrived callbacks...
                    MqttPublish message = null;
                    synchronized (messageQueue) {
                        if (!messageQueue.isEmpty()) {
                           
                            message = (MqttPublish) messageQueue.elementAt(0);

                            messageQueue.removeElementAt(0);
                        }
                    }
                    if (null != message) {
                        handleMessage(message);
                    }
                }

                
                
            } 
//处理发送完成的方法
private void handleActionComplete(MqttToken token)
            throws MqttException {
        final String methodName = "handleActionComplete";
        synchronized (token) {
            
            // Unblock any waiters and if pending complete now set completed
            token.internalTok.notifyComplete();
            
            if (!token.internalTok.isNotified()) {
                // If a callback is registered and delivery has finished 
                // call delivery complete callback. 
                if ( mqttCallback != null 
                    && token instanceof MqttDeliveryToken 
                    && token.isComplete()) {
                        mqttCallback.deliveryComplete((MqttDeliveryToken) token);
                }
                // Now call async action completion callbacks
                fireActionEvent(token);
            }
            
            // Set notified so we don't tell the user again about this action.
            if ( token.isComplete() ){
               if ( token instanceof MqttDeliveryToken || token.getActionCallback() instanceof IMqttActionListener ) {
                    token.internalTok.setNotified(true);
                }
            }
            

            if (token.isComplete()) {
                clientState.notifyComplete(token);
            }
        }
    }

使用介绍

以同步调用为例

    String tmpDir = System.getProperty("java.io.tmpdir");
        MqttDefaultFilePersistence dataStore = new MqttDefaultFilePersistence(tmpDir);

        try {
            // Construct the connection options object that contains connection parameters
            // such as cleanSession and LWT
            conOpt = new MqttConnectOptions();
            conOpt.setCleanSession(clean);
            if(password != null ) {
              conOpt.setPassword(this.password.toCharArray());
            }
            if(userName != null) {
              conOpt.setUserName(this.userName);
            }

            // Construct an MQTT blocking mode client
            client = new MqttClient(this.brokerUrl,clientId, dataStore);

            // Set this wrapper as the callback handler
            client.setCallback(this);

        } catch (MqttException e) {
            e.printStackTrace();
            log("Unable to set up client: "+e.toString());
            System.exit(1);
        }

设置各种连接参数,如用户名,密码,持久化存储路径等,并设置MqttCallback回调函数。

public interface MqttCallback {
//连接断开时的回调
public void connectionLost(Throwable cause);
//收到下推消息时的回调
public void messageArrived(String topic, MqttMessage message) throws Exception;
//消息发送成功时的回调
public void deliveryComplete(IMqttDeliveryToken token);
}

想要发布一个消息时,可以这样

public void publish(String topicName, int qos, byte[] payload) throws MqttException {

        client.connect(conOpt);

        String time = new Timestamp(System.currentTimeMillis()).toString();

        MqttMessage message = new MqttMessage(payload);
        message.setQos(qos);

        client.publish(topicName, message);

        client.disconnect();
    }

这个publish方法是个同步方法,里面的实现其实是代理给异步client+wait阻塞实现的

public void publish(String topic, MqttMessage message) throws MqttException,
            MqttPersistenceException {
        aClient.publish(topic, message, null, null).waitForCompletion(getTimeToWait());
    }

publish调用后将消息插入到ClientState的队列中,通过CommsSender线程中发送给服务器,发送完成时(或收到ack后)会回调MqttCallback接口中的deliveryComplete方法。用户还可以设置IMqttActionListener接口获取发送是成功还是失败的回调。如果收到一个新的消息,最终通过MqttCallback中的messageArrived回调给用户。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,723评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,485评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,998评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,323评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,355评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,079评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,389评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,019评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,519评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,971评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,100评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,738评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,293评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,289评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,517评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,547评论 2 354
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,834评论 2 345

推荐阅读更多精彩内容