mqtt网络协议,相信跟物联网相关的公司都会遇到,在Android,iOS原生开发是可以很好的实现,相关的资料也是很多!但是在flutter里面还算比较尝鲜的一个领域吧!
幸亏flutter里面已经有一个还不错的第三方库mqtt_client,我在项目中用的版本是比较低一点的版本,如下图
此版本在项目中,已经运行了有一段时间了,相关的app,也已经上线了,目前没有发现有什么问题.但是,秉承我一贯的原则,干货,干货,干货;当然是要为大家带来最新版本的mqtt的示例啦~~此时应该有掌声
这篇文章介绍怎么封装一个属于自己的工具类,并解决使用过程中的中文乱码等问题。
首先我希望用单例模式,这样整个项目随处都可以调用,使用起来比较方便
然后希望支持加密与不加密的情况!
最后希望支持中文
话不多说,新建一个工程flutter_app_mqtt,在pubspec.yaml文件中,添加依赖库mqtt_client,然后pub get一下,下载库
mqtt_client: ^7.3.0
准备工作好了,我们准备封装工具类MqttTool
核心代码
MqttTool工具类代码如下:
import 'dart:async';
import 'dart:convert';

import 'package:mqtt_client/mqtt_client.dart';
import 'package:mqtt_client/mqtt_server_client.dart';
import 'package:typed_data/typed_buffers.dart';
typedef ConnectedCallback = void Function();
/// Singleton wrapper around [MqttServerClient].
///
/// Call [connect] first; every other method operates on the client that
/// [connect] creates.
class MqttTool {
  /// Quality of service used for every publish and subscribe call.
  MqttQos qos = MqttQos.atLeastOnce;

  /// The underlying client. It is null until [connect] has been called.
  MqttServerClient mqttClient;

  static MqttTool _instance;

  /// Returns the shared instance, creating it on first use.
  static MqttTool getInstance() {
    _instance ??= MqttTool();
    return _instance;
  }

  /// Creates a new client for [server]:[port] and starts connecting.
  ///
  /// When [isSsl] is true the client is switched to a secure connection.
  /// The returned future completes with the connection status reported by
  /// the client.
  Future<MqttClientConnectionStatus> connect(String server, int port,
      String clientIdentifier, String username, String password,
      {bool isSsl = false}) {
    mqttClient = MqttServerClient.withPort(server, clientIdentifier, port);
    mqttClient.onConnected = onConnected;
    // Default disconnect logger; [onDisConnected] can replace it afterwards.
    mqttClient.onDisconnected = _onDisconnected;
    mqttClient.onSubscribed = _onSubscribed;
    mqttClient.onSubscribeFail = _onSubscribeFail;
    mqttClient.onUnsubscribed = _onUnSubscribed;
    mqttClient.setProtocolV311();
    mqttClient.logging(on: false);
    if (isSsl) {
      mqttClient.secure = true;
      // NOTE(review): this callback answers true for every certificate it is
      // asked about, which presumably disables certificate validation.
      // Acceptable for a demo; confirm before shipping to production.
      mqttClient.onBadCertificate = (dynamic a) => true;
    }
    _log("_正在连接中...");
    return mqttClient.connect(username, password);
  }

  /// Disconnects the current client; does nothing if [connect] was never
  /// called.
  disconnect() {
    mqttClient?.disconnect();
    _log("_disconnect");
  }

  /// Publishes [msg] to [pTopic] as UTF-8 bytes.
  ///
  /// The payload is UTF-8 encoded rather than copied from `codeUnits`:
  /// UTF-16 code units above 255 do not fit into the 8-bit buffer, which
  /// garbled non-Latin text such as Chinese.
  int publishMessage(String pTopic, String msg) {
    _log("_发送数据-topic:$pTopic,playLoad:$msg");
    final payload = Uint8Buffer()..addAll(utf8.encode(msg));
    return mqttClient.publishMessage(pTopic, qos, payload, retain: false);
  }

  /// Publishes the raw bytes in [list] to [pTopic] without any encoding.
  int publishRawMessage(String pTopic, List<int> list) {
    _log("_发送数据-topic:$pTopic,playLoad:$list");
    final payload = Uint8Buffer()..addAll(list);
    return mqttClient.publishMessage(pTopic, qos, payload, retain: false);
  }

