MQTT简介和使用

前言

最近在做的智能家居项目中有用到MQTT做消息的推送,主要是为了实现低流量下的智能家居控制(我们用到的是劳沃协议),在使用的时候也是遇到很多坑(特别是重连),这里讲讲自己的个人经验和解决问题的方式。

一.MQTT介绍

1.简介

MQTT(message queuing telemetry transport)是IBM开发的即时通讯协议,是一种发布/订阅极其轻量级的消息传输协议,专门为网络受限设备、低宽带以及高延迟和不可靠的网络而设计的。由于以上轻量级的特点,是实现智能家居的首选传输协议,相比于XMPP,更加轻量级而且占用宽带低。

2.特点

a.由于采用发布/订阅的消息模式,可以提供一对多的消息发布
b.轻量级,网络开销小
c.对负载内容会有屏蔽的消息传输
d.有三种消息发布质量(Qos):
qos=0:“至多一次”,这一级别会发生消息丢失或重复,消息发布依赖于TCP/IP网络
qos=1:“至少一次”,确保消息到达,但消息重复可能会发生
qos=2:“只有一次”,确保消息到达一次
e.通知机制,异常中断时会通知双方

3.原理

14523188625918865.png

MQTT协议有三种身份:发布者、代理、订阅者,发布者和订阅者都为客户端,代理为服务器,同时消息的发布者也可以是订阅者(为了节约内存和流量发布者和订阅者一般都会定义在一起)。
MQTT传输的消息分为主题(Topic,可理解为消息的类型,订阅者订阅后,就会收到该主题的消息内容(payload))和负载(payload,可以理解为消息的内容)两部分。

二.MQTT通用使用

通用的使用方式的重连机制在安卓系统中需要自己去编写,下一篇会详细讲解阿里专门针对Android客户端的实现方式MqttAndroidClient

1.集成

1.build.grade中导入

implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.0'

2.添加权限

<uses-permission android:name="android.permission.INTERNET" />
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
<uses-permission android:name="android.permission.WAKE_LOCK" />

2.使用

直接上代码,以下就是发布者和订阅者一体的实现方式,自己添加了重连机制,对应的注释也比较清晰,如果对于MQTT入门最好看下阿里MQTT的官方文档

public class MQTTManager {

    private String broker = "";//固定配置
    private String secretKey = "";//固定配置
    private String acessKey = "";//固定配置

    private String topic = "";//自己定义
    private String groupId = "";//自己定义
    private String clientId = "";//自己定义

    private MqttClient mqttClient;
    private volatile static MQTTManager manager;

    private int[] qos = {0, 0};//订阅个数就是数组的长度

    private ScheduledExecutorService reconnectPool;//重连线程池

    public static MQTTManager getInstance() {
        if (manager == null) {
            synchronized (MQTTManager.class) {
                if (manager == null)
                    manager = new MQTTManager();
            }
        }
        return manager;
    }

    public MQTTManager() {
        clientId = String.format("%s@@@%s", groupId, MQTTCons.Sep_SEND);//这是根据自己需求定义的clientId
    }

    /**
     * 发送信息
     * @param msg
     */
    public void sendMessage(String msg) {
        MqttMessage message = new MqttMessage(msg.getBytes());
        message.setQos(0);
        try {
            if (mqttClient != null)
                mqttClient.publish(topic, message);
        } catch (MqttException e) {
            e.printStackTrace();
            TVLog.i("MqttException-sendMQTT-" + e);
        }
    }

    /**
     * 开启MQTT连接和订阅
     */
    public void startSendMQTT() {
        try {
            closeMQTT();//断开和关闭连接的操作,由于我们需求需要,切换用户要重新创建新的连接,一般应用中基本都会始终订阅一条
            MemoryPersistence persistence = new MemoryPersistence();
            mqttClient = new MqttClient(broker, clientId, persistence);
            final MqttConnectOptions connOpts = new MqttConnectOptions();
            String sign = MacSignature.macSignature(clientId.split("@@@")[0], secretKey);
            connOpts.setUserName(acessKey);
            connOpts.setServerURIs(new String[]{broker});
            connOpts.setPassword(sign.toCharArray());
            connOpts.setCleanSession(true);
            connOpts.setKeepAliveInterval(20);
            connOpts.setConnectionTimeout(10);
            connOpts.setMqttVersion(MQTT_VERSION_3_1_1);
            connOpts.setAutomaticReconnect(false);//禁用自带重连机制,用于TV端会出现不稳定性,所以自己写了重连
            mqttClient.setCallback(new MqttCallbackExtended() {
                public void connectComplete(boolean reconnect, String serverURI) {
                    TVLog.i("Send connect success" + topic);
                    closeReconnectTask();
                    subscribeFilter();//mqtt每次连接成功都得订阅Topic
                }

                public void connectionLost(Throwable throwable) {
                    TVLog.i("mqtt connection lost");
                    startReconnectTask();
                }

                public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
                    TVLog.i("messageArrived:" + topic + "------" + new String(mqttMessage.getPayload()));
                }

                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                    try {
                        TVLog.i("deliveryComplete:" + iMqttDeliveryToken.getMessage().toString());
                    } catch (MqttException e) {
                        e.printStackTrace();
                    }
                }
            });
            mqttClient.connect(connOpts);
        } catch (Exception me) {
            me.printStackTrace();
        }
    }

    /**
     * 订阅Topic
     */
    private void subscribeFilter() {
        String registerTopic = "";//自定义
        String controlTopic = "";//自定义,作为示例订阅了两个
        String[] topicFilters = new String[]{registerTopic, controlTopic};

        try {
            mqttClient.subscribe(topicFilters, qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    private synchronized void startReconnectTask() {//开启重连任务
        if (reconnectPool != null) return;
        reconnectPool = Executors.newScheduledThreadPool(1);
        reconnectPool.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    if (mqttClient == null || mqttClient.isConnected()) return;
                    mqttClient.reconnect();
                    TVLog.d("reconnectSendMQTT" + topic);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, 0, 5 * 1000, TimeUnit.MILLISECONDS);
    }

    public synchronized void closeReconnectTask() {//程序销毁的时候也记得关闭
        if (reconnectPool != null) {
            reconnectPool.shutdownNow();
            reconnectPool = null;
        }
    }

    public void closeMQTT() {//在程序销毁的时候也记得调用
        try {
            closeReconnectTask();
            if (mqttClient != null) {
                mqttClient.disconnect();
                mqttClient.close();
                mqttClient = null;
                TVLog.d("closeSendMQTT" + topic);
            }
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

}

注意:mqttClient.disconnect()和mqttClient.connect(connOpts)都为耗时操作,网络差的时候会阻塞主线程,可以新开线程或者直接用线程池ScheduledExecutorService管理

结语

以上就是MQTT的简介、以及由官方Java的通用实现Demo改写后,在Android客户端的实现方式,下一篇会详细讲解阿里专门针对Android客户端的实现方式MqttAndroidClient

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,590评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 86,808评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,151评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,779评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,773评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,656评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,022评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,678评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,038评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,659评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,756评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,411评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,005评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,973评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,053评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,495评论 2 343