spring+stomp+webSocket+SockJS 实现简单的订阅广播消息和订阅个人消息

接触webSocket是因为做的项目里有一个图像报表类的监控页面,数据在页面加载后通过ajax后台获取后展示在页面,因为监控数据实时在变化,需要体现数据的时效性,所以采用webSocket实现客户端和服务端的长连接通信,避免客户端重复发起请求获取数据。实现思路是通过定时任务定期的获取最新数据,然后通过webSocket由服务端广播出去,客户端监听这个广播消息并把最新数据展示出来。不多废话。

  1. 首先这是一个ssm项目。加入spring的webSocket包,必须是4.0以上的包,这里用了4.1.1:
    spring-messaging-4.1.1.RELEASE.jar
    spring-websocket-4.1.1.RELEASE.jar
  2. web.xml 里的filter 和servlet 都需要加上<async-supported>true</async-supported>配置。
  3. 配置webSocket配置文件,此配置文件在springmvc的配置文件里引入。
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:mvc="http://www.springframework.org/schema/mvc"
    xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context-4.0.xsd
    http://www.springframework.org/schema/mvc
    http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd
    http://www.springframework.org/schema/aop 
    http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
    http://www.springframework.org/schema/tx 
    http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
    http://www.springframework.org/schema/websocket  
    http://www.springframework.org/schema/websocket/spring-websocket-4.0.xsd">
    <bean class="org.springframework.web.socket.server.standard.ServletServerContainerFactoryBean">
        <property name="maxTextMessageBufferSize" value="8192"/>
        <property name="maxBinaryMessageBufferSize" value="8192"/>
        <property name="maxSessionIdleTimeout" value="900000"/>
        <property name="asyncSendTimeout" value="5000"/>
    </bean>

    <!-- 实时消息推送,在spring容器初始化此bean的时候就启动了监听线程, -->
    <bean id="webSocketMessageUtil" class="com.lancy.webSocket.common.WebSocketMessageUtil"/>
</beans>

用注解的方式定义的webSocket的配置类

package com.lancy.webSocket.action;

import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.stereotype.Controller;
import org.springframework.web.socket.config.annotation.AbstractWebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;

import com.lancy.webSocket.handler.StompMessageHandshakeHandler;
import com.lancy.webSocket.handler.WebSocketHandshakeInterceptor;


@EnableWebSocketMessageBroker
@Controller
public class WebSocketMessageAction extends AbstractWebSocketMessageBrokerConfigurer{

    /**
     * 将"/webSocket"路径注册为STOMP端点,这个路径与发送和接收消息的目的路径有所不同,
     * 这是一个端点,客户端在订阅或发布消息到目的地址前,要连接该端点, 
     * 即用户发送请求url="/applicationName/webSocket"与STOMP server进行连接。之后再转发到订阅url;
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        //注册websocket,客户端用ws://host:port/项目名/webSocket 访问
        registry.addEndpoint("/webSocket")
                .setHandshakeHandler(new StompMessageHandshakeHandler())
                .addInterceptors(new WebSocketHandshakeInterceptor())
                .withSockJS();//表示支持以SockJS方式连接服务器  
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableSimpleBroker("/topic","/user");//这句话表示在topic和user这两个域上服务端可以向客户端发消息
        registry.setApplicationDestinationPrefixes("/ws");//这句话表示客户端向服务器端发送时的主题上面需要加"/ws"作为前缀
        registry.setUserDestinationPrefix("/user");//这句话表示服务端给客户端指定用户发送一对一的主题,前缀是"/user"
    }
}

其中StompMessageHandshakeHandler类代码如下

package com.lancy.webSocket.handler;

import java.security.Principal;
import java.util.Map;

import org.springframework.http.server.ServerHttpRequest;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.support.DefaultHandshakeHandler;

/**
 * 获取客户端连接前对客户端的session、cookie等信息进行握手处理, 也就是可以在这里可以进行一些用户认证?
 * 这是我个人的理解。这里没有做任何处理
 *
 */
public class StompMessageHandshakeHandler extends DefaultHandshakeHandler{

    @Override
    protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler,
            Map<String, Object> attributes) {
        return super.determineUser(request, wsHandler, attributes);
    }
}

WebSocketHandshakeInterceptor类:

package com.lancy.webSocket.handler;

import java.util.Map;

import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor;

/**
 * 握手前后的拦截,这里没有处理,默认
 *
 */
