Spring Websocket+Stomp 防踩坑实战

背景

近期项目中需要运用到及时消息通知,就采用了websocket+Stomp方式来实现,项目框架是微服务架构,需要考虑网关(SpringCloud Gateway)的转发,多服务节点的消息通知。本文章主要讲解websocket在微服务架构下的运用。

文章目录

接下来从下面5个方面进行讲解。
1.环境
2.后端接入websocket
3.客服端接入注意点
4.网关如何进行转发
5.采用Redis的订阅发布

1. 环境

SpringCloud 相关
SpringCloud Gateway
SpringBoot
Redis

2. 后端接入websocket

后端主要采用Websocket+Stomp的方式。

2.1 导入依赖包

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
            <version>2.2.10.RELEASE</version>
        </dependency>

2.2 新增websocket配置类

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

/**
 * 功能描述:
 * websocket配置类
 * @Author: nickel
 * @Date: 2021/4/2 14:30
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/websocket")  //开启websocket端点
                .setAllowedOrigins("*")                //允许跨域访问
                .withSockJS();                          //设置sockJs
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry)
    {
        //表明在topic、queue、users这三个域上可以向客户端发消息。
        registry.enableSimpleBroker("/topic","/queue","/user");
        //客户端向服务端发起请求时,需要以/app为前缀。
        registry.setApplicationDestinationPrefixes("/app");
        //给指定用户发送一对一的消息前缀是/user/。
        registry.setUserDestinationPrefix("/user/");
    }

    /**
     * 定义用户入端通道拦截器
     * @param registration
     */
    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.interceptors(createUserInterceptor());
    }

    /**
     * 将自定义的客户端渠道拦截器加入IOC容器中
     * @return
     */
    @Bean
    public UserChannelInterceptor createUserInterceptor(){
        return new UserChannelInterceptor();
    }
}

2.3 自定义渠道拦截器

配置类如上,项目中需要对websocket用户进行校验,校验token的有效性,就需要自定义渠道连接器,需要实现接口类ChannelInterceptor,这里为什么要用渠道拦截,是因为比较好获取参数,进行校验。如下:

/**
 * 功能描述:
 * websocket用户相关渠道拦截
 * @Author: nickel
 * @Date: 2021/4/2 14:30
 */
@Slf4j
public class UserChannelInterceptor implements ChannelInterceptor {
    @Autowired
    private SimpUserRegistry simpUserRegistry;

    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
        if (StompCommand.CONNECT.equals(accessor.getCommand())){
            String username=null;
            try {
                String token = accessor.getNativeHeader("Authorization").get(0);
                //校验token
                JwtBean jwtBean = ApplicationContextUtils.getBean(JwtBean.class);
                username = jwtBean.getUsername(token);
            } catch (Exception e) {
                e.printStackTrace();
                log.error("token is error");
                throw new IllegalStateException("The token is illegal");
            }
            if(StringUtils.isEmpty(username)){
                log.error("token is overtime");
                throw new IllegalStateException("The token is illegal");
            }
            accessor.setUser(new MyPrincipal(username));
            log.info("【{}】用户上线了",username);
        }else if(StompCommand.DISCONNECT.equals(accessor.getCommand())){
            log.info("【{}】用户下线了",accessor.getUser().getName());
        }
        return message;
    }


}

注:这里用SimpUserRegistry 可以获取到当前节点在线人数;

2.4 自定义用户

从上面的拦截器中可以看到用到了自定义用户MyPrincipal,需要实现Principal;如下:

import java.security.Principal;

/**
 * 功能描述:
 * websocket-自定义用户
 * @Author: nickel
 * @Date: 2021/4/2 14:37
 */
public class MyPrincipal implements Principal {
    private String name;
    public MyPrincipal(String name){
        this.name=name;
    }

    @Override
    public String getName() {
        return name;
    }
}

自定义用户方便在发送消息时,通过用户名来找到当前节点的在线用户数据;

2.5 如何运用websocket进行消息发送

上面相关配置都写好后,咱们看看具体发送如何实现,新增WebSocketService类如下:

/**
 * 功能描述:
 * websocket发送信息入口
 * @Author: nickel
 * @Date: 2021/4/2 14:37
 */
@Service
@Slf4j
public class WebSocketService {

    private static final String HANDLER_NAME = "socketHandler";

    @Resource
    private MyRedisClient myRedisClient;
    @Autowired
    private SimpMessageSendingOperations simpMessageSendingOperations;

    @Autowired
    private SimpUserRegistry simpUserRegistry;