  /// Subscribes to [subtopic] using [qos].
  Subscription subscribeMessage(String subtopic) {
    return mqttClient.subscribe(subtopic, qos);
  }

  /// Unsubscribes from [unSubtopic].
  unsubscribeMessage(String unSubtopic) {
    mqttClient.unsubscribe(unSubtopic);
  }

  /// Returns the connection status held by the current client.
  MqttClientConnectionStatus getMqttStatus() {
    return mqttClient.connectionStatus;
  }

  /// Returns the client's stream of received messages.
  Stream<List<MqttReceivedMessage<MqttMessage>>> updates() {
    _log("_监听成功!");
    return mqttClient.updates;
  }

  /// Connected callback installed by [connect]; only logs.
  onConnected() {
    _log("_onConnected");
  }

  /// Replaces the disconnect callback of the current client with [callback].
  ///
  /// Must be called after [connect], because [connect] creates a new client
  /// and installs the default logger again.
  onDisConnected(ConnectedCallback callback) {
    mqttClient.onDisconnected = callback;
  }

  void _onDisconnected() {
    _log("_onDisconnected");
  }

  void _onSubscribed(String topic) {
    _log("_订阅主题成功---topic:$topic");
  }

  void _onUnSubscribed(String topic) {
    _log("_取消订阅主题成功---topic:$topic");
  }

  void _onSubscribeFail(String topic) {
    _log("_onSubscribeFail");
  }