public class WebSocketHandshakeInterceptor extends HttpSessionHandshakeInterceptor{

    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
            Exception ex) {
        super.afterHandshake(request, response, wsHandler, ex);
    }

    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
            Map<String, Object> attributes) throws Exception {
        return super.beforeHandshake(request, response, wsHandler, attributes);
    }
}

WebSocketHandshakeHandler类:

package com.lancy.webSocket.handler;

import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;

/**
 * WebSocket 握手信息
 * @ClassName: WebSocketHandshakeHandler.java
 * @Description: WebSocket 握手信息
 */
public class WebSocketHandshakeHandler extends TextWebSocketHandler {  

    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        super.afterConnectionClosed(session, status);
    }

    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        super.afterConnectionEstablished(session);
    }

    @Override
    public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
        super.handleMessage(session, message);
    }
}  

自定义一个消息实体类,方便封装消息

package com.lancy.webSocket.common;

/**
 * 自定义封装包含发送信息的实体类
 *
 */
public class WebSocketMessage {

    /**
     * 发送广播消息,发送地址:/topic/* ,*为任意名字,如取名monitor,则客户端对应订阅地址为:/topic/monitor
     * 发送私人消息,发送地址:/*,*为任意名字,这里取名为message,客户端对应订阅地址:/user/{自定义客户端标识ID}/message
     */
    //可以自定义其他的属性
    private String distination;
    private Object data;//实际发送的数据对象
    private String userId;//如果不为空,则表示发送给个人而不是广播

    public String getDistination() {
        return distination;
    }
    public void setDistination(String distination) {
        this.distination = distination;
    }
    public Object getData() {
        return data;
    }
    public void setData(Object data) {
        this.data = data;
    }
    public String getUserId() {
        return userId;
    }
    public void setUserId(String userId) {
        this.userId = userId;
    }
}

webSocket消息发送的工具类

package com.lancy.webSocket.common;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessagingTemplate;

import com.lancy.myBatis.common.StringUtils;

/**
 * 这里启动一个线程不停的监控队列是否有消息需要进行发送,如果有就发送出去。
 *
 */
public class WebSocketMessageUtil implements Runnable{

    static Logger log = Logger.getLogger(WebSocketMessageUtil.class); 

    private static SimpMessagingTemplate messageingTemplate;
    private static BlockingQueue<WebSocketMessage> wsQueue = new LinkedBlockingDeque<>();

    public WebSocketMessageUtil() {
        new Thread(this).start();
    }

    @Autowired
    public void setTemplate(SimpMessagingTemplate t){
        WebSocketMessageUtil.messageingTemplate = t;
    }

    public static void addMessage(WebSocketMessage msg){
        try{
            wsQueue.put(msg);
        }catch(InterruptedException e){
            log.error("添加实时消息异常");
        }
    }

    public static void sendMessage(WebSocketMessage msg){
        if(StringUtils.isBlank(msg.getUserId())){
            messageingTemplate.convertAndSend(msg.getDistination(),msg);
        }else{
            messageingTemplate.convertAndSendToUser(msg.getUserId(), msg.getDistination(), msg);
        }
    }

    @Override
    public void run() {
        log.info(">>>>>>>推送消息线程启动,正在监听消息。");
        while (true) {
            try {
                WebSocketMessage msg = wsQueue.take();
                if(msg!=null){
                    WebSocketMessageUtil.sendMessage(msg);
                }
            } catch (Exception ex) {}
        }
    }
}

然后配置spring定时任务,在定时执行的方法里进行消息的添加,参考代码

public void testWebSocket() {
        WebSocketMessage msg = new WebSocketMessage();
        msg.setDistination("/topic/lancy/testWebSocket/new");
        msg.setData("test....");
        WebSocketMessageUtil.addMessage(msg);
}

前台界面
首先引入两个js文件 sockjs.min.js 和 stomp.min.js

<script type="text/javascript" src="${ctx}/js/socket/sockjs.min.js"></script>
<script type="text/javascript" src="${ctx}/js/socket/stomp.min.js"></script>

具体页面如下

<%@ page language="java" contentType="text/html; charset=UTF-8"
    pageEncoding="UTF-8"%>
<%@ taglib prefix="c" uri="http://java.sun.com/jsp/jstl/core" %>
<c:set var="ctx" value="${pageContext.request.contextPath}" />
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<title>Insert title here</title>

