2019-08-22(后台内置MQTT)

    我们自己有一套系统,该系统包括前端的可视化界面,以及后端提供的服务。由于系统会应用在很多的场景中,所以需要做很多的对接,对接第三方的各种系统,有动环的时实数据和告警数据,还有DCIM的资产信息。

    有时候客户我们的可视化页面也会被嵌入在第三方的页面中,为了使得可视化页面里的内容发生变化时他们的页面也做相应的变化。因此我们可视化页面的交互事件,比如鼠标点击了哪个对象的事件,场景切换的事件,聚焦发生变化的事件等等都要向外派发。

为了处理这一系列的复杂关系,我在我们的系统后台内置了一个MQTT代理服务(当然也可以使用第三方的)。

梳理了一下我们的这个MQTT所需要处理的问题,大概如下:

1、可视化页面的客户端可订阅指定的消息;

2、可视化页面的客户端的交互事件等消息的发布;

3、第三方可订阅某个特定的可视化页面客户端的消息;

4、第三方可给某个指定的可视化页面客户端推送消息;

5、第三方可给所有的可视化页面客户端推送消息;

6、MQTT实现标准的接口对接功能,如:时实数据,告警,资产等。第三方可以通过MQTT来实现数据对接;

7、通过我们的这个实现系统通讯。

前端没有引用mqtt的js包,浏览器端的订阅我们直接通过socket.io来实现的。

服务端的代码如下:


内置mqtt服务是mosca

后端对前端socket的转换,以及收到消息后的转发:

前端的mqtt的订阅的发布

/**

* mqtt服务

* 统一创建一个client会比较麻烦,那就需要管理所有的socket的消息,每次来了新的消息需要做分发处理

* 如果每个socket创建一个socket,那就不需要对过来的消息做分发处理了

*/

const mqttService = require('./Service.js');

const mqtt = require('mqtt');

const logger = require('../../Logger');

const lite = require('iconv-lite');

const realtimeDataReceiver = require('./RealtimeDataReceiver.js');

let sockets = {};

function start(app, io, httpServer) {

mqttService.start(httpServer); // 启动内置的mqtt服务

io.on('connection', function(socket) {

console.log('socketId:'+ socket.id +'. connect!!!');

socket.on('mqtt_sub', function(order) { // 前端的订阅

// console.log('mqtt_sub connect success!!!');

let topic = order.topic;

subscribe(socket, topic);

});

socket.on('mqtt_pub', function(order) { // 前端的发布

// console.log('mqtt_pub connect success!!!');

let topic = order.topic;

let message = order.message;

publish(socket, topic, message);

});

socket.on('disconnect', function() { // 前端

let socketClients = sockets[socket.id];

if (socketClients && socketClients.length > 0) {

socketClients.forEach(function(client){

client.end();

});

};

delete sockets[socket.id];

});

});

//第三方也可以通过mqtt向3D机房平台后台推送时实数据

realtimeDataReceiver.start();

}

/**

* 消息订阅

* 需要注意的是:

*  1、消息能堆到指定的客户端,这样发布消息时就要加上对应的客户端的socketId

*  2、如果publish的topic中没有socketId,那就推向所有的客户端

*/

function subscribe(socket,topic,order){

let client = mqtt.connect('mqtt://127.0.0.1'); // 连接到当前的mqtt服务

if (!socket || !topic) {

return ;

}

let socketId = socket.id;

client.subscribe(topic);

client.subscribe(socketId+'/'+topic);

client.on('message', function(topic, message) {

//需要做一次装换,mosca中吐出来的是buffer。当然其他的消息代理服务器中吐出来的不是这样的

let value = lite.decode(Buffer.from(message),'utf8');

topic = topic.replace(socketId+'/','');

receiveHandle(socket,topic,value);

// client.end();

});

let socketClients = sockets[socket.id];

if (!socketClients) {

socketClients = sockets[socket.id] = [];

}

socketClients.push(client);

}

/**

* 处理mqtt服务器过来的消息

*/

function receiveHandle(socket,topic,message){

console.log('-------------message:');

console.log(message);

socket.emit('mqtt_message', {topic:topic,message:message});

}

/**

* 消息发布

* @ topic 发布的标题

* @message 是发布的内容,数据格式是json对象

*/

function publish(socket,topic,message) {

let client = mqtt.connect('mqtt://127.0.0.1'); // 连接到当前的mqtt服务

if (!socket || !topic || !message) {

return;

}

topic = socket.id+'/'+topic;

client.publish(topic, JSON.stringify(message));

// client.end();

client.on('message', function(topic,message) {

logger.info('publish:' + topic+'; message:'+message);

client.end();

});

}

exports.start = start;

    需要注意的是,为了推到指定的客户端,第三方系统推送的时候需要加上对应的前端的socketId(目前是这样的,可能会存在更好的方式),然后后台处理时会通过该id找到对应的socket。如果推的时候没有socketId,那订阅了该消息的所有的客户端都能收到发布的消息,否则只有订阅了该消息并且socketId是一致的客户端才能收到消息。

浏览器前端的订阅和发布:

发布和订阅

mqtt_pub : 发布消息

mqtt_sub : 消息订阅

mqtt_message : 订阅的消息的接收


第三方的订阅发布跟标准的mqtt的订阅发布完全一样,如下是一些例子:

第三方调用

处理第三方的推送数据,目前实现了告警、时实和资产的接口:

alarm/#:告警;add是新发告警、delete是删除告警、ack是确认告警

monitor:时实数据;温/湿度、资产的时实数据

asset/# :资产对接; add、update、delete

时实数据对接

    第三方向客户端发送命令控制前端页面交互,比如场景切换,看到某个对象等,目前内置了几种基本的事件,这个可以扩展,直接实现对应的方法就行了。代码截图如下:

命令响应1


具体的命令解析

前端的交互事件下发:

前端消息发布

这样整个mqtt的设计基本完成。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 196,165评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,503评论 2 373
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 143,295评论 0 325
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,589评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,439评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,342评论 1 273
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,749评论 3 387
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,397评论 0 255
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,700评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,740评论 2 313
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,523评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,364评论 3 314
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,755评论 3 300
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,024评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,297评论 1 251
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,721评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,918评论 2 336

推荐阅读更多精彩内容