  void _log(String msg) {
    print("MQTT-->$msg");
  }
}
工具类封装完成了,现在就需要去项目中应用了,因此,我们改造一下HomePage的布局,来完成测试验证工作
Widget build(BuildContext context) {
  // Builds one of the six action buttons; they differ only in label and
  // callback.
  Widget actionButton(String label, VoidCallback onPressed) {
    return Container(
      child: RaisedButton(
        onPressed: onPressed,
        child: Text(
          label,
          style: TextStyle(fontSize: 20, color: Colors.purple),
        ),
      ),
    );
  }

  final counterStyle = Theme.of(context).textTheme.headline4;

  final content = Column(
    mainAxisAlignment: MainAxisAlignment.center,
    children: <Widget>[
      Text(
        'You have pushed the button this many times:',
      ),
      Text(
        '$_counter',
        style: counterStyle,
      ),
      actionButton("connect", _connect),
      actionButton("subscribe topic", _subscribeTopic),
      actionButton("unSubscribe topic", _unSubscribeTopic),
      actionButton("publish topic", _publishTopic),
      actionButton("start listen", _startListen),
      actionButton("disconnect", _disconnect),
    ],
  );

  return Scaffold(
    appBar: AppBar(
      title: Text(widget.title),
    ),
    body: Center(child: content),
    floatingActionButton: FloatingActionButton(
      onPressed: _incrementCounter,
      tooltip: 'Increment',
      child: Icon(Icons.add),
    ),
  );
}
对应的UI图是这样的
RaisedButton对应的点击事件函数如下
// 建立连接
/// Connects to the broker over the plain (non-TLS) port and prints the
/// outcome.
///
/// Connection failures raised as exceptions are caught and reported here, so
/// they no longer surface as unhandled asynchronous errors.
_connect() async {
  // Demo values only; real credentials should not be hard-coded.
  const server = "your server name";
  const port = 1883;
  const clientId = "86-1885999fuehxz5f3ced1e";
  const userName = "86-18859995315";
  const password = "63ab9508485e131f946ce59ab9b3b687";
  try {
    final status = await MqttTool.getInstance()
        .connect(server, port, clientId, userName, password);
    if (status.returnCode == MqttConnectReturnCode.connectionAccepted) {
      print("恭喜你~ ====mqtt连接成功");
    } else if (status.returnCode ==
        MqttConnectReturnCode.badUsernameOrPassword) {
      print("有事做了~ ====mqtt连接失败 --密码错误!!!");
    } else {
      print("有事做了~ ====mqtt连接失败!!!");
    }
  } on Exception catch (e) {
    print("有事做了~ ====mqtt连接失败!!! $e");
    // Release the half-open client so a later attempt starts clean.
    MqttTool.getInstance().disconnect();
  }
}
// 订阅主题
/// Subscribes to the device topic and its reply topic.
_subscribeTopic() {
  const topic = "device/F4CFA26F1E43/#";
  const topic2 = "reply/device/F4CFA26F1E43/#";
  MqttTool.getInstance().subscribeMessage(topic);
  MqttTool.getInstance().subscribeMessage(topic2);
}
// 取消订阅
/// Unsubscribes from the device topic.
_unSubscribeTopic() {
  const topic = "device/F4CFA26F1E43/#";
  MqttTool.getInstance().unsubscribeMessage(topic);
}
// 发布消息
/// Publishes the cooling and heating setpoints, in that order.
_publishTopic() {
  // Topic -> payload; a map literal keeps insertion order, so the cooling
  // setpoint is still sent first.
  final setpoints = <String, String>{
    "api/device/F4CFA26F1E43/86-1885999mvqqyy5f3cf0d5/attribute/OccupiedCoolingSetpoint":
        "2950",
    "api/device/F4CFA26F1E43/86-1885999mvqqyy5f3cf0d5/attribute/OccupiedHeatingSetpoint":
        "2900",
  };
  final tool = MqttTool.getInstance();
  for (final entry in setpoints.entries) {
    tool.publishMessage(entry.key, entry.value);
  }
}
// 监听消息的具体实现
/// Decodes every received message as UTF-8 text and forwards it on the
/// event bus.
///
/// Each message is posted as a map with the keys `topic`, `type` (always
/// `"string"`) and `payload` (the decoded text). All entries of [data] are
/// handled, not just the first one.
_onData(List<MqttReceivedMessage<MqttMessage>> data) {
  for (final received in data) {
    final recMess = received.payload as MqttPublishMessage;
    final topic = received.topic;
    final pt = Utf8Decoder().convert(recMess.payload.message);
    print("string =topic is <$topic>, payload is <-- $pt -->");
    ListEventBus.getDefault().post(<dynamic, dynamic>{
      "topic": topic,
      "type": "string",
      "payload": pt,
    });
  }
}
// 开启监听消息
/// Starts forwarding received MQTT messages to [_onData].
///
/// Any listener left from an earlier call is cancelled first, so pressing the
/// button repeatedly does not deliver each message several times.
_startListen() {
  _listenSubscription?.cancel();
  _listenSubscription = MqttTool.getInstance().updates().listen(_onData);
}
// 断开连接
/// Closes the shared MQTT connection.
_disconnect() => MqttTool.getInstance().disconnect();
点击RaisedButton的顺序: connect ---> subscribe topic ---> start listen ---> publish topic ---> disconnect
对应的控制台log如下:
至此,Mqtt协议的封装,加应用完成得差不多了.
还有两点需要补充一下,
一个是加密问题,
另一个是 整个工程在监听mqtt回来的数据该如何处理
加密问题,其实MqttTool工具类代码,已经处理好了,相关代码如下
// Excerpt from MqttTool.connect: extra client setup applied when isSsl is true.
if (isSsl) {
// Switch the client to a secure connection.
mqttClient.secure = true;
// NOTE(review): this callback returns true for every certificate it is given,
// which presumably disables certificate validation — confirm against the
// mqtt_client documentation before using it in production.
mqttClient.onBadCertificate = (dynamic a) => true;
}
所以用加密端口时,_connect()方法里面要多传一个参数 isSsl:true
// 建立连接
/// Connects to the broker over the TLS port (`isSsl: true`) and prints the
/// outcome.
///
/// Connection failures raised as exceptions are caught and reported here, so
/// they no longer surface as unhandled asynchronous errors.
_connect() async {
  // Demo values only; real credentials should not be hard-coded.
  const server = "connect.owon.com";
  const port = 8883;
  const clientId = "86-1885999fuehxz5f3ced1e";
  const userName = "86-188599895315";
  const password = "63ab9508485e131f946ce59ab9b3b687";
  try {
    final status = await MqttTool.getInstance()
        .connect(server, port, clientId, userName, password, isSsl: true);
    if (status.returnCode == MqttConnectReturnCode.connectionAccepted) {
      print("恭喜你~ ====mqtt连接成功");
    } else if (status.returnCode ==
        MqttConnectReturnCode.badUsernameOrPassword) {
      print("有事做了~ ====mqtt连接失败 --密码错误!!!");
    } else {
      print("有事做了~ ====mqtt连接失败!!!");
    }
  } on Exception catch (e) {
    print("有事做了~ ====mqtt连接失败!!! $e");
    // Release the half-open client so a later attempt starts clean.
    MqttTool.getInstance().disconnect();
  }
}
另一个是 整个工程在监听mqtt回来的数据该如何处理
我这边是把mqtt工具类接收到的数据用EventBus发送出去, 其他需要监听数据的page, 就用EventBus去监听接收到的数据.
// Hand the message map `p` to the shared event bus so registered listeners receive it.
ListEventBus.getDefault().post(p);
EventBus类的代码如下
import 'dart:async';
/// A minimal application-wide event bus backed by a broadcast stream.
///
/// Obtain the shared instance with [ListEventBus.getDefault], listen with
/// [register] and publish with [post].
class ListEventBus {
  // Statics are initialised lazily, so the bus is created on first access.
  static ListEventBus _instance = ListEventBus._init();

