mqtt 工具类

package com.realtop.mqttutils;

import android.content.Context;
import android.content.SharedPreferences;
import android.os.Handler;
import android.os.Looper;
import android.text.TextUtils;
import android.util.Log;

import org.eclipse.paho.android.service.MqttAndroidClient;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;

import java.lang.ref.WeakReference;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;

public class MQTTHelper implements Runnable {

    private final static class Inner {
        public static final MQTTHelper OBJ = new MQTTHelper();
    }

    public static MQTTHelper getInstance() {
        return Inner.OBJ;
    }

    private MQTTHelper() {
        Log.i(TAG, "MQTTHelper_init_do_nothing: ");
    }

    public static final String TAG = "mqtt_helper";

    public static final String INTENT_ACTION_MQTT_MSG = "INTENT_ACTION_MQTT_MSG";
    public static final String MQTT_MSG = "MQTT_MSG";

    /**
     * 参数部分
     */
    private String receiveSubject = "";
    private String sendSubject = "";
    private String host = "";
    private String clientId = "";
    private String userName = "";
    private String password = "";

    private boolean needAutoConnect = true;

    private Context mContext;
    private MqttAndroidClient mMqtt;
    private Handler mHandler;

    private final ConcurrentHashMap<Integer, WeakReference<OnSendMsgListener>> mListeners
            = new ConcurrentHashMap<>();

    private final Set<MQTTReceiver> mReceivers=new CopyOnWriteArraySet<>();

    public Context getContext() {
        return mContext;
    }

    public static SharedPreferences getConfig(Context context){
        return context.getSharedPreferences("mqtt_config_prefer", Context.MODE_PRIVATE);
    }

    public boolean isNeedAutoConnect() {
        return needAutoConnect;
    }

    public void setNeedAutoConnect(boolean needAutoConnect) {
        this.needAutoConnect = needAutoConnect;
    }

    public String getSendSubject() {
        return sendSubject;
    }

    public MQTTHelper setSendSubject(String sendSubject) {
        this.sendSubject = sendSubject;
        return this;
    }

    public String getReceiveSubject() {
        return receiveSubject;
    }

    public MQTTHelper setReceiveSubject(String receiveSubject) {
        this.receiveSubject = receiveSubject;
        return this;
    }

    public String getHost() {
        return host;
    }

    public MQTTHelper setHost(String host) {
        this.host = host;
        return this;
    }

    public String getClientId() {
        return clientId;
    }

    public MQTTHelper setClientId(String clientId) {
        this.clientId = clientId;
        return this;
    }

    public String getUserName() {
        return userName;
    }

    public MQTTHelper setUserName(String userName) {
        this.userName = userName;
        return this;
    }

    public String getPassword() {
        return password;
    }

    public MQTTHelper setPassword(String password) {
        this.password = password;
        return this;
    }

    public void init(Context context) {
        mContext = context.getApplicationContext();
        mHandler = new Handler(Looper.getMainLooper());
        if (mMqtt != null)
            return;
        // todo id 服务器提前输入的id, 设备出场时刻录
        String id = MUtils.getFactoryMacAddress(context);
        if (TextUtils.isEmpty(id)){
            id = System.currentTimeMillis()+"_id";
        }
        if (TextUtils.isEmpty(clientId)) {
            clientId = id;
        }
        if (TextUtils.isEmpty(receiveSubject)) {
            receiveSubject = "$thing/down/";
        }
        if (TextUtils.isEmpty(sendSubject)) {
            sendSubject = "$thing/up/";
        }
        mMqtt = new MqttAndroidClient(mContext, host, clientId);
        mMqtt.setCallback(new MqttCallback());
        Log.i(TAG, "init_" + clientId + ": " + receiveSubject + ": " + sendSubject);
    }

    @Override
    public void run() {
        startConnect();
    }

    /**
     * 发送上行信息
     * @param str 消息json
     * @param listener 发送是否成功回调
     */
    public void sendMsg(String str, OnSendMsgListener listener) {
        sendMsg(getSendSubject(), str, listener);
    }

    public void sendMsg(String sendSubject, String str, OnSendMsgListener listener) {
        if (mMqtt == null)
            return;
        new Handler(Looper.getMainLooper()).post(() -> {
            try {
                MqttMessage msg = new MqttMessage();
                msg.setPayload(str.getBytes());
                IMqttDeliveryToken publish = mMqtt.publish(sendSubject, msg);
                Log.i(TAG, "sendMsg_send_msg_id: " + publish.getMessageId()+"; "+str);

                if (listener == null)
                    return;
                if (publish.getMessageId() == 0) {
                    listener.onSendFinish(false);
                    return;
                }
                WeakReference<OnSendMsgListener> reference = new WeakReference<>(listener);
                mListeners.put(publish.getMessageId(), reference);
            } catch (Exception e) {
                Log.i(TAG, "sendMsg_error_" + e.getMessage());
            }
        });
    }

