本文简要介绍一个使用websocket协议实现单点推送的小demo的实现
github:https://github.com/SMIELPF/websocket-demo
websocket
WebSocket 协议在2008年诞生,2011年成为国际标准。所有浏览器都已经支持了。
它的最大特点就是,服务器可以主动向客户端推送信息,客户端也可以主动向服务器发送信息,>是真正的双向平等对话,属于服务器推送技术的一种。其他特点包括:
- 建立在 TCP 协议之上,服务器端的实现比较容易。
- 与 HTTP 协议有着良好的兼容性。默认端口也是80和443,并且握手阶段采用 HTTP 协议,因此握手时不容易屏蔽,能通过各种 HTTP 代理服务器。
- 数据格式比较轻量,性能开销小,通信高效。
- 可以发送文本,也可以发送二进制数据。
- 没有同源限制,客户端可以与任意服务器通信。
- 协议标识符是ws(如果加密,则为wss),服务器网址就是 URL。
在浏览器端,HTML5已经提供了Websocket API,而在服务端,
也有许多优秀的第三方库提供对websocket的支持,例如在Node.js中比较常用的就有socket.io, express-ws等,下面我们就用express-ws来实现一个简单的websocket通信的小demo
客户端实现
我们实现这样一个web客户端
可以通过下拉框选择消息发送者和接收者,点击发送按钮后,通过http post请求告知服务端消息的发送者,接收者以及消息内容,然后服务端通过websocket向消息接收者推送消息。
客户端与服务端建立url为
ws://{host}/ws/:name
的websocket连接,其中name为消息发送者,当发送者改变时,关闭上一条连接,建立新的连接,例如消息发送方从Bob变为Alice, 则关闭ws://{host}/ws/Bob
, 建立ws://{host}/ws/Alice
, 这样我们就区分开了客户端,方便之后进行单点推送。
前端html:
<!DOCTYPE html>
<html>
<head>
<title>websocket demo</title>
<meta name="viewport" content="width=device-width, initial-scale=1" />
<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
<style>
.row {
margin: 1rem
}
</style>
</head>
<body>
<div class='row'>发送方:
<select id='sender'>
<option value="Bob" selected>Bob</option>
<option value="Alice">Alice</option>
<option value="Jack">Jack</option>
</select>
</div>
<div class='row'>接收方:
<select id='receiver'>
<option value="Bob">Bob</option>
<option value="Alice" selected>Alice</option>
<option value="Jack">Jack</option>
</select>
</div>
<textarea id='msg' class='row' rows="10" cols="30"></textarea>
<div class='row'>
<button id='sendBtn'>发送</button>
</div>
<h3 class='row'>收到的消息:</h3>
<div id='conversation' class='row'></div>
<script src="/bundle.js"></script>
</body>
</html>
前端js代码:
var sender = document.getElementById('sender');
var receiver = document.getElementById('receiver');
var conversation = document.getElementById('conversation');
var sendBtn = document.getElementById('sendBtn');
var socket = null;
var createSocket = function() {
if(socket) {
socket.close();
}
var url = 'ws://' + window.location.host + '/ws/' + sender.options[sender.selectedIndex].value;
socket = new WebSocket(url);
socket.onopen = function() {
console.log('connected to ' + url);
}
socket.onmessage = function(event) {
var data = JSON.parse(event.data);
conversation.innerHTML = conversation.innerHTML + data.from + ':' + data.content + '<br/>';
}
socket.onclose = function() {
console.log('close connect to' + url);
}
};
var sendMessage = function() {
var msg = document.getElementById('msg').value;
fetch('/rest/message', {
method: 'POST',
headers: {
'Content-type': 'application/json'
},
body: JSON.stringify({
from: sender.options[sender.selectedIndex].value,
content: msg,
to: receiver.options[receiver.selectedIndex].value
})
}).then(res => {
return res.json();
}).then(data => {
if(!data.succeed) {
alert(data.msg);
}
})
};
sender.onchange = function() {
createSocket();
}
sendBtn.onclick = function() {
sendMessage();
}
createSocket();
服务端实现
服务端实现依赖express和express-ws,
主要实现两个接口,一个是websocket接口,一个是http接口
websocket接口的实现如下:
const app = new express();
expressWs(app);
const wsClients = {}
app.wsClients = wsClients;
app.ws('/ws/:wid', (ws, req) => {
if(!wsClients[req.params.wid]) {
wsClients[req.params.wid] = []
}
// 将连接记录在连接池中
wsClients[req.params.wid].push(ws);
ws.onclose = () => {
// 连接关闭时,wsClients进行清理
wsClients[req.params.wid] = wsClients[req.params.wid].filter((client) => {
return client !== ws;
});
if(wsClients[req.params.wid].length === 0) {
delete wsClients[req.params.wid];
}
}
});
首先声明一个连接池wsClients, 这是一个对象,键为消息发送方的名字,值是一个数组,用于保存所有对应的websocket连接实例。当一个websocket连接建立时,我们把连接记录在连接池中,并在onclose方法中声明连接关闭时清理连接池的回调。
http接口的实现如下:
app.post('/rest/message', (req, res) => {
const to = req.body.to; // 接收方id
const from = req.body.from; // 发送发id
const result = { succeed: true };
if(wsClients[to] !== undefined) {
wsClients[to].forEach((client) => {
client.send(JSON.stringify({
from,
content: req.body.content
}));
});
} else {
// 如果消息接收方没有连接,则返回错误信息
result.succeed = false;
result.msg = '对方不在线';
}
res.json(result);
});
从http请求的body中获取消息发送方和接收方,然后从连接池中遍历所有消息接收方的websocket连接实例,向客户端推送消息
完整的服务端代码如下,在实现基本功能的基础上,定时打印一下连接池中websocket连接的数量:
const express = require('express');
const expressWs = require('express-ws');
const app = new express();
expressWs(app);
const wsClients = {}
app.wsClients = wsClients;
app.use(express.json());
app.use(express.urlencoded({ extended: false }));
app.use(express.static('./static'));
app.ws('/ws/:wid', (ws, req) => {
if(!wsClients[req.params.wid]) {
wsClients[req.params.wid] = []
}
// 将连接记录在连接池中
wsClients[req.params.wid].push(ws);
ws.onclose = () => {
// 连接关闭时,wsClients进行清理
wsClients[req.params.wid] = wsClients[req.params.wid].filter((client) => {
return client !== ws;
});
if(wsClients[req.params.wid].length === 0) {
delete wsClients[req.params.wid];
}
}
});
app.post('/rest/message', (req, res) => {
const to = req.body.to; // 接收方id
const from = req.body.from; // 发送发id
const result = { succeed: true };
if(wsClients[to] !== undefined) {
wsClients[to].forEach((client) => {
client.send(JSON.stringify({
from,
content: req.body.content
}));
});
} else {
// 如果消息接收方没有连接,则返回错误信息
result.succeed = false;
result.msg = '对方不在线';
}
res.json(result);
});
setInterval(() => {
// 定时打印连接池数量
console.log('websocket connection counts:')
Object.keys(wsClients).forEach(key => {
console.log(key, ':', wsClients[key].length);
})
console.log('-----------------------------');
}, 5000);
app.listen(3000, () => {
console.log('visit http://localhost:3000');
// child_process.execSync('start http://localhost:3000');
});
思考
现在我们就实现了一个简单的websocket通信的小demo,但是现在这种实现方式是在处理http post请求的过程中向客户端使用websocket推送消息,如果服务端是单节点部署倒是没什么问题
但是如果服务是部署在多个节点上,就会出现部分客户端收不到服务端推送的情况
如上图所示,Bob1想给Alice发消息,但是只有跟Bob1负载在同一节点的Alice1能收到服务端的推送,Alice2就收不到了。这种时候我们就需要利用Redis的pub/sub或者kafka这样的中间件了
正所谓,在计算机领域,如果有什么事是加一个中间层搞不定的,那就加两个。
关于websocket集群的实现,可以看一下这一篇:关于一个 websocket 多节点分布式问题的头条前端面试题
本demo的代码:github:https://github.com/SMIELPF/websocket-demo
觉得有帮助的话欢迎点赞,点一点star哦 : )