express + websocket 实现单点推送

本文简要介绍一个使用websocket协议实现单点推送的小demo的实现

github:https://github.com/SMIELPF/websocket-demo

websocket

WebSocket 协议在2008年诞生,2011年成为国际标准。所有浏览器都已经支持了。
它的最大特点就是,服务器可以主动向客户端推送信息,客户端也可以主动向服务器发送信息,>是真正的双向平等对话,属于服务器推送技术的一种。

其他特点包括:

  • 建立在 TCP 协议之上,服务器端的实现比较容易。
  • 与 HTTP 协议有着良好的兼容性。默认端口也是80和443,并且握手阶段采用 HTTP 协议,因此握手时不容易屏蔽,能通过各种 HTTP 代理服务器。
  • 数据格式比较轻量,性能开销小,通信高效。
  • 可以发送文本,也可以发送二进制数据。
  • 没有同源限制,客户端可以与任意服务器通信。
  • 协议标识符是ws(如果加密,则为wss),服务器网址就是 URL。

在浏览器端,HTML5已经提供了Websocket API,而在服务端,
也有许多优秀的第三方库提供对websocket的支持,例如在Node.js中比较常用的就有socket.io, express-ws等,下面我们就用express-ws来实现一个简单的websocket通信的小demo

客户端实现

我们实现这样一个web客户端

image

可以通过下拉框选择消息发送者和接收者,点击发送按钮后,通过http post请求告知服务端消息的发送者,接收者以及消息内容,然后服务端通过websocket向消息接收者推送消息。
客户端与服务端建立url为ws://{host}/ws/:name的websocket连接,其中name为消息发送者,当发送者改变时,关闭上一条连接,建立新的连接,例如消息发送方从Bob变为Alice, 则关闭ws://{host}/ws/Bob, 建立ws://{host}/ws/Alice, 这样我们就区分开了客户端,方便之后进行单点推送。

前端html:

<!DOCTYPE html>
<html>
  <head>
  <title>websocket demo</title>
    <meta name="viewport" content="width=device-width, initial-scale=1" />
    <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
    <style>
        .row {
            margin: 1rem
        }
    </style>
  </head>
  <body>
    <div class='row'>发送方: 
      <select id='sender'>
        <option value="Bob" selected>Bob</option>
        <option value="Alice">Alice</option>
        <option value="Jack">Jack</option>
      </select>
    </div>
    <div class='row'>接收方: 
      <select id='receiver'>
        <option value="Bob">Bob</option>
        <option value="Alice" selected>Alice</option>
        <option value="Jack">Jack</option>
      </select>
    </div>
    <textarea id='msg' class='row' rows="10" cols="30"></textarea>
    <div class='row'>
        <button id='sendBtn'>发送</button>
    </div>
    <h3 class='row'>收到的消息:</h3>
    <div id='conversation' class='row'></div>
    <script src="/bundle.js"></script>
  </body>
</html>

前端js代码:

var sender = document.getElementById('sender');
var receiver = document.getElementById('receiver');
var conversation = document.getElementById('conversation');
var sendBtn = document.getElementById('sendBtn');
var socket = null;
var createSocket = function() {
    if(socket) {
        socket.close();
    }
    var url = 'ws://' + window.location.host + '/ws/' + sender.options[sender.selectedIndex].value;
    socket = new WebSocket(url);
    socket.onopen = function() {
        console.log('connected to ' + url);
    }
    socket.onmessage = function(event) {
        var data = JSON.parse(event.data);
        conversation.innerHTML = conversation.innerHTML + data.from + ':' + data.content + '<br/>'; 
    }
    socket.onclose = function() {
        console.log('close connect to' + url);
    }
};

var sendMessage = function() {
    var msg = document.getElementById('msg').value;
    fetch('/rest/message', {
        method: 'POST',
        headers: {
            'Content-type': 'application/json'
        },
        body: JSON.stringify({
            from: sender.options[sender.selectedIndex].value,
            content: msg,
            to: receiver.options[receiver.selectedIndex].value
        }) 
    }).then(res => {
        return res.json();
    }).then(data => {
        if(!data.succeed) {
            alert(data.msg);
        }
    })
};

sender.onchange = function() {
    createSocket();
}

sendBtn.onclick = function() {
    sendMessage();
}

createSocket();

服务端实现

服务端实现依赖express和express-ws,
主要实现两个接口,一个是websocket接口,一个是http接口

websocket接口的实现如下:

const app = new express();
expressWs(app);

const wsClients = {}
app.wsClients = wsClients;

app.ws('/ws/:wid',  (ws, req) => {
    if(!wsClients[req.params.wid]) {
        wsClients[req.params.wid] = []
    }
    // 将连接记录在连接池中
    wsClients[req.params.wid].push(ws);
    ws.onclose = () => {
        // 连接关闭时,wsClients进行清理
        wsClients[req.params.wid] = wsClients[req.params.wid].filter((client) => {
            return client !== ws;
        });
        if(wsClients[req.params.wid].length === 0) {
            delete wsClients[req.params.wid];
        }
    }
});