  final StreamController _streamController = StreamController.broadcast();

  /// Events posted while the bus is paused, in posting order.
  final List _pending = [];

  bool _paused = false;

  /// Returns the shared bus.
  factory ListEventBus.getDefault() => _instance;

  ListEventBus._init();

  /// Registers [onData] for events of type [T] and returns the subscription
  /// so the caller can cancel it.
  ///
  /// With no type argument ([T] is `dynamic`) every event is delivered.
  StreamSubscription<T> register<T>(void onData(T event)) {
    // `event is T` is always true when T is dynamic, so one typed pipeline
    // covers both the "all events" and the "only T" case.
    final Stream<T> typed =
        _streamController.stream.where((event) => event is T).cast<T>();
    return typed.listen(onData);
  }

  /// Publishes [event] to all listeners, or queues it while the bus is
  /// paused.
  void post(event) {
    if (_paused) {
      _pending.add(event);
      return;
    }
    _streamController.add(event);
  }

  /// Closes this bus and ends every subscription created through it.
  ///
  /// The shared instance is replaced with a fresh bus, so later calls to
  /// [ListEventBus.getDefault] keep working instead of hitting a closed
  /// controller.
  void unregister() {
    _pending.clear();
    _streamController.close();
    if (identical(_instance, this)) {
      _instance = ListEventBus._init();
    }
  }

  /// Stops delivery; events posted from now on are queued until [resume].
  ///
  /// A broadcast controller has no pause callback (reading `onPause` throws
  /// `UnsupportedError`), so pausing is implemented by the bus itself.
  void pause() {
    _paused = true;
  }

  /// Resumes delivery and flushes the events queued since [pause], in order.
  void resume() {
    _paused = false;
    final queued = List.of(_pending);
    _pending.clear();
    queued.forEach(_streamController.add);
  }
}
在需要监听数据的page里面添加以下代码就可以啦
StreamSubscription<Map<dynamic, dynamic>> _listEvenBusSubscription;
/// Subscribes this page to the message maps posted on the event bus.
@override
void initState() {
  super.initState();
  _listEvenBusSubscription =
      ListEventBus.getDefault().register<Map<dynamic, dynamic>>((msg) {
    final String topic = msg["topic"];
    // `_onData` posts the payload as the UTF-8 decoded text, so it is a
    // String; reading it as Map<String, dynamic> failed at runtime.
    final String payload = msg["payload"];
  });
}
main.dart文件里面的完整代码
import 'dart:async';
import 'dart:convert';
import 'live_even_bus.dart';
import 'package:flutter/material.dart';
import 'package:mqtt_client/mqtt_client.dart';
import 'mqtt_tool.dart';
/// Application entry point: mounts [MyApp] as the root widget.
void main() => runApp(MyApp());
/// Root widget: a [MaterialApp] with a blue theme whose home is
/// [MyHomePage].
class MyApp extends StatelessWidget {
  @override
  Widget build(BuildContext context) {
    final appTheme = ThemeData(
      primarySwatch: Colors.blue,
      visualDensity: VisualDensity.adaptivePlatformDensity,
    );
    final homePage = MyHomePage(title: 'Flutter Demo Home Page');
    return MaterialApp(
      title: 'Flutter Demo',
      theme: appTheme,
      home: homePage,
    );
  }
}
/// The demo page hosting the MQTT test buttons.
class MyHomePage extends StatefulWidget {
  MyHomePage({Key key, this.title}) : super(key: key);