    /**
     * 服务端推送消息--一对一
     * 单体服务
     * 客服端 订阅地址为/users/{username}/message
     *
     * @param username
     * @param message
     */
    public void pushMessage(String username, String message,String id) {
        try {
            //根据用户名查询当前节点在线用户
            SimpUser simpUser = simpUserRegistry.getUser(username);
            if (null == simpUser) {
                return;
            }
            log.info("--服务端指定用户发送消息,to【{}】", simpUser.getName());
            String nowTime = GaeaUtils.formatDate(new Date(), GaeaConstant.TIME_PATTERN);
            GaeaWsMessage gaeaWsMessage = new GaeaWsMessage(id,message, nowTime);
            simpMessageSendingOperations.convertAndSendToUser(username, "/message", JSON.toJSONString(gaeaWsMessage));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 服务器端推送消息--广播
     * 客服端 订阅地址为/topic/message
     * 单体服务
     */
    public void pushMessage(String message,String id) {
        try {
            String nowTime = GaeaUtils.formatDate(new Date(), GaeaConstant.TIME_PATTERN);
            GaeaWsMessage gaeaWsMessage = new GaeaWsMessage(id,message, nowTime);
            simpMessageSendingOperations.convertAndSend("/topic/message", JSON.toJSONString(gaeaWsMessage));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 后台发送消息到redis
     * 支持微服务
     * @param commonMsgDto
     */
    public void sendMessage(CommonMsgDto commonMsgDto) {
        log.info("【websocket消息】广播消息:" + JSON.toJSONString(commonMsgDto));
        Map<String, String> msgMap = new HashMap<>();
        msgMap.put("message", commonMsgDto.getMessage());
        msgMap.put("id", commonMsgDto.getId());
        myRedisClient.sendMessage(HANDLER_NAME, msgMap);
    }

    /**
     * 此为单点消息--发送到redis
     *
     * @param userMsgDto
     */
    public void sendMessage(UserMsgDto userMsgDto) {
        Map<String, String> msgMap = new HashMap<>();
        msgMap.put("username", userMsgDto.getUsername());
        msgMap.put("message", userMsgDto.getMessage());
        msgMap.put("id", userMsgDto.getId());
        myRedisClient.sendMessage(HANDLER_NAME, msgMap);
    }

    /**
     * 此为单点消息(多人)
     * 支持微服务
     * @param userMsgDtos
     */
    public void sendMessage(List<UserMsgDto> userMsgDtos) {
        if(CollectionUtils.isEmpty(userMsgDtos)){
            return;
        }
        for (UserMsgDto userMsgDto : userMsgDtos) {
            sendMessage(userMsgDto);
        }
    }

}

从上面的代码可以看出
项目中任何地方都可以注入SimpMessageSendingOperations ,发送消息很方便;

  • 一对一发送
    使用SimpMessageSendingOperations.convertAndSendToUser()方法

  • 群发
    使用SimpMessageSendingOperations.convertAndSend()方法;

  • 支持微服务
    调用了MyRedisClient中方法发布消息,后面会有详细说明;

3.客服端如何接入Websocket

客服端需要加入如下js:

<script src="/webjars/sockjs-client/sockjs.min.js"></script>
<script src="/webjars/stomp-websocket/stomp.min.js"></script>

客服端连接重要代码如下,以官网提供的例子为主:

function connect() {
    //websocket端点地址,如是网关,写成对应网关地址
    var socket = new SockJS('http://ip:端口/websocket');
    stompClient = Stomp.over(socket);
    stompClient.connect({
        //传输token参数 进行用户鉴权
        Authorization:$("#token").val()
    }, function (frame) {
        setConnected(true);
        console.log('Connected: ' + frame);
        //订阅公共频道
        stompClient.subscribe('/topic/message', function (greeting) {
            showGreeting(greeting.body);
        })
        //订阅个人频道
        stompClient.subscribe('/user/" + $("#userName").val() +"/message', function (greeting) {
            showGreeting(greeting.body);
        })

    });
}

3.1 连接websocket

服务端启动后,用错误的token请求后,在浏览器按F12可以看到如下的信息:


用户鉴权.png

说明用户鉴权成功;
正常情况,连接成功后,浏览器控制台会有如下信息:

Connected: CONNECTED
user-name:nickel
heart-beat:0,0
version:1.1

>>> SUBSCRIBE
id:sub-0
destination:/topic/message

>>> SUBSCRIBE
id:sub-1
destination:/user/nickel/message

服务端会有如下信息:

04-13 14:20:23.485 |-INFO  c.a.t.g.a.c.UserChannelInterceptor:49 - 【nickel】用户上线了

4. 网关如何转发

如果项目接入了网关(SpringCloud Gateway),如何转发websocket请求?

4.1 服务端改动点

  • 网关新增websocket配置,如下
spring:
   cloud:
    gateway:
      routes:
        #表示authservice的正常http请求
        - id: authservice
          uri: http://127.0.0.1:9091
          predicates:
            - Path= /auth/**
          filters:
            - StripPrefix=1
        #表示websocket的转发
        - id: authservice-websocket
          uri: ws://127.0.0.1:9091
          predicates:
            - Path= /auth/websocket/**
          filters:
            - StripPrefix=1

注意:这里的path使用了前缀【auth】,为了和authservice保持一致,这样Sockjs发送的get类型的/info请求就会被正常http请求拦截。

  • 跨域问题
    如果接入网关地址后出现如下问题:
Access to XMLHttpRequest at 'http://ip:9090/auth/websocket/info?t=1618297871740' from origin 'http://localhost:8080' has been blocked by CORS policy: The value of the 'Access-Control-Allow-Origin' header in the response must not be the wildcard '*' when the request's credentials mode is 'include'. The credentials mode of requests initiated by the XMLHttpRequest is controlled by the withCredentials attribute.

在网关层的配置类中,由“*”改为request.getHeaders().getOrigin() ;

    @Bean
    public WebFilter webFilter() {
        return (exchange, chain) -> {
            ServerHttpRequest request = exchange.getRequest();
            if (CorsUtils.isCorsRequest(request)) {
                ServerHttpResponse response = exchange.getResponse();
                HttpHeaders headers = response.getHeaders();
                //改动点由“*”改为request.getHeaders().getOrigin() 
                headers.add("Access-Control-Allow-Origin", request.getHeaders().getOrigin());
                headers.add("Access-Control-Allow-Methods", ALLOWED_METHODS);
                headers.add("Access-Control-Max-Age", MAX_AGE);
                headers.add("Access-Control-Allow-Headers", ALLOWED_HEADERS);
                headers.add("Access-Control-Expose-Headers", ALLOWED_EXPOSE);
                headers.add("Access-Control-Allow-Credentials", "true");

                if (request.getMethod() == HttpMethod.OPTIONS) {
                    response.setStatusCode(HttpStatus.OK);
                    return Mono.empty();
                }
            }
            return chain.filter(exchange);
        };
    }
  • 解决请求头重复的问题
    如果接入网关地址后出现如下问题:
Opening Web Socket...
localhost/:1 Access to XMLHttpRequest at 'http://ip:9090/auth/websocket/info?t=1618297173825' from origin 'http://localhost:8080' has been blocked by CORS policy: The 'Access-Control-Allow-Origin' header contains multiple values 'http://localhost:8080, http://localhost:8080', but only one is allowed.
stomp.min.js:8 Whoops! Lost connection to http://ip:9090/auth/websocket

需要在网关层新增如下配置类,参考文章

/**
 * 功能描述:
 * websocket
 * 防止请求头中有重复的
 * @Author: nickel
 * @Date: 2021/4/1 11:14
 */
public class CorsResponseHeaderFilter implements GlobalFilter, Ordered {

    @Override
    public int getOrder() {
        // 指定此过滤器位于NettyWriteResponseFilter之后
        // 即待处理完响应体后接着处理响应头
        return NettyWriteResponseFilter.WRITE_RESPONSE_FILTER_ORDER + 1;
    }

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        return chain.filter(exchange).then(Mono.defer(() -> {
            exchange.getResponse().getHeaders().entrySet().stream()
                    .filter(kv -> (kv.getValue() != null && kv.getValue().size() > 1))
                    .filter(kv -> (kv.getKey().equals(HttpHeaders.ACCESS_CONTROL_ALLOW_ORIGIN)
                            || kv.getKey().equals(HttpHeaders.ACCESS_CONTROL_ALLOW_CREDENTIALS)))
                    .forEach(kv -> {
                        kv.setValue(new ArrayList<String>() {{
                            add(kv.getValue().get(0));
                        }});
                    });

            return chain.filter(exchange);
        }));
    }
}

并在网关配置类中纳入Spring容器进行管理:

@Configuration
public class MyGatewayConfig {
 @Bean
    public CorsResponseHeaderFilter getCorsResponseHeaderFilter(){
        return new CorsResponseHeaderFilter();
    }
}

4.2 客服端改动点

只需要把地址变为网关地址即可:
如下:

    //var socket = new SockJS('http://服务ip:9091/websocket');
    //写成对应网关地址
    var socket = new SockJS('http://网关ip:9090/auth/websocket');
    stompClient = Stomp.over(socket);

注:网关转发地址有配置前缀auth

4.3 nginx 配置更改

如果项目用了nginx代理,则需要更改对应的配置,支持转发websocket请求;
主要在location配置端增加以下配置项:

proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";

5. Redis的订阅与发布

上面网关转发的功能已完成,还存在一个问题就是微服务架构下,多个服务节点,如何给在线用户发送及时消息;这里我们主要用的redis的订阅与发布,大家也可以使用Mq消息队列处理;

  • Redis配置类修改
    这里直接看配置类代码,为了实现订阅与发布,需要新增配置RedisMessageListenerContainer 和MessageListenerAdapter
/**
 * <pre>
 * RedisCacheConfig
 * </pre>
 *
 * @author nickel
 * @version RedisCacheConfig.java
 */
@Configuration
@ConditionalOnClass({RedisOperations.class})
public class RedisCacheConfig {


    @Bean
    public RedisSerializer fastJson2JsonRedisSerializer() {
        return new FastJson2JsonRedisSerializer<>(Object.class);
    }

    @Bean
    public RedisTemplate<String, Serializable> redisTemplate(RedisConnectionFactory redisConnectionFactory, RedisSerializer fastJson2JsonRedisSerializer) {
        RedisTemplate<String, Serializable> template = new RedisTemplate<>();
        template.setConnectionFactory(redisConnectionFactory);

        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        template.setKeySerializer(stringRedisSerializer);
        template.setHashKeySerializer(stringRedisSerializer);
        template.setValueSerializer(fastJson2JsonRedisSerializer);
        template.setHashValueSerializer(fastJson2JsonRedisSerializer);

        return template;
    }

    /**
     * redis 监听配置
     * 配置一个REDIS_TOPIC_NAME channel
     * @param redisConnectionFactory redis 配置
     * @return
     */
    @Bean
    public RedisMessageListenerContainer redisContainer(RedisConnectionFactory redisConnectionFactory, MessageListenerAdapter commonListenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(redisConnectionFactory);
        container.addMessageListener(commonListenerAdapter, new ChannelTopic(MagicValueConstants.REDIS_TOPIC_NAME));
        return container;
    }
    /**
     * 配置redis监听类MyRedisReceiver 
     * @param redisReceiver
     * @return
     */
    @Bean
    MessageListenerAdapter commonListenerAdapter(MyRedisReceiver redisReceiver) {
        MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(redisReceiver, "onMessage");
        messageListenerAdapter.setSerializer(fastJson2JsonRedisSerializer());
        return messageListenerAdapter;
    }
}

  • MyRedisReceiver 消息监听者创建
/**
 * 功能描述:
 * redis 订阅者
 *
 * @Author: nickel
 * @Date: 2021/4/2 16:20
 */
@Configuration
@Slf4j
public class MyRedisReceiver {

    /**
     * 接受消息并调用业务逻辑处理器
     *
     * @param map
     */
    public void onMessage(Map<String, String> map) {
        String handlerName = map.get(MagicValueConstants.HANDLER_NAME);
        MyRedisListener myRedisListener = null;
        try {
            if (StringUtils.isNotBlank(handlerName)) {
                myRedisListener = ApplicationContextUtils.getBean(handlerName, MyRedisListener .class);
            } else {
                myRedisListener = ApplicationContextUtils.getBean(MyRedisListener .class);
            }
            if (null != myRedisListener ) {
                myRedisListener .onMessage(map);
            }
        }catch (Exception e){
            log.info("redis listener getbean not foud");
        }

    }
}
  • 创建redis消息消费者
/**
 * 功能描述:
 * 定义redis监听接口
 * @Author: nickel
 * @Date: 2021/4/2 16:31
 */
public interface MyRedisListener {

    void onMessage(Map<String, String> message);
}



/**
 * 监听消息实现类(采用redis发布订阅方式发送消息)
 */
@Slf4j
@Component
public class SocketHandler implements MyRedisListener {

    @Autowired
    private WebSocketService webSocket;

    @Override
    public void onMessage(Map<String, String> map) {
        log.info("【MySocketHandler消息】Redis Listerer:" + map.toString());

        String username = map.get("username");
        String message = map.get("message");
        String id = map.get("id");
        if (StringUtils.isNotEmpty(username)) {
            webSocket.pushMessage(username, message,id);
        } else {
            webSocket.pushMessage(message,id);
        }
    }
}
  • 如何发布消息
    配置信息和消费者配置完成后,咱们看下redis如何发布消息:
**
 * redis客户端
 * 发布消息
 */
@Configuration
public class MyRedisClient {

    @Resource
    private RedisTemplate<String, Object> redisTemplate;


    /**
     * 发送消息
     *
     * @param params
     */
    public void sendMessage(String handerName, Map<String,String> params) {
        params.put(MagicValueConstants.HANDLER_NAME,handerName);
        redisTemplate.convertAndSend(MagicValueConstants.REDIS_TOPIC_NAME, params);
    }


}

直接通过redisTemplate向我们定义的REDIS_TOPIC_NAME通道放入消息;
在需要发布消息的地方注入MyRedisClient 对象即可,可以参考2.5 WebSocketService 中的代码;

介绍完毕,感谢大家能看到这里!

欢迎大家留言,提出建议,谢谢!

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,088评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,715评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,361评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,099评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 60,987评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,063评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,486评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,175评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,440评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,518评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,305评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,190评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,550评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,880评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,152评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,451评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,637评论 2 335

推荐阅读更多精彩内容