首先声明一个连接池wsClients, 这是一个对象,键为消息发送方的名字,值是一个数组,用于保存所有对应的websocket连接实例。当一个websocket连接建立时,我们把连接记录在连接池中,并在onclose方法中声明连接关闭时清理连接池的回调。
http接口的实现如下:

app.post('/rest/message', (req, res) => {
    const to = req.body.to; // 接收方id
    const from = req.body.from; // 发送发id
    const result = { succeed: true };
    if(wsClients[to] !== undefined) {
        wsClients[to].forEach((client) => {
            client.send(JSON.stringify({
                from,
                content: req.body.content
            }));
        });
    } else {
        // 如果消息接收方没有连接,则返回错误信息
        result.succeed = false;
        result.msg = '对方不在线';
    }
    res.json(result);
});

从http请求的body中获取消息发送方和接收方,然后从连接池中遍历所有消息接收方的websocket连接实例,向客户端推送消息

完整的服务端代码如下,在实现基本功能的基础上,定时打印一下连接池中websocket连接的数量:

const express = require('express');
const expressWs = require('express-ws');

const app = new express();
expressWs(app);

const wsClients = {}
app.wsClients = wsClients;
app.use(express.json());
app.use(express.urlencoded({ extended: false }));
app.use(express.static('./static'));

app.ws('/ws/:wid',  (ws, req) => {
    if(!wsClients[req.params.wid]) {
        wsClients[req.params.wid] = []
    }
    // 将连接记录在连接池中
    wsClients[req.params.wid].push(ws);
    ws.onclose = () => {
        // 连接关闭时,wsClients进行清理
        wsClients[req.params.wid] = wsClients[req.params.wid].filter((client) => {
            return client !== ws;
        });
        if(wsClients[req.params.wid].length === 0) {
            delete wsClients[req.params.wid];
        }
    }
});

app.post('/rest/message', (req, res) => {
    const to = req.body.to; // 接收方id
    const from = req.body.from; // 发送发id
    const result = { succeed: true };
    if(wsClients[to] !== undefined) {
        wsClients[to].forEach((client) => {
            client.send(JSON.stringify({
                from,
                content: req.body.content
            }));
        });
    } else {
        // 如果消息接收方没有连接,则返回错误信息
        result.succeed = false;
        result.msg = '对方不在线';
    }
    res.json(result);
});

setInterval(() => {
    // 定时打印连接池数量
    console.log('websocket connection counts:')
    Object.keys(wsClients).forEach(key => {
        console.log(key, ':', wsClients[key].length);
    })
    console.log('-----------------------------');
}, 5000);

app.listen(3000, () => {
    console.log('visit http://localhost:3000');
    // child_process.execSync('start http://localhost:3000');
});

思考

现在我们就实现了一个简单的websocket通信的小demo,但是现在这种实现方式是在处理http post请求的过程中向客户端使用websocket推送消息,如果服务端是单节点部署倒是没什么问题

image

但是如果服务是部署在多个节点上,就会出现部分客户端收不到服务端推送的情况
image

如上图所示,Bob1想给Alice发消息,但是只有跟Bob1负载在同一节点的Alice1能收到服务端的推送,Alice2就收不到了。这种时候我们就需要利用Redis的pub/sub或者kafka这样的中间件了
image

正所谓,在计算机领域,如果有什么事是加一个中间层搞不定的,那就加两个。
关于websocket集群的实现,可以看一下这一篇:关于一个 websocket 多节点分布式问题的头条前端面试题

本demo的代码:github:https://github.com/SMIELPF/websocket-demo

觉得有帮助的话欢迎点赞,点一点star哦 : )

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,772评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,458评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,610评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,640评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,657评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,590评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,962评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,631评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,870评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,611评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,704评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,386评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,969评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,944评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,179评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,742评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,440评论 2 342

推荐阅读更多精彩内容

  • 可能有很多的同学有用 setInterval 控制 ajax 不断向服务端请求最新数据的经历(轮询)看下面的代码:...
    船长___阅读 14,845评论 0 8
  • 原文地址:http://www.ibm.com/developerworks/cn/java/j-lo-WebSo...
    敢梦敢当阅读 8,876评论 0 50
  • WebSocket 机制 WebSocket 是 HTML5 一种新的协议。它实现了浏览器与服务器全双工通信,能更...
    勇敢的_心_阅读 2,241评论 0 4
  • WebSocket简介 谈到Web实时推送,就不得不说WebSocket。在WebSocket出现之前,很多网站为...
    吧啦啦小汤圆阅读 8,122评论 15 75
  • 很多场景下的应用对数据实时更新要求很高。比如股票交易,数字资产交易,还有一些需要动态更新数据的大屏数据可视化应用等...
    前端进阶体验阅读 1,277评论 0 4