  /// Text shown in the app bar.
  final String title;

  @override
  _MyHomePageState createState() {
    return _MyHomePageState();
  }
}
/// State of [MyHomePage]: drives the MQTT demo buttons and prints every
/// message that arrives over the event bus.
class _MyHomePageState extends State<MyHomePage> {
  int _counter = 0;

  /// Subscription to the MQTT client's message stream; set by [_startListen].
  StreamSubscription<List<MqttReceivedMessage<MqttMessage>>>
      _listenSubscription;

  /// Subscription to the event bus; set in [initState].
  StreamSubscription<Map<dynamic, dynamic>> _listEvenBusSubscription;

  void _incrementCounter() {
    setState(() {
      _counter++;
    });
  }

  @override
  void initState() {
    super.initState();
    _listEvenBusSubscription =
        ListEventBus.getDefault().register<Map<dynamic, dynamic>>((msg) {
      final String topic = msg["topic"];
      // `_onData` posts the payload as decoded text, so it is a String;
      // reading it as Map<String, dynamic> failed at runtime.
      final String payload = msg["payload"];
      print("监听的 topc= $topic, payload= $payload");
    });
  }

  @override
  void dispose() {
    // Stop both listeners so callbacks do not outlive this page.
    _listEvenBusSubscription?.cancel();
    _listenSubscription?.cancel();
    super.dispose();
  }

  /// Builds one of the six action buttons; they differ only in label and
  /// callback.
  Widget _actionButton(String label, VoidCallback onPressed) {
    return Container(
      child: RaisedButton(
        onPressed: onPressed,
        child: Text(
          label,
          style: TextStyle(fontSize: 20, color: Colors.purple),
        ),
      ),
    );
  }