<link rel="stylesheet" type="text/css" href="${ctx}/css/default.css" />
<link rel="stylesheet" type="text/css" href="${ctx}/js/easyui/themes/default/easyui.css">
<link rel="stylesheet" type="text/css" href="${ctx}/js/easyui/themes/icon.css">
<link rel="stylesheet" type="text/css" href="${ctx}/js/easyui/demo/demo.css">
<script type="text/javascript" src="${ctx}/js/jquery.min.js"></script>
<script type="text/javascript" src="${ctx}/js/socket/sockjs.min.js"></script>
<script type="text/javascript" src="${ctx}/js/socket/stomp.min.js"></script>
<script type="text/javascript" src="${ctx}/js/app_common.js"></script>
<script type="text/javascript" src="${ctx}/js/easyui/jquery.easyui.min.js"></script>
<script type="text/javascript"> 

    var stompClient = null;  

    function setConnected(connected) {  
        document.getElementById('connect').disabled = connected;  
        document.getElementById('disconnect').disabled = !connected;  
        document.getElementById('conversationDiv').style.visibility = connected ? 'visible' : 'hidden';  
        document.getElementById('response').innerHTML = '';  
    }  

    function connect() {  
        var socket = new SockJS('/myBookstore/webSocket');//连接服务端的端点,连接以后才可以订阅广播消息和个人消息 
        stompClient = Stomp.over(socket);              
        stompClient.connect({}, function(frame) {  
            setConnected(true);  
            console.log('Connected: ' + frame); 
            //订阅广播消息
            stompClient.subscribe('/topic/lancy/testWebSocket/new', function(greeting){  
                showGreeting(greeting.body);  
            });

            //订阅个人信息
            stompClient.subscribe('/user/1/testUser', function(greeting){  
                showGreeting(greeting.body);  
            }); 
        });  
    }  

    function disconnect() {  
        if (stompClient != null) {  
            stompClient.disconnect();  
            setConnected(false);  
            console.log("Disconnected");  
        }  
    }  

    //发送到服务的消息
    function sendName() {  
        var name = document.getElementById('name').value;
        stompClient.send("/ws/webSocket/testWithServer", {'name': 'xiao','syn':'wang'}, JSON.stringify({'message': name }));  
    }  

    function showGreeting(message) {  
        var response = document.getElementById('response');  
        var p = document.createElement('p');  
        p.style.wordWrap = 'break-word';  
        p.appendChild(document.createTextNode(message));  
        response.appendChild(p);  
    }

</script>
</head>
<body class="easyui-layout">
    <div>  
        <div>  
            <button id="connect" onclick="connect();">Connect</button>  
            <button id="disconnect" disabled="disabled" onclick="disconnect();">Disconnect</button>  
        </div>  
        <div id="conversationDiv">  
            <label>What is your name?</label><input type="text" id="name" />  
            <button id="sendName" onclick="sendName();">Send all</button>  
            <p id="response"></p>  
        </div>  
    </div> 


</body>
</html>

js方法里的sendName 发送到服务端的消息对应的类:

package com.lancy.webSocket.action;

import java.util.Map;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Controller;

@Controller
public class WebSocketTestAction {

    @Autowired
    private SimpMessagingTemplate simpMessagingTemplate;

    @MessageMapping("/webSocket/testWithServer")
    //与页面的stompClient.send("/ws/webSocket/testWithServer", 
    //{'name': 'xiao','syn':'wang'}, JSON.stringify({'message': name }));方法对应
    public String send(String message,@Header("name")String name,
            @Headers Map<String, Object> headers){
        System.out.println(message);
        System.out.println(name);
        System.out.println(headers);
        simpMessagingTemplate.convertAndSend("/user/1/testUser","服务端返回消息");
        return "";
    }
}

欢迎工作一到五年的Java工程师朋友们加入Java高并发QQ群:219571750,群内提供免费的Java架构学习资料(里面有高可用、高并发、高性能及分布式、Jvm性能调优、Spring源码,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多个知识点的架构资料)合理利用自己每一分每一秒的时间来学习提升自己,不要再用"没有时间“来掩饰自己思想上的懒惰!趁年轻,使劲拼,给未来的自己一个交代!

41bcdf6703584fc58d197b1b04af4990.jpg
616236adc0134e2daf4b2d4c60c30360.jpg
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,530评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 86,403评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,120评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,770评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,758评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,649评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,021评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,675评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,931评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,659评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,751评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,410评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,004评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,969评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,042评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,493评论 2 343

推荐阅读更多精彩内容