一、概述
相比 HTTP 协议来说,WebSocket 协议对大多数后端开发者是比较陌生的。相比来说,WebSocket 协议重点是提供了服务端主动向客户端发送数据的能力,这样我们就可以完成实时性较高的需求。例如说,聊天 IM 即使通讯功能、消息订阅服务、网页游戏等等。
同时,因为 WebSocket 使用 TCP 通信,可以避免重复创建连接,提升通信质量和效率。
二、方案
本文采用Tomcat WebSocket方案实现WebSocket。
三、Tomcat WebSocket 快速入门
使用 Tomcat WebSocket 搭建一个 WebSocket 的示例。提供如下消息的功能支持:
- 身份认证请求
- 私聊消息
- 群聊消息
考虑到让示例更加易懂,我们先做成全局有且仅有一个大的聊天室,即建立上 WebSocket 的连接,都自动动进入该聊天室的功能。
3.1 引入依赖
在 [pom.xml
] 文件中,引入相关依赖。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.0.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>tomcat-websocket</artifactId>
<dependencies>
<!-- 实现对 WebSocket 相关依赖的引入,方便~ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<!-- 引入 Fastjson ,实现对 JSON 的序列化,因为后续我们会使用它解析消息 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.67</version>
</dependency>
</dependencies>
</project>
3.2 WebsocketServerEndpoint
创建 [WebsocketServerEndpoint] 类,定义 Websocket 服务的端点(EndPoint)。代码如下:
// WebsocketServerEndpoint.java
@Controller
@ServerEndpoint("/")
public class WebsocketServerEndpoint {
private Logger logger = LoggerFactory.getLogger(getClass());
@OnOpen
public void onOpen(Session session, EndpointConfig config) {
logger.info("[onOpen][session({}) 接入]", session);
}
@OnMessage
public void onMessage(Session session, String message) {
logger.info("[onOpen][session({}) 接收到一条消息({})]", session, message); // 生产环境下,请设置成 debug 级别
}
@OnClose
public void onClose(Session session, CloseReason closeReason) {
logger.info("[onClose][session({}) 连接关闭。关闭原因是({})}]", session, closeReason);
}
@OnError
public void onError(Session session, Throwable throwable) {
logger.info("[onClose][session({}) 发生异常]", session, throwable);
}
}
- 在类上,添加
@Controller
注解,保证创建一个 WebsocketServerEndpoint Bean 。 - 在类上,添加 JSR-356 定义的
@ServerEndpoint
注解,标记这是一个 WebSocket EndPoint ,路径为/
。 - WebSocket 一共有四个事件,分别对应使用 JSR-356 定义的
@OnOpen
、@OnMessage
、@OnClose
、@OnError
注解。
这是最简版的 WebsocketServerEndpoint 的代码,只是实现了4个事件的日志输出。在下文,我们会慢慢把代码补全。
3.3 WebSocketConfiguration
创建 [WebsocketServerEndpoint]配置类。代码如下:
@Configuration
// @EnableWebSocket // 无需添加该注解,因为我们并不是使用 Spring WebSocket
public class WebSocketConfiguration {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
在#serverEndpointExporter()
方法中,创建ServerEndpointExporter
的Bean 。该 Bean 的作用,是扫描添加有@ServerEndpoint
注解的 Bean 。
3.4 Application
创建 [Application.java
]类,配置 @SpringBootApplication
注解即可。代码如下:
// Application.java
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
执行 Application 启动该示例项目。
考虑到可能不会或者不愿意写前端代码,所以我们直接使用 WEBSOCKET 在线测试工具 。测试 WebSocket 连接,如下图:
至此,最简单的一个 WebSocket 项目的骨架,我们已经搭建完成。下面,我们开始改造,把相应的逻辑补全。
3.5 消息
在 HTTP 协议中,是基于 Request/Response 请求响应的同步模型,进行交互。在 Websocket 协议中,是基于 Message 消息的异步模型,进行交互。这一点,是很大的不同的,等会看到具体的消息类,感受会更明显。
因为 WebSocket 协议,不像 HTTP 协议有 URI 可以区分不同的 API 请求操作,所以我们需要在 WebSocket 的 Message 里,增加能够标识消息类型,这里我们采用 type
字段。所以在这个示例中,我们采用的 Message 采用 JSON 格式编码,格式如下:
{
type: "", // 消息类型
body: {} // 消息体
}
-
type
字段,消息类型。通过该字段,我们知道使用哪个 MessageHandler 消息处理器。关于 MessageHandler ,我们在下面段落详细解析。 -
body
字段,消息体。不同的消息类型,会有不同的消息体。 - Message 采用 JSON 格式编码,主要考虑便捷性,胖友实际项目下,也可以考虑 Protobuf 等更加高效且节省流量的编码格式。
3.5.1 Message
基础消息体,所有消息体都要实现该接口。代码如下:
// Message.java
public interface Message {
}
目前作为一个标记接口,未定义任何操作。
3.5.2 认证相关 Message
创建 [AuthRequest]类,用户认证请求。代码如下:
// AuthRequest.java
public class AuthRequest implements Message {
public static final String TYPE = "AUTH_REQUEST";
/**
* 认证 Token
*/
private String accessToken;
// ... 省略 set/get 方法
}
TYPE 静态属性,消息类型为 AUTH_REQUEST 。
accessToken 属性,认证 Token 。在 WebSocket 协议中,我们也需要认证当前连接,用户身份是什么。一般情况下,我们采用用户调用 HTTP 登录接口,登录成功后返回的访问令牌 accessToken 。
虽然说,WebSocket 协议是基于 Message 模型,进行交互。但是,这并不意味着它的操作,不需要响应结果。例如说,用户认证请求,是需要用户认证响应的。所以,我们创建 [AuthResponse]\类,作为用户认证响应。代码如下:
// AuthResponse.java
public class AuthResponse implements Message {
public static final String TYPE = "AUTH_RESPONSE";
/**
* 响应状态码
*/
private Integer code;
/**
* 响应提示
*/
private String message;
// ... 省略 set/get 方法
}
TYPE 静态属性,消息类型为 AUTH_REQUEST 。实际上,我们在每个 Message 实现类上,都增加了 TYPE 静态属性,作为消息类型。下面,我们就不重复赘述了。
code 属性,响应状态码。
message 属性,响应提示。
在本示例中,用户成功认证之后,会广播用户加入群聊的通知 Message ,使用 [UserJoinNoticeRequest] 。代码如下:
// UserJoinNoticeRequest.java
public class UserJoinNoticeRequest implements Message {
public static final String TYPE = "USER_JOIN_NOTICE_REQUEST";
/**
* 昵称
*/
private String nickname;
// ... 省略 set/get 方法
}
3.5.3 发送消息相关 Message
创建 [SendToOneRequest] 类,发送给指定人的私聊消息的 Message。代码如下:
// SendToOneRequest.java
public class SendToOneRequest implements Message {
public static final String TYPE = "SEND_TO_ONE_REQUEST";
/**
* 发送给的用户
*/
private String toUser;
/**
* 消息编号
*/
private String msgId;
/**
* 内容
*/
private String content;
// ... 省略 set/get 方法
}
创建 [SendToAllRequest]类,发送给所有人的群聊消息的 Message。代码如下:
// SendToAllRequest.java
public class SendToAllRequest implements Message {
public static final String TYPE = "SEND_TO_ALL_REQUEST";
/**
* 消息编号
*/
private String msgId;
/**
* 内容
*/
private String content;
// ... 省略 set/get 方法
}
在服务端接收到发送消息的请求,需要异步响应发送是否成功。所以,创建 [SendResponse]类,发送消息响应结果的 Message 。代码如下:
// SendResponse.java
public class SendResponse implements Message {
public static final String TYPE = "SEND_RESPONSE";
/**
* 消息编号
*/
private String msgId;
/**
* 响应状态码
*/
private Integer code;
/**
* 响应提示
*/
private String message;
// ... 省略 set/get 方法
}
- 重点看
msgId
字段,消息编号。客户端在发送消息,通过使用 UUID 算法,生成全局唯一消息编号。这样,服务端通过 SendResponse 消息响应,通过msgId
做映射。
在服务端接收到发送消息的请求,需要转发消息给对应的人。所以,创建 [SendToUserRequest]类,发送消息给一个用户的 Message 。代码如下:
// SendResponse.java
public class SendToUserRequest implements Message {
public static final String TYPE = "SEND_TO_USER_REQUEST";
/**
* 消息编号
*/
private String msgId;
/**
* 内容
*/
private String content;
// ... 省略 set/get 方法
}
- 相比 SendToOneRequest 来说,少一个 toUser 字段。因为,我们可以通过 WebSocket 连接,已经知道发送给谁了。
3..6 消息处理器
每个客户端发起的 Message 消息类型,我们会声明对应的 MessageHandler 消息处理器。这个就类似在 SpringMVC 中,每个 API 接口对应一个 Controller 的 Method 方法。
3.6.1 MessageHandler
创建 [MessageHandler] 接口,消息处理器接口。代码如下:
// MessageHandler.java
public interface MessageHandler<T extends Message> {
/**
* 执行处理消息
*
* @param session 会话
* @param message 消息
*/
void execute(Session session, T message);
/**
* @return 消息类型,即每个 Message 实现类上的 TYPE 静态字段
*/
String getType();
}
- 定义了泛型 <T> ,需要是 Message 的实现类。
- 定义的两个接口方法。
3.6.2 AuthMessageHandler
创建 [AuthMessageHandler]类,处理 AuthRequest 消息。代码如下:
// AuthMessageHandler.java
@Component
public class AuthMessageHandler implements MessageHandler<AuthRequest> {
@Override
public void execute(Session session, AuthRequest message) {
// 如果未传递 accessToken
if (StringUtils.isEmpty(message.getAccessToken())) {
WebSocketUtil.send(session, AuthResponse.TYPE,
new AuthResponse().setCode(1).setMessage("认证 accessToken 未传入"));
return;
}
// 添加到 WebSocketUtil 中
WebSocketUtil.addSession(session, message.getAccessToken()); // 考虑到代码简化,我们先直接使用 accessToken 作为 User
// 判断是否认证成功。这里,假装直接成功
WebSocketUtil.send(session, AuthResponse.TYPE, new AuthResponse().setCode(0));
// 通知所有人,某个人加入了。这个是可选逻辑,仅仅是为了演示
WebSocketUtil.broadcast(UserJoinNoticeRequest.TYPE,
new UserJoinNoticeRequest().setNickname(message.getAccessToken())); // 考虑到代码简化,我们先直接使用 accessToken 作为 User
}
@Override
public String getType() {
return AuthRequest.TYPE;
}
}
- 关于 WebSocketUtil 类,我们在 [3.7 WebSocketUtil]中详解。
3.6.3 SendToOneRequest
创建 [SendToOneHandler]类,处理 SendToOneRequest 消息。代码如下:
// SendToOneRequest.java
@Component
public class SendToOneHandler implements MessageHandler<SendToOneRequest> {
@Override
public void execute(Session session, SendToOneRequest message) {
// 这里,假装直接成功
SendResponse sendResponse = new SendResponse().setMsgId(message.getMsgId()).setCode(0);
WebSocketUtil.send(session, SendResponse.TYPE, sendResponse);
// 创建转发的消息
SendToUserRequest sendToUserRequest = new SendToUserRequest().setMsgId(message.getMsgId())
.setContent(message.getContent());
// 广播发送
WebSocketUtil.send(message.getToUser(), SendToUserRequest.TYPE, sendToUserRequest);
}
@Override
public String getType() {
return SendToOneRequest.TYPE;
}
}
3.6.4 SendToAllHandler
创建 [SendToAllHandler]类,处理 SendToAllRequest 消息。代码如下:
// SendToAllRequest.java
@Component
public class SendToAllHandler implements MessageHandler<SendToAllRequest> {
@Override
public void execute(Session session, SendToAllRequest message) {
// 这里,假装直接成功
SendResponse sendResponse = new SendResponse().setMsgId(message.getMsgId()).setCode(0);
WebSocketUtil.send(session, SendResponse.TYPE, sendResponse);
// 创建转发的消息
SendToUserRequest sendToUserRequest = new SendToUserRequest().setMsgId(message.getMsgId())
.setContent(message.getContent());
// 广播发送
WebSocketUtil.broadcast(SendToUserRequest.TYPE, sendToUserRequest);
}
@Override
public String getType() {
return SendToAllRequest.TYPE;
}
}
3.7 WebSocketUtil
主要提供两方面的功能:
- Session 会话的管理;
- 多种发送消息的方式。
代码如下:
// WebSocketUtil.java
public class WebSocketUtil {
private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketUtil.class);
// ========== 会话相关 ==========
/**
* Session 与用户的映射
*/
private static final Map<Session, String> SESSION_USER_MAP = new ConcurrentHashMap<>();
/**
* 用户与 Session 的映射
*/
private static final Map<String, Session> USER_SESSION_MAP = new ConcurrentHashMap<>();
/**
* 添加 Session 。在这个方法中,会添加用户和 Session 之间的映射
*
* @param session Session
* @param user 用户
*/
public static void addSession(Session session, String user) {
// 更新 USER_SESSION_MAP
USER_SESSION_MAP.put(user, session);
// 更新 SESSION_USER_MAP
SESSION_USER_MAP.put(session, user);
}
/**
* 移除 Session 。
*
* @param session Session
*/
public static void removeSession(Session session) {
// 从 SESSION_USER_MAP 中移除
String user = SESSION_USER_MAP.remove(session);
// 从 USER_SESSION_MAP 中移除
if (user != null && user.length() > 0) {
USER_SESSION_MAP.remove(user);
}
}
// ========== 消息相关 ==========
/**
* 广播发送消息给所有在线用户
*
* @param type 消息类型
* @param message 消息体
* @param <T> 消息类型
*/
public static <T extends Message> void broadcast(String type, T message) {
// 创建消息
String messageText = buildTextMessage(type, message);
// 遍历 SESSION_USER_MAP ,进行逐个发送
for (Session session : SESSION_USER_MAP.keySet()) {
sendTextMessage(session, messageText);
}
}
/**
* 发送消息给单个用户的 Session
*
* @param session Session
* @param type 消息类型
* @param message 消息体
* @param <T> 消息类型
*/
public static <T extends Message> void send(Session session, String type, T message) {
// 创建消息
String messageText = buildTextMessage(type, message);
// 遍历给单个 Session ,进行逐个发送
sendTextMessage(session, messageText);
}
/**
* 发送消息给指定用户
*
* @param user 指定用户
* @param type 消息类型
* @param message 消息体
* @param <T> 消息类型
* @return 发送是否成功你那个
*/
public static <T extends Message> boolean send(String user, String type, T message) {
// 获得用户对应的 Session
Session session = USER_SESSION_MAP.get(user);
if (session == null) {
LOGGER.error("[send][user({}) 不存在对应的 session]", user);
return false;
}
// 发送消息
send(session, type, message);
return true;
}
/**
* 构建完整的消息
*
* @param type 消息类型
* @param message 消息体
* @param <T> 消息类型
* @return 消息
*/
private static <T extends Message> String buildTextMessage(String type, T message) {
JSONObject messageObject = new JSONObject();
messageObject.put("type", type);
messageObject.put("body", message);
return messageObject.toString();
}
/**
* 真正发送消息
*
* @param session Session
* @param messageText 消息
*/
private static void sendTextMessage(Session session, String messageText) {
if (session == null) {
LOGGER.error("[sendTextMessage][session 为 null]");
return;
}
RemoteEndpoint.Basic basic = session.getBasicRemote();
if (basic == null) {
LOGGER.error("[sendTextMessage][session 的 为 null]");
return;
}
try {
basic.sendText(messageText);
} catch (IOException e) {
LOGGER.error("[sendTextMessage][session({}) 发送消息{}) 发生异常",
session, messageText, e);
}
}
}
3.8 完善 WebsocketServerEndpoint
修改 [WebsocketServerEndpoint]的代码,完善其功能。
3.8.1 初始化 MessageHandler 集合
实现 [InitializingBean]接口,在 #afterPropertiesSet()
方法中,扫描所有 MessageHandler Bean ,添加到 MessageHandler 集合中。代码如下:
// WebsocketServerEndpoint.java
/**
* 消息类型与 MessageHandler 的映射
*
* 注意,这里设置成静态变量。虽然说 WebsocketServerEndpoint 是单例,但是 Spring Boot 还是会为每个 WebSocket 创建一个 WebsocketServerEndpoint Bean 。
*/
private static final Map<String, MessageHandler> HANDLERS = new HashMap<>();
@Autowired
private ApplicationContext applicationContext;
@Override
public void afterPropertiesSet() throws Exception {
// 通过 ApplicationContext 获得所有 MessageHandler Bean
applicationContext.getBeansOfType(MessageHandler.class).values() // 获得所有 MessageHandler Bean
.forEach(messageHandler -> HANDLERS.put(messageHandler.getType(), messageHandler)); // 添加到 handlers 中
logger.info("[afterPropertiesSet][消息处理器数量:{}]", HANDLERS.size());
}
通过这样的方式,可以避免手动配置 MessageHandler 与消息类型的映射。
3.8.2 onOpen
重新实现#onOpen(Session session, EndpointConfig config)
方法,实现连接时,使用accessToken
参数进行用户认证。代码如下:
// WebsocketServerEndpoint.java
@OnOpen
public void onOpen(Session session, EndpointConfig config) {
logger.info("[onOpen][session({}) 接入]", session);
// <1> 解析 accessToken
List<String> accessTokenValues = session.getRequestParameterMap().get("accessToken");
String accessToken = !CollectionUtils.isEmpty(accessTokenValues) ? accessTokenValues.get(0) : null;
// <2> 创建 AuthRequest 消息类型
AuthRequest authRequest = new AuthRequest().setAccessToken(accessToken);
// <3> 获得消息处理器
MessageHandler<AuthRequest> messageHandler = HANDLERS.get(AuthRequest.TYPE);
if (messageHandler == null) {
logger.error("[onOpen][认证消息类型,不存在消息处理器]");
return;
}
messageHandler.execute(session, authRequest);
}
- <1> 处,解析 ws:// 地址上的 accessToken 的请求参。例如说:ws://127.0.0.1:8080?accessToken=guo 。
- <2> 处,创建 AuthRequest 消息类型,并设置 accessToken 属性。
- <3> 处,获得 AuthRequest 消息类型对应的 MessageHandler 消息处理器,然后调用 MessageHandler#execute(session, message) 方法,执行处理用户认证请求。
打开三个浏览器创建,分别设置服务地址如下:
ws://127.0.0.1:8080/?accessToken=guo
ws://127.0.0.1:8080/?accessToken=xiu
ws://127.0.0.1:8080/?accessToken=zhi
然后,逐个点击「开启连接」按钮,进行 WebSocket 连接。最终效果如下图:
可以看到
AuthResponse
的消息和UserJoinNoticeRequest
的消息:3.8.3 onMessage
重新实现 #onMessage(Session session, String message) 方法,实现不同的消息,转发给不同的 MessageHandler 消息处理器。代码如下:
// WebsocketServerEndpoint.java
@OnMessage
public void onMessage(Session session, String message) {
logger.info("[onOpen][session({}) 接收到一条消息({})]", session, message); // 生产环境下,请设置成 debug 级别
try {
// <1> 获得消息类型
JSONObject jsonMessage = JSON.parseObject(message);
String messageType = jsonMessage.getString("type");
// <2> 获得消息处理器
MessageHandler messageHandler = HANDLERS.get(messageType);
if (messageHandler == null) {
logger.error("[onMessage][消息类型({}) 不存在消息处理器]", messageType);
return;
}
// <3> 解析消息
Class<? extends Message> messageClass = this.getMessageClass(messageHandler);
// <4> 处理消息
Message messageObj = JSON.parseObject(jsonMessage.getString("body"), messageClass);
messageHandler.execute(session, messageObj);
} catch (Throwable throwable) {
logger.info("[onMessage][session({}) message({}) 发生异常]", session, throwable);
}
}
- <1> 处,获得消息类型,从 "type" 字段中。
- <2> 处,获得消息类型对应的 MessageHandler 消息处理器。
- <3> 处,调用 #getMessageClass(MessageHandler handler) 方法,通过 MessageHandler 中,通过解析其类上的泛型,获得消息类型对应的 Class 类。代码如下:
// WebsocketServerEndpoint.java
private Class<? extends Message> getMessageClass(MessageHandler handler) {
// 获得 Bean 对应的 Class 类名。因为有可能被 AOP 代理过。
Class<?> targetClass = AopProxyUtils.ultimateTargetClass(handler);
// 获得接口的 Type 数组
Type[] interfaces = targetClass.getGenericInterfaces();
Class<?> superclass = targetClass.getSuperclass();
while ((Objects.isNull(interfaces) || 0 == interfaces.length) && Objects.nonNull(superclass)) { // 此处,是以父类的接口为准
interfaces = superclass.getGenericInterfaces();
superclass = targetClass.getSuperclass();
}
if (Objects.nonNull(interfaces)) {
// 遍历 interfaces 数组
for (Type type : interfaces) {
// 要求 type 是泛型参数
if (type instanceof ParameterizedType) {
ParameterizedType parameterizedType = (ParameterizedType) type;
// 要求是 MessageHandler 接口
if (Objects.equals(parameterizedType.getRawType(), MessageHandler.class)) {
Type[] actualTypeArguments = parameterizedType.getActualTypeArguments();
// 取首个元素
if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) {
return (Class<Message>) actualTypeArguments[0];
} else {
throw new IllegalStateException(String.format("类型(%s) 获得不到消息类型", handler));
}
}
}
}
}
throw new IllegalStateException(String.format("类型(%s) 获得不到消息类型", handler));
}
- <4> 处,调用 MessageHandler#execute(session, message) 方法,执行处理请求。
另外,这里增加了 try-catch 代码,避免整个执行的过程中,发生异常。如果在 onMessage 事件的处理中,发生异常,该消息对应的 Session 会话会被自动关闭。显然,这个不符合我们的要求。例如说,在 MessageHandler 处理消息的过程中,发生一些异常是无法避免的。
继续基于上述创建的三个浏览器,我们先点击「清空消息」按钮,清空下消息,打扫下上次测试展示出来的接收得到的 Message 。当然,WebSocket 的连接,不需要去断开。
在第一个浏览器中,分别发送两种聊天消息:
- 一条
SendToOneRequest
私聊消息:
{
type: "SEND_TO_ONE_REQUEST",
body: {
toUser: "xiu",
msgId: "eaafew3c-35dd-46ee-b548-f9crdk6396fe",
content: "一条单聊消息"
}
}
- 一条
SendToAllHandler
群聊消息:
{
type: "SEND_TO_ALL_REQUEST",
body: {
msgId: "838e97e1-6ae9-40f9-99c3-f7127ed68888",
content: "一条群聊消息"
}
}
3.8.4 .onClose
// WebsocketServerEndpoint.java
@OnClose
public void onClose(Session session, CloseReason closeReason) {
logger.info("[onClose][session({}) 连接关闭。关闭原因是({})}]", session, closeReason);
WebSocketUtil.removeSession(session);
}
3.8.5 onError
// WebsocketServerEndpoint.java
@OnError
public void onError(Session session, Throwable throwable) {
logger.info("[onClose][session({}) 发生异常]", session, throwable);
}
底线
本文源代码使用 Apache License 2.0开源许可协议,可从Gitee代码地址通过git clone
命令下载到本地或者通过浏览器方式查看源代码。
其他优秀文章:https://blog.csdn.net/moshowgame/article/details/80275084、https://www.jianshu.com/p/1501f1350c99