  @override
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(
        title: Text(widget.title),
      ),
      body: Center(
        child: Column(
          mainAxisAlignment: MainAxisAlignment.center,
          children: <Widget>[
            Text(
              'You have pushed the button this many times:',
            ),
            Text(
              '$_counter',
              style: Theme.of(context).textTheme.headline4,
            ),
            _actionButton("connect", _connect),
            _actionButton("subscribe topic", _subscribeTopic),
            _actionButton("unSubscribe topic", _unSubscribeTopic),
            _actionButton("publish topic", _publishTopic),
            _actionButton("start listen", _startListen),
            _actionButton("disconnect", _disconnect),
          ],
        ),
      ),
      floatingActionButton: FloatingActionButton(
        onPressed: _incrementCounter,
        tooltip: 'Increment',
        child: Icon(Icons.add),
      ),
    );
  }

  /// Connects over the TLS port and prints the outcome.
  ///
  /// Connection failures raised as exceptions are caught and reported here,
  /// so they no longer surface as unhandled asynchronous errors.
  _connect() async {
    // Demo values only; real credentials should not be hard-coded.
    const server = "your server name";
    const port = 8883;
    const clientId = "86-1885999713jlb5f3d01f3";
    const userName = "86-188599895315";
    const password = "63ab9508485e131f946ce59ab9b3b687";
    try {
      final status = await MqttTool.getInstance()
          .connect(server, port, clientId, userName, password, isSsl: true);
      if (status.returnCode == MqttConnectReturnCode.connectionAccepted) {
        print("恭喜你~ ====mqtt连接成功");
      } else if (status.returnCode ==
          MqttConnectReturnCode.badUsernameOrPassword) {
        print("有事做了~ ====mqtt连接失败 --密码错误!!!");
      } else {
        print("有事做了~ ====mqtt连接失败!!!");
      }
    } on Exception catch (e) {
      print("有事做了~ ====mqtt连接失败!!! $e");
      // Release the half-open client so a later attempt starts clean.
      MqttTool.getInstance().disconnect();
    }
  }

  /// Subscribes to the device topic and its reply topic.
  _subscribeTopic() {
    const topic = "device/F4CFA26F1E43/#";
    const topic2 = "reply/device/F4CFA26F1E43/#";
    MqttTool.getInstance().subscribeMessage(topic);
    MqttTool.getInstance().subscribeMessage(topic2);
  }

  /// Unsubscribes from the device topic.
  _unSubscribeTopic() {
    const topic = "device/F4CFA26F1E43/#";
    MqttTool.getInstance().unsubscribeMessage(topic);
  }

  /// Publishes the cooling and heating setpoints, in that order.
  _publishTopic() {
    const topic1 =
        "api/device/F4CFA26F1E43/86-1885999mvqqyy5f3cf0d5/attribute/OccupiedCoolingSetpoint";
    const str1 = "2950";
    const topic2 =
        "api/device/F4CFA26F1E43/86-1885999mvqqyy5f3cf0d5/attribute/OccupiedHeatingSetpoint";
    const str2 = "2900";
    MqttTool.getInstance().publishMessage(topic1, str1);
    MqttTool.getInstance().publishMessage(topic2, str2);
  }

  /// Decodes every received message as UTF-8 text and forwards it on the
  /// event bus as a map with the keys `topic`, `type` and `payload`.
  ///
  /// All entries of [data] are handled, not just the first one.
  _onData(List<MqttReceivedMessage<MqttMessage>> data) {
    for (final received in data) {
      final recMess = received.payload as MqttPublishMessage;
      final topic = received.topic;
      final pt = Utf8Decoder().convert(recMess.payload.message);
      print("string =topic is <$topic>, payload is <-- $pt -->");
      ListEventBus.getDefault().post(<dynamic, dynamic>{
        "topic": topic,
        "type": "string",
        "payload": pt,
      });
    }
  }

  /// Starts forwarding received MQTT messages to [_onData].
  ///
  /// Any listener left from an earlier call is cancelled first, so pressing
  /// the button repeatedly does not deliver each message several times.
  _startListen() {
    _listenSubscription?.cancel();
    _listenSubscription = MqttTool.getInstance().updates().listen(_onData);
  }

  /// Closes the shared MQTT connection.
  _disconnect() {
    MqttTool.getInstance().disconnect();
  }
}
结尾
mqtt这个库也在不断地更新,如果从低版本升到高版本,可能会遇到不兼容的问题,希望大家冷静沉着应对!祝君好运~ 最后,如果小伙伴们觉得有点帮助,请帮忙点个赞吧。如果有什么问题需要探讨的,也欢迎留言~
2023年4月15日更新,希望能帮助到小伙伴们,好运~
mqtt库更新到如下版本
mqtt_client: ^9.6.6
主要变化:
- MqttTool工具类适配空安全,
- 增加mqtt连接成功监听
- 增加mqtt连接断开监听
- 增加网络异常时提示
- 解决中文乱码问题
如下
import 'dart:async';
import 'dart:convert';
import 'package:connectivity_plus/connectivity_plus.dart';
import 'package:mqtt_client/mqtt_client.dart';
import 'package:mqtt_client/mqtt_server_client.dart';
import 'package:owoncare/owon_res/owon_constant.dart';
import '../generated/l10n.dart';
import '../owon_utils/owon_log.dart';
import 'package:typed_data/typed_buffers.dart';
import 'owon_toast.dart';
/// Singleton MQTT helper that also tracks network connectivity.
///
/// [connect] creates the client and starts listening for connectivity
/// changes; publish and subscribe calls are refused (see [isEnable]) while
/// the network or the MQTT connection is down.
class OwonMqtt {
  /// Quality of service used for every publish and subscribe call.
  MqttQos qos = MqttQos.atLeastOnce;

  /// The underlying client. It is null until [connect] has been called.
  MqttServerClient mqttClient;

  static OwonMqtt _instance;

  /// Connectivity listener created by [connect].
  StreamSubscription<ConnectivityResult> _subscription;

