Android mqtt 客户端实现一般使用以下两个库:
implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.1'
这两个库是eclipse 公司开发维护的。
github地址:https://github.com/eclipse/paho.mqtt.android
一般Android 客户端的连接代码可以这样写:
import android.content.Context;
import android.util.Log;
import org.eclipse.paho.android.service.MqttAndroidClient;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public class MqttClient {
private static MqttClient instance;
private Context context;
//单例模式
public static MqttClient getInstance(Context context) {
if (null == instance) {
synchronized (MqttClient.class) {
instance = new MqttClient(context);
}
}
return instance;
}
private MqttClient(Context context) {
this.context = context.getApplicationContext();
}
//声明一个MQTT客户端对象
private MqttAndroidClient mMqttClient;
private static final String TAG = "MqttClient";
//连接到服务器
private void connectMQTT() {
//连接时使用的clientId, 必须唯一, 一般加时间戳
String clientId = "xxxx" + System.currentTimeMillis();
mMqttClient = new MqttAndroidClient(context, "tcp://xxxxhost:xxxxport", clientId);
//连接参数
MqttConnectOptions options;
options = new MqttConnectOptions();
//设置自动重连
options.setAutomaticReconnect(true);
// 缓存,
options.setCleanSession(true);
// 设置超时时间,单位:秒
options.setConnectionTimeout(15);
// 心跳包发送间隔,单位:秒
options.setKeepAliveInterval(15);
// 用户名
options.setUserName("username");
// 密码
options.setPassword("password".toCharArray());
// 设置MQTT监听
mMqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
Log.d(TAG, "connectionLost: 连接断开");
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
Log.d(TAG, "消息到达");
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
}
});
try {
//进行连接
mMqttClient.connect(options, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
Log.d(TAG, "onSuccess: 连接成功");
try {
//连接成功后订阅主题
mMqttClient.subscribe("some topic", 2);
} catch (MqttException e) {
e.printStackTrace();
}
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
Log.d(TAG, "onFailure: 连接失败");
}
});
} catch (MqttException e) {
e.printStackTrace();
}
}
}
几个重要参数的介绍:
这里最重要的是对几个关键参数的理解。
-
options.setAutomaticReconnect(true);
先看官方的解释:
Sets whether the client will automatically attempt to reconnect to the server if the connection is lost.
If set to false, the client will not attempt to automatically reconnect to the server in the event that the connection is lost.
If set to true, in the event that the connection is lost, the client will attempt to reconnect to the server. It will initially wait 1 second before it attempts to reconnect, for every failed reconnect attempt, the delay will double until it is at 2 minutes at which point the delay will stay at 2 minutes.
设置为true表示支持自动重连。 这里的重连机制是:先1秒之后尝试重连,然后翻倍2秒后再尝试重连,最后一直稳定重连间隔时间为2分钟。
-
options.setCleanSession(true);
这个标志是标志客户端,服务端是否要保持持久化的一个标志。默认是
true
。这个标志理解起来并不容易先看官方的解释:
Sets whether the client and server should remember state across restarts and reconnects.
If set to false both the client and server will maintain state across restarts of the client, the server and the connection. As state is maintained:
Message delivery will be reliable meeting the specified QOS even if the client, server or connection are restarted.
The server will treat a subscription as durable.
If set to true the client and server will not maintain state across restarts of the client, the server or the connection. This means
Message delivery to the specified QOS cannot be maintained if the client, server or connection are restarted
The server will treat a subscription as non-durable
设置客户端和服务端重启或重连后是否需要记住之前的状态。
当setCleanSession为true
时,客户端掉线或服务端重启后,服务端和客户端会清掉之前的 session, 重连后客户端会有一个新的session。离线期间发来QoS=0,1,2的消息一律接收不到,且所有之前订阅的topic需要重新订阅。
当setCleanSession为false
时, 客户端掉线或服务端重启后,服务端和客户端不会清除之前的session。重连后session将会恢复,客户端不用重新订阅主题。且离线期间发来QoS=0,1,2的消息仍然可以接收到。
这里有个需要注意的地方,即setCleanSession为true时,掉线后客户端设置了setAutomaticReconnect
为true才会自动重连。为当setCleanSession为false时。不管setAutomaticReconnect
为true或者false都会自动重连。
-
options.setKeepAliveInterval(15);
Sets the "keep alive" interval. This value, measured in seconds, defines the maximum time interval between messages sent or received. It enables the client to detect if the server is no longer available, without having to wait for the TCP/IP timeout. The client will ensure that at least one message travels across the network within each keep alive period. In the absence of a data-related message during the time period, the client sends a very small "ping" message, which the server will acknowledge. A value of 0 disables keepalive processing in the client.
这个字段是设置keepalive
时间间隔的。
MQTT客户端(client)在没有收到或发布任何消息时仍然是保持连接的。服务端(the broker
)需要跟踪客户端的连接状态。 所有需要发送心跳包来确定客户端是否是连接状态。心跳包发送的时间间隔就是keepalive
设置的。
服务端都会维持一个timer。当这个timer记录的时间超过1.5倍keepalive
值时,服务端会将这个客户端标记为断开连接,并发送Last Will and Testament (LWT)
遗言广播。
以下情况下会重置这个timer:
- 每次客户端发送或接收一个消息, 服务端会重置这个timer。
- 一个客户端可以在任何时间发送一个
PINGREQ
的消息到服务器,服务器如果收到这个消息后,会回一个PINGRESP
消息,然后服务端会重置这个timer。
paho client在一个keepalive
时间间隔内没有向 Broker 发送任何数据包,比如 PUBLISH 和 SUBSCRIBE 的时候,它会向 Broker 发送PINGREQ
数据包,告诉服务器自己仍然是连接的。
遇到的问题及解决方法
1.断线重连后收不到消息。
一般是由于我们设置了setCleanSession=true
时,且setAutomaticReconnect(true)
,这样mqtt客户端断线后会启动自动重连机制。但是由于CleanSession=true
会启动一个新的session,这样需要重新订阅topic。如果我们没有重新订阅topic,就会导致断线重连后收不到消息。
此时
我们需要将Callback替换成MqttCallbackExtended,并在重写方法connectComplete
重新订阅即可。
public void connectComplete(boolean reconnect, String serverURI){
client.subscribe(topics,Qos);//具体订阅代码
}```