如何从零搭建一个消息通讯模块,只需三招就会可以了,小明哥这几天把牛刀平台的消息通讯模块加上来了,这里给大家介绍一种简单的做法,在并发量不大的情况下,无需第三方MQ中间件,这种方式方便部署,方便学习,而且更有一点是能很好的理解即时通讯的模块的设计、开发思路,这是一个典型的学习案例,他能加强你对websocket的理解。
话不多说,下面我简单从技术选型,设计思路讲讲。(功能效果大家可以找几个小伙伴一起相互发送消息体验一下。)
1、首先,技术选型
为什么说技术选型?因为即时通讯和普通的网页request—>response请求响应是不一样的。
request—>response是 基于HTTP协议的请求响应模式,只能由客户端发出请求。
今天我们要说的 即时通讯 是基于TCP协议,不知道TCP协议的可以自己百度普及一下。我这里简单的说一下:
TCP协议的请求方式是: 由客户端发起请求,服务端响应后建立双向的连接,当建立连接成功之后,通信协议升级为TCP协议,此时两边都可以发送请求,说简单点就是两边都可以发消息。
酷不酷?
当多人在线时,某人给某人发送消息,或者平台发一份通知,相关会员立马就能收的到。这就是实时响应,而不是自己动手刷新页面才有数据。
所以,毫无疑问,我们的目标是找一款基于TCP协议的技术。而WebSocket就是 TCP 长连接应用的一种用,好了,终于定准了websocket 这个东东 。
2、基于websocket选一款合适的高级协议
为什么还选一个高级协议呢?因为websocket只是一种基础技术,这就好比我们搭建房子,不可能石砖我们都自己做对吧?所以,我们发现直接用这种基础的东西做很费劲。我们需要选一款合适的,能快速开发的才行。而这类基于websocket技术再封装一次,我们也可以叫他高级协议吧。继续找我们会发现很多款式的,花样百出。
比如:AMQP、MQTT、STOMP、RabbitMQ、Kafka等等,一大批优秀的产品,你会发现选哪个都无从下手。对这些感兴趣的小伙伴可以一个一个的搜,了解一下。
在众多的产品中我们一个一个的淘汰,最终选择STOMP,理由就两个字 ”简单“! ,它实现代码很容易,只做消息代理,没有其他杂七杂八的东西。可以粗略的理解成如下图:
3、STOMP可以做什么?
不想让大家理解得这么费劲,我用大白话说吧,它能做如下事情:
1、我们给STOMP消息代理发送一条信息,希望它能帮我们发给所有人,这叫:广播通信
2、我们给STOMP消息代理发送一条信息,希望它能发给指定的某个人,这叫:一对一通信
3、像我们用request请求一样,它也支持从客户端发送数据到服务端。
下面我们用一张图理解:
大家看上面的图,可以清晰的知道发送数据时,指定不同的域,消息自动到不同的通道,这是STOMP内部给我们做好的,因此开发非常方便。
4、在Spring项目中使用
4.1 pom文件中引入
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
4.2 配置类
package com.niudao.app.configuration;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.web.socket.config.annotation.AbstractWebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
/**
* 配置WebSocket
* 作者:小明哥
*/
@Configuration
//注解开启使用STOMP协议来传输基于代理(message broker)的消息,这时控制器支持使用@MessageMapping,就像使用@RequestMapping一样
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer{
//设置心跳2秒
private static final long HEART_BEAT = 2000;
/**
* 注册STOMP协议的节点(endpoint),并映射指定的url
* @param registry
*/
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
//注册一个STOMP的endpoint,并指定使用SockJS协议
registry.addEndpoint("/web/endpoint").setAllowedOrigins("*").withSockJS();
}
/**
* 配置消息代理(Message Broker)
* 默认前缀:
* /user 以/user开头的消息会将消息从路由到某个用户独有的目的地上
* /topic 以/topic 或 /queue 开头的消息都会发送到STOMP代理中,根据你所选择的STOMP代理不同,目的地的可选前缀也会有所限制;
* /queue 同上
* /app 以 /app 开头的消息都会被路由到带有@MessageMapping 或 @SubscribeMapping 注解的方法中;
* 以下的enableSimpleBroker、setApplicationDestinationPrefixes是可以不用配置的,使用默认方式即可。
* @param registry
*/
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
ThreadPoolTaskScheduler te = new ThreadPoolTaskScheduler();
te.setPoolSize(1);
te.setThreadNamePrefix("wss-heartbeat-thread-");
te.initialize();
registry.enableSimpleBroker("/user","/topic")
.setHeartbeatValue(new long[]{HEART_BEAT,HEART_BEAT})
.setTaskScheduler(te);
//全局使用的消息前缀(客户端订阅路径上会体现出来)
registry.setApplicationDestinationPrefixes("/app");
}
}
4.3 服务端发送数据
@Autowired
SimpMessagingTemplate simpMessagingTemplate;
/**
* 广播消息
* @param destination 目的地
* @param message 消息
*/
public void sendMessageToAll(String destination, Object message){
//把数据推送给消息代理,发送到以"/topic"开头目的地
this.simpMessagingTemplate.convertAndSend("/topic"+destination, message);
}
/**
* 发消息给某用户
* @param toUserId 指定用户
* @param destination 目的地
* @param message 消息
*/
public void sendMessageToUser(String toUserId, String destination, Object message){
//把数据推送给消息代理,convertAndSendToUser目的地已自动补齐 "/user"
this.simpMessagingTemplate.convertAndSendToUser(toUserId, destination, message);
}
4.4 某个生产者(发消息者)
解释一下什么叫生产者,就是某个业务需要发一笔消息给消息代理。大白话是谁发起消息,那么这里我们假设牛刀平台的一个业务把。比如:某用户发了一个问题,广播给所有人,有新问题了,大家来帮忙回答吧。
这段消息要在哪里生产呢?我们可以在用户点击“发布问题”的逻辑中产生。如下代码:
//广播消息,只需要设置生产者
Message message = new Message();
//生产者
message.setFromUserId(question.getUserId());
//设置messageContent
message.setMessageContent(....);
//其他代码。。。。。
//这里把消息发给消息代理,完事。
this.messageWebSocketService.sendMessageToAll("/topic/ABC", message);
4.5 消费者(前端接收)
后台悄悄的把数据发给了消息代理,前端如何接收呢?这是大家就不要想着ajax了,它有自己的一套。步骤是:
1)一如相关包(vue中npm这两个即可:sockjs-client、stompjs这两个)
2)前端代码:
//注意这里的”web/endpoint“对应java中配置文件的”web/endpoint“
let socket = new SockJS("http://localhost:8080/api/web/endpoint");
this.stompClient = Stomp.over(socket);
this.stompClient.connect(headers,frame=> {
//"/topic/ABC"对应生产者发送的地址
this.subscribe("/topic/ABC",response=>{
//以下是我们的业务代码,例如可以把数据设置到vuex状态管理中。
this.vm.$store.commit("storeMutations", JSON.parse(response.body));
});
})
好了,这是最基本的步骤。广播模式和一对一的模式发送、接收都是类似(注意是类似)。
5.牛刀平台的消息通讯设计思路
想做好一个完整的即时通讯模块可没那么容易,需要充分的思考数据的存储,在线用户数据的推送,离线用户的数据保留,新数据、未查看数据的设计。以及一对一聊天时,数据的即时呈现。这需要非常深度的思考才能做好。我再这里先把基本的思路写了吧,今后有机会在录制视频讲一讲大家就明白了。
5.1 数据库设计
数据库核心是消息存储表,其中至少要有:生产者、消费者、消息、查看状态。
再一个,如果用户相互聊天,需要保留聊天好友记录。第三,允许用户自己设置订阅,比如有些内容平台发了,但是我不喜欢这里消息,这需要允许用户设置的,也就是消息主题的定制。所以也需要设置相关的数据库表进行存储、设置。下面是四张表:
5.2 功能设计思路
以下是在本子上画的,这是初稿,整个开发过程基本围绕这个思路,变化不大。
简单解释一下:
1)离线数据未读消息:数据从vuex状态来,在页面被加载时主动去得到(也就是当你不在的时候,消息没法发给你,给你放数据库了,但是状态是未读的);
2)在线未读消息:只要后台有数据,后台把数据发送给消息代理,同时保留一份到数据库。前端自动就和消息代理获得联系,把数据实时的呈现出来(如:3个未读消息,或者一个未读列表);
3)在线实时获取数据:一对一聊天:两人同时在线时,聊天时数据实时呈现,同样注意离线数据的存储。
6.功能效果:
详细的代码请加入 niudao.cn ,找到小明哥: http://niudao.cn/space/f0c24d6f-293b-4df6-987b-42e009e46bfb 。给小明发消息。