  /// Topics recorded by [subscribeMessage] when called with `keep: true`.
  static Set<String> keepTopics = <String>{};

  /// Last connectivity result reported; null until the first change event.
  ConnectivityResult _currentConnectivityResult;

  /// Returns the shared instance, creating it on first use.
  static OwonMqtt getInstance() {
    _instance ??= OwonMqtt();
    return _instance;
  }

  /// Creates a new client for [server]:[port] and starts connecting.
  ///
  /// [onConnected] and [onDisconnected] are installed as the client's
  /// connection callbacks. When [isSsl] is true the client is switched to a
  /// secure connection.
  Future<MqttClientConnectionStatus> connect(String server, int port,
      String clientIdentifier, String username, String password,
      {bool isSsl = false, Function onDisconnected, Function onConnected}) {
    // A reconnect must not stack a second connectivity listener on top of
    // the one left by the previous connect call.
    _subscription?.cancel();
    _subscription = Connectivity()
        .onConnectivityChanged
        .listen((ConnectivityResult result) {
      _currentConnectivityResult = result;
    });
    print("mqtt connect to $server:$port, clientId:$clientIdentifier");
    mqttClient = MqttServerClient.withPort(server, clientIdentifier, port);
    mqttClient.onSubscribed = _onSubscribed;
    mqttClient.onSubscribeFail = _onSubscribeFail;
    mqttClient.onUnsubscribed = _onUnSubscribed;
    mqttClient.onDisconnected = onDisconnected;
    mqttClient.onConnected = onConnected;
    mqttClient.setProtocolV311();
    mqttClient.logging(on: false);
    mqttClient.keepAlivePeriod = 120;
    if (isSsl) {
      mqttClient.secure = true;
      // NOTE(review): this callback answers true for every certificate it is
      // asked about, which presumably disables certificate validation —
      // confirm before shipping to production.
      mqttClient.onBadCertificate = (dynamic a) => true;
    }
    _log("mqtt正在连接中...");
    return mqttClient.connect(username, password);
  }

  /// Disconnects the client and stops the connectivity listener.
  disconnect() {
    if (mqttClient != null) {
      mqttClient.disconnect();
      _subscription?.cancel();
      _subscription = null;
      _log("mqtt disconnect");
    } else {
      _log("mqtt client is null , disconnect failure");
    }
  }

  /// Whether MQTT operations can be attempted right now.
  ///
  /// Returns false and shows a "no network" toast when the last connectivity
  /// result is `none`. Returns false when there is no client or it is not in
  /// the connected state; an existing client is then disconnected.
  bool isEnable() {
    if (_currentConnectivityResult == ConnectivityResult.none) {
      OwonLog.e("mqtt 网络异常");
      // Show the toast from a later event-loop turn instead of inline.
      Future.delayed(Duration(milliseconds: 0), () {
        OwonToast.show(
          S.of(OwonConstant.context).login_no_network,
        );
      });
      return false;
    }
    if (mqttClient == null) {
      OwonLog.e("mqtt 连接状态异常");
      return false;
    }
    if (mqttClient.connectionStatus.state != MqttConnectionState.connected) {
      OwonLog.e("mqtt 连接状态异常");
      OwonLog.e("主动断开mqtt连接");
      mqttClient.disconnect();
      return false;
    }
    return true;
  }

  /// Publishes [msg] to [pTopic] as UTF-8 bytes, so non-ASCII text such as
  /// Chinese is transmitted intact.
  ///
  /// Returns null when [isEnable] is false.
  int publishMessage(String pTopic, String msg) {
    if (!isEnable()) return null;
    _log("_发送数据-topic:$pTopic,playLoad:$msg");
    final payload = Uint8Buffer()..addAll(utf8.encode(msg));
    return mqttClient.publishMessage(pTopic, qos, payload, retain: false);
  }

  /// Publishes the raw bytes in [list] to [pTopic] without any encoding.
  ///
  /// Returns null when [isEnable] is false.
  int publishRawMessage(String pTopic, List<int> list) {
    if (!isEnable()) return null;
    _log("_发送数据-topic:$pTopic,playLoad:$list");
    final payload = Uint8Buffer()..addAll(list);
    return mqttClient.publishMessage(pTopic, qos, payload, retain: false);
  }

