目录
MQTT简介
MQTT是机器对机器(M2M)/物联网(IoT)连接协议。它被设计为一个极其轻量级的发布/订阅消息传输协议。对于需要较小代码占用空间和/或网络带宽非常宝贵的远程连接非常有用,是专为受限设备和低带宽、高延迟或不可靠的网络而设计。这些原则也使该协议成为新兴的“机器到机器”(M2M)或物联网(IoT)世界的连接设备,以及带宽和电池功率非常高的移动应用的理想选择。例如,它已被用于通过卫星链路与代理通信的传感器、与医疗服务提供者的拨号连接,以及一系列家庭自动化和小型设备场景。它也是移动应用的理想选择,因为它体积小,功耗低,数据包最小,并且可以有效地将信息分配给一个或多个接收器。
效果演示
这是我连接的MQTT中文网的公共服务器
基础知识
这里开发客户端需要的知识不多,paho的核心库都封装好了,我们只需要了解下基础的知识就行了。
1.连接
这里我使用的MQTT3.1.1协议,服务器地址是以tcp开头的末尾加上端口号1883
val server = "tcp://mqtt.p2hp.com:1883" //服务端地址
其他的一些参数如下:
clientId:(作为客户端的标识),这里我使用的是AndroidID,获取不到的话就使用生成的一个UUID
CleanSession:设置不持久化的话,每次都是一次新会话
keepAliveInterval:发送心跳包的时间
val androidId = DeviceUtils.getAndroidID()
val clientId = if(!TextUtils.isEmpty(androidId)){
androidId
}else{
UUID.randomUUID().toString()
}
mqttClient = MqttAndroidClient(context,serverUrl,clientId)
connectOptions = MqttConnectOptions().apply {
isCleanSession = false //是否会话持久化
connectionTimeout = 30 //连接超时时间
keepAliveInterval = 10 //发送心跳时间
userName = name //如果设置了认证,填的用户名
password = pass.toCharArray() //用户密码
}
2.订阅和发布
(1) 订阅
这里的概念就好像你微博关注了一个博主,然后当博主发布新的动态,你这就可以收到,而这里的订阅就是类似关注,订阅的主题格式跟文件路径差不多,比如订阅一个topic/1,当然这里也有带通配符的订阅方式比如topic/#,#的意思就是匹配所有,也就是当你订阅了topic/#的主题你就可以收到所有topic开头的主题消息,像topic/1、topic/2、topic/3等。
(2) 发布
发布消息的时候也需要指定一个主题,比如topic/1,但是不能指定带通配符的主题。
(3) QOS
另外还有一个比较重要的概念就是QOS(服务质量),这里有3个值(0,1,2),代表的意义如下:
0:代表,Sender 发送的一条消息,Receiver 最多能收到一次,也就是说 Sender 尽力向 Receiver 发送消息,如果发送失败,也就算了。
1:代表,Sender 发送的一条消息,Receiver 至少能收到一次,也就是说 Sender 向 Receiver 发送消息,如果发送失败,会继续重试,直到 Receiver 收到消息为止,但是因为重传的原因,Receiver 有可能会收到重复的消息。
2:代表,Sender 发送的一条消息,Receiver 确保能收到而且只收到一次,也就是说 Sender 尽力向 Receiver 发送消息,如果发送失败,会继续重试,直到 Receiver 收到消息为止,同时保证 Receiver 不会因为消息重传而收到重复的消息。
实现步骤
1.引入依赖
这里本来我是用的这个库https://github.com/eclipse/paho.mqtt.android,但是这个库不适配Android12因此我下载了源码调整了下,重新自己封装了一个,如下:
allprojects {
repositories {
...
maven { url 'https://jitpack.io' }
}
}
dependencies {
implementation 'com.github.itfitness:MQTTAndroid:1.0.0'
}
2.封装方法
我对这个库进行了一些封装,如下:
class MQTTHelper{
private val mqttClient: MqttAndroidClient
private val connectOptions: MqttConnectOptions
private var mqttActionListener: IMqttActionListener? = null
constructor(context: Context, serverUrl:String, name:String, pass:String){
val macAddress = DeviceUtils.getAndroidID()
val clientId = if(!TextUtils.isEmpty(macAddress)){
macAddress
}else{
UUID.randomUUID().toString()
}
mqttClient = MqttAndroidClient(context,serverUrl,clientId)
connectOptions = MqttConnectOptions().apply {
isCleanSession = false
connectionTimeout = 30
keepAliveInterval = 10
userName = name
password = pass.toCharArray()
}
}
/**
* 连接
* @param mqttCallback 接到订阅的消息的回调
* @param isFailRetry 失败是否重新连接
*/
fun connect(topic: Topic, qos: Qos, isFailRetry:Boolean, mqttCallback: MqttCallback){
mqttClient.setCallback(mqttCallback)
if(mqttActionListener == null){
mqttActionListener = object :IMqttActionListener{
override fun onSuccess(asyncActionToken: IMqttToken?) {
LogUtils.eTag("连接","连接成功")
subscribe(topic,qos)
}
override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
//失败重连
LogUtils.eTag("连接","连接失败重试${exception?.message}")
if (isFailRetry){
mqttClient.connect(connectOptions,null,mqttActionListener)
}
}
}
}
mqttClient.connect(connectOptions,null,mqttActionListener)
}
/**
* 订阅
*/
private fun subscribe(topic: Topic,qos:Qos){
mqttClient.subscribe(topic.value(),qos.value())
}
/**
* 发布
*/
fun publish(topic:Topic,message:String,qos:Qos){
val msg = MqttMessage()
msg.isRetained = false
msg.payload = message.toByteArray()
msg.qos = qos.value()
mqttClient.publish(topic.value(),msg)
}
/**
* 断开连接
*/
fun disconnect(){
mqttClient.disconnect()
}
}
enum class Qos{
QOS_ZERO{
override fun value():Int{
return 0
}
},
QOS_ONE{
override fun value():Int{
return 1
}
},
QOS_TWO{
override fun value():Int{
return 2
}
};
abstract fun value(): Int
}
enum class Topic{
//订阅主题
TOPIC_MSG{
override fun value():String{
return "testtopic/#"
}
},
//发布主题
TOPIC_SEND{
override fun value():String{
return "testtopic/1"
}
};
abstract fun value(): String
}
3.使用
在Activity中的使用如下:
class MainActivity : AppCompatActivity() {
@RequiresApi(Build.VERSION_CODES.N)
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
val server = "tcp://mqtt.p2hp.com:1883" //服务端地址
val mqttHelper = MQTTHelper(this,server,"123","123")
mqttHelper.connect(Topic.TOPIC_MSG, Qos.QOS_TWO,false,object : MqttCallback {
override fun connectionLost(cause: Throwable?) {
}
override fun messageArrived(topic: String?, message: MqttMessage?) {
//收到消息
message?.payload?.let { ToastUtils.showShort(String(it)) }
LogUtils.eTag("消息", message?.payload?.let { String(it) })
}
override fun deliveryComplete(token: IMqttDeliveryToken?) {
}
})
val etMsg = findViewById<EditText>(R.id.et_msg)
findViewById<Button>(R.id.tv_send).setOnClickListener {
//发送消息
mqttHelper.publish(Topic.TOPIC_SEND,etMsg.text.toString(),Qos.QOS_TWO)
}
}
}