    /**
     * 关闭自动重连 默认打开
     */
    public void cancelAutoConnect() {
        needAutoConnect = false;
        mHandler.removeCallbacksAndMessages(null);
    }

    public void startConnect() {
        if (mMqtt == null)
            return;
        mHandler.removeCallbacksAndMessages(null);
        try {
            MqttConnectOptions options = new MqttConnectOptions();
            options.setAutomaticReconnect(false);
            options.setCleanSession(true);
            options.setConnectionTimeout(60);
            options.setKeepAliveInterval(60);
            options.setMaxInflight(10);
            options.setUserName(userName);
            options.setPassword(password.toCharArray());
            options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_DEFAULT);
            Log.i(TAG, "startConnect_username_passwd: " +
                    userName + "; " + password + "; " + mMqtt.getClientId() + "; " + options.getUserName());
            mMqtt.connect(options, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    Log.i(TAG, "onSuccess_");
                    mHandler.removeCallbacksAndMessages(null);
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    Log.i(TAG, "onFailure_" + exception.getMessage());
                    exception.printStackTrace();
                    mHandler.removeCallbacksAndMessages(null);
                    if (needAutoConnect) {
                        mHandler.postDelayed(getInstance(), 16000);
                    }
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
            Log.i(TAG, "onCreate_error_" + e.getMessage());
        }
    }

    public void release() {
        mReceivers.clear();
        clearListener();
        cancelAutoConnect();
        mHandler.removeCallbacksAndMessages(null);
        if (mMqtt == null)
            return;
        if (mMqtt.isConnected()) {
            try {
                mMqtt.disconnect();
            } catch (Exception e) {
                Log.i(TAG, "onDestroy_error_" + e.getMessage());
            }
        }
        mMqtt = null;
        Log.i(TAG, "release_end_now: ");
    }

    public void clearListener() {
        Set<Integer> integers = mListeners.keySet();
        for (Integer index : integers) {
            try {
                Objects.requireNonNull(mListeners.get(index)).clear();
            } catch (Exception e) {
                Log.i(TAG, "clearListener_item_error: " + e.getMessage());
            }
        }
        mListeners.clear();
    }

    public void registerCallback(MQTTReceiver callback) {
        mReceivers.add(callback);
    }

    public void unRegisterCallback(MQTTReceiver registerReceiver) {
        mReceivers.remove(registerReceiver);
    }

    public static class MQTTReceiver {

        protected void receiveMsg(String msg) {

        }

    }


    private class MqttCallback implements MqttCallbackExtended {

        @Override
        public void connectComplete(boolean reconnect, String serverURI) {
            mHandler.removeCallbacksAndMessages(null);
            try {
                mMqtt.subscribe(receiveSubject, 0);
                Log.i(TAG, "connectComplete_start_subscribe");
            } catch (Exception e) {
                Log.i(TAG, "onSuccess_error_" + e.getMessage());
            }
        }

        @Override
        public void connectionLost(Throwable cause) {
            try {
                if (mMqtt != null)
                    mMqtt.unsubscribe(receiveSubject);
                Log.i(TAG, "connectionLost_unsubscribe");
            } catch (Exception e) {
                Log.i(TAG, "connectionLost_error_" + e.getMessage());
            }
            mHandler.removeCallbacksAndMessages(null);
            if (needAutoConnect) {
                mHandler.postDelayed(getInstance(), 68000);
            }
        }

        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            try {
                byte[] payload = message.getPayload();
                String msg = new String(payload);
                Log.i(TAG, "messageArrived_msg_" + msg);
                for (MQTTReceiver item : mReceivers) {
                    item.receiveMsg(msg);
                }
            } catch (Exception e) {
                Log.i(TAG, "messageArrived_error:" + e.getMessage());
            }
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
            try {
                WeakReference<OnSendMsgListener> reference = mListeners.get(token.getMessageId());
                if (reference!=null){
                    OnSendMsgListener onSendMsgListener = reference.get();
                    if (onSendMsgListener != null) {
                        onSendMsgListener.onSendFinish(token.isComplete());
                    }
                    reference.clear();
                    mListeners.remove(token.getMessageId());
                }
            } catch (Exception e) {
                Log.i(TAG, "deliveryComplete_error: " + e.getMessage());
            }

            Log.i(TAG, "deliveryComplete_send_msg_id: "
                    + token.getMessageId() + "; " + token.isComplete());
        }

    }


    public interface OnSendMsgListener {
        void onSendFinish(boolean isComplete);
    }




}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,793评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,567评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,342评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,825评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,814评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,680评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,033评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,687评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,175评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,668评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,775评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,419评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,020评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,206评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,092评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,510评论 2 343

推荐阅读更多精彩内容