  /// Subscribes to [subtopic] using [qos].
  ///
  /// With [keep] set, the topic is recorded in [keepTopics] even when the
  /// subscription itself is refused. Returns null when [isEnable] is false.
  Subscription subscribeMessage(String subtopic, {bool keep = false}) {
    if (keep) {
      keepTopics.add(subtopic);
    }
    if (!isEnable()) return null;
    return mqttClient.subscribe(subtopic, qos);
  }

  /// Unsubscribes from [unSubtopic]; does nothing when [isEnable] is false.
  unsubscribeMessage(String unSubtopic) {
    if (!isEnable()) return null;
    mqttClient.unsubscribe(unSubtopic);
  }

  /// Returns the client's connection status, or null before [connect].
  MqttClientConnectionStatus getMqttStatus() {
    return mqttClient?.connectionStatus;
  }

  /// Returns the client's stream of received messages, or null before
  /// [connect].
  Stream<List<MqttReceivedMessage<MqttMessage>>> updates() {
    _log("_监听成功!");
    return mqttClient?.updates;
  }

  void _onSubscribed(String topic) {
    _log("_订阅主题成功---topic:$topic");
  }

  void _onUnSubscribed(String topic) {
    _log("_取消订阅主题成功---topic:$topic");
  }

  void _onSubscribeFail(String topic) {
    _log("_onSubscribeFail");
  }

  void _log(String msg) {
    OwonLog.e("MQTT-->$msg");
  }

  /// Empties [keepTopics].
  static cleanKeepTopics() {
    keepTopics.clear();
  }
}
建立连接的代码
// Connect with the SSL or plain port depending on _mIsSSLFlag, then log the
// outcome reported by the broker.
OwonMqtt.getInstance()
    .connect(_mMqttServer, _mIsSSLFlag ? _mMqttSSLPort : _mMqttPort,
        _mClientId, _mUserName, accessToken,
        isSsl: _mIsSSLFlag,
        onConnected: _onMqttConnectedState,
        onDisconnected: _onDisconnectedState)
    .then((v) {
  OwonLog.e("mqtt connect result = $v");
  if (v.returnCode == MqttConnectReturnCode.connectionAccepted) {
    OwonLog.e("恭喜你~ ====mqtt连接成功");
  } else if (v.returnCode == MqttConnectReturnCode.badUsernameOrPassword) {
    OwonLog.e("有事做了~ ====mqtt连接失败 --密码错误!!!");
  } else {
    OwonLog.e("有事做了~ ====mqtt连接失败!!!");
  }
}).catchError((e) {
  OwonLog.e("有事做了~ ====mqtt连接失败!!!");
  // Keep the cause in the log; it used to be discarded.
  OwonLog.e("mqtt connect error = $e");
});
监听mqtt连接成功
/// Called when the MQTT connection has been established.
_onMqttConnectedState() {
  OwonLog.e("mqtt连接已成功");
  // Do follow-up work here, e.g. subscribe to topics or refresh the UI.
}
监听mqtt连接断开
// Called when the MQTT connection has been closed; currently it only logs.
_onDisconnectedState() {
OwonLog.e("mqtt连接已断开");
// Do follow-up work here, e.g. show a notice or reconnect to MQTT.
}
补充一下,当前版本信息
Justin-Mac-mini:care-vefify$ flutter doctor
Doctor summary (to see all details, run flutter doctor -v):
[✓] Flutter (Channel stable, 3.0.3, on macOS 13.3 22E252 darwin-x64, locale zh-Hans-CN)
[✓] Android toolchain - develop for Android devices (Android SDK version 32.0.0)
[✓] Xcode - develop for iOS and macOS (Xcode 14.3)
[✓] Chrome - develop for the web
[✓] Android Studio (version 2021.2)
[✓] IntelliJ IDEA Community Edition (version 2021.2.2)
[✓] VS Code (version 1.77.1)
[✓] Connected device (2 available)
[✓] HTTP Host Availability
• No issues found!
Justin-Mac-mini:care-vefify$