1. WebSocket介绍
WebSocket协议RFC 6455提供了一种标准化的方法,可以通过一个TCP连接在客户机和服务器之间建立一个全双工的双向通信通道。它是与HTTP不同的TCP协议,但设计用于在HTTP上工作,使用端口80和443,并允许重用现有的防火墙规则。
WebSocket交互从一个HTTP请求开始,该请求使用HTTP 的Upgrade
header进行升级,或者在本例中切换到WebSocket协议。下面的例子展示了这样一种交互:
GET /spring-websocket-portfolio/portfolio HTTP/1.1
Host: localhost:8080
Upgrade: websocket // Upgrade header
Connection: Upgrade // 使用这个Upgrade连接
Sec-WebSocket-Key: Uc9l9TMkWGbHFD2qnFHltg==
Sec-WebSocket-Protocol: v10.stomp, v11.stomp
Sec-WebSocket-Version: 13
Origin: http://localhost:8080
与通常的200状态代码不同,支持WebSocket的服务器返回的输出如下:
HTTP/1.1 101 Switching Protocols // 协议switch
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: 1qVdfYHU9hPOl4JYYNXF623Gzn0=
Sec-WebSocket-Protocol: v10.stomp
成功握手之后,HTTP upgrade请求底层的TCP套接字将保持打开状态,以便客户机和服务器继续发送和接收消息。
注意,如果WebSocket服务器运行在web服务器的后面(例如nginx),您可能需要将其配置为将WebSocket upgrade请求传递给WebSocket服务器。同样,如果应用程序在云环境中运行,请检查与WebSocket支持相关的云提供商的说明。
1.1 HTTP VS WebSocket
尽管WebSocket被设计为与HTTP兼容,并从HTTP请求开始,但重要的是要理解这两个协议之间不同的体系结构和应用程序编程模型。
在HTTP和REST中,应用程序被建模为多个url。要与应用程序交互,客户机访问这些url,即请求-响应样式。服务器根据HTTP URL、方法和头将请求路由到适当的处理程序。
相反,在WebSockets中,通常只有一个初始连接URL。随后,所有应用程序消息都在同一个TCP连接上流动。这指向一个完全不同的异步、事件驱动的消息传递体系结构。
WebSocket也是一种底层传输协议,与HTTP不同,它不为消息的内容指定任何语义。这意味着,除非客户机和服务器在消息语义上达成一致,否则无法路由或处理消息。
WebSocket客户机和服务器可以通过HTTP握手请求上的Sec-WebSocket-Protocol
头协商使用更高级别的消息传递协议(例如,STOMP)。如果没有这些,他们需要制定自己的约定。
1.2 何时使用WebSocket
WebSockets可以使web页面具有动态性和交互性。然而,在许多情况下,Ajax和HTTP流媒体或长轮询的组合可以提供简单而有效的解决方案。
低延迟、高频率和高容量的组合为WebSocket的使用提供了最佳方案.
还要记住,在Internet上,超出您控制范围的限制性代理可能会阻止WebSocket交互,这可能是因为它们没有配置为传递upgrade头,也可能是因为它们关闭了看起来空闲的长寿命连接。这意味着在防火墙内为内部应用程序使用WebSocket比面向公共应用程序更直接。
2. WebSocket API
Spring框架提供了一个WebSocket API,您可以使用它来编写处理WebSocket消息的客户端和服务器端应用程序。
2.1 WebSocketHandler
创建一个WebSocket服务是简单的, 实现WebSocketHandler接口,或者是继承TextWebSocketHandler或BinaryWebSocketHandler。下面的例子使用TextWebSocketHandler:
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.TextMessage;
public class MyHandler extends TextWebSocketHandler {
@Override
public void handleTextMessage(WebSocketSession session, TextMessage message) {
// ...
}
}
有专门的WebSocket Java配置和XML命名空间支持将前面的WebSocket处理程序映射到特定的URL,如下例所示:
- Java:
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(myHandler(), "/myHandler");
}
@Bean
public WebSocketHandler myHandler() {
return new MyHandler();
}
}
- xml:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:websocket="http://www.springframework.org/schema/websocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/websocket
https://www.springframework.org/schema/websocket/spring-websocket.xsd">
<websocket:handlers>
<websocket:mapping path="/myHandler" handler="myHandler"/>
</websocket:handlers>
<bean id="myHandler" class="org.springframework.samples.MyHandler"/>
</beans>
前面的示例用于Spring MVC应用程序,应该包含在DispatcherServlet的配置中。然而,Spring的WebSocket支持并不依赖于Spring MVC。在WebSocketHttpRequestHandler
的帮助下,将WebSocketHandler
集成到其他http服务环境中相对简单。
当直接或间接使用WebSocketHandler API时,例如,通过STOMP传递消息,应用程序必须同步发送消息,因为底层标准WebSocket会话(JSR-356)不允许并发发送。一个解决方法是使用ConcurrentWebSocketSessionDecorator包装WebSocketSession。
2.2 WebSocket Handshake
自定义初始HTTP WebSocket握手请求的最简单方法是通过HandshakeInterceptor,它公开握手“之前”和“之后”的方法。您可以使用这个拦截器来阻止握手,或者给WebSocketSession添加属性。下面的示例使用内置拦截器将HTTP会话属性传递给WebSocket会话:
- Java:
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(new MyHandler(), "/myHandler")
.addInterceptors(new HttpSessionHandshakeInterceptor());
}
}
- xml:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:websocket="http://www.springframework.org/schema/websocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/websocket
https://www.springframework.org/schema/websocket/spring-websocket.xsd">
<websocket:handlers>
<websocket:mapping path="/myHandler" handler="myHandler"/>
<websocket:handshake-interceptors>
<bean class="org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor"/>
</websocket:handshake-interceptors>
</websocket:handlers>
<bean id="myHandler" class="org.springframework.samples.MyHandler"/>
</beans>
一个更高级的选项是扩展DefaultHandshakeHandler,它执行WebSocket握手的步骤,包括验证客户端起源、协商子协议和其他细节。如果应用程序需要配置定制的RequestUpgradeStrategy以适应尚未支持的WebSocket服务器引擎和版本,那么它也可能需要使用这些选项.
Spring提供了一个WebSocketHandlerDecorator基类,您可以使用这个基类用其他行为装饰WebSocketHandler。在使用WebSocket Java配置或XML名称空间时,默认情况下提供并添加日志记录和异常处理实现。ExceptionWebSocketHandlerDecorator捕获来自任何WebSocketHandler方法的所有未捕获异常,并以状态1011关闭WebSocket会话,这表示服务器出错。
2.3 部署
Spring WebSocket API很容易集成到Spring MVC应用程序中,其中DispatcherServlet同时提供HTTP WebSocket握手和其他HTTP请求。通过调用WebSocketHttpRequestHandler,还可以很容易地集成到其他HTTP处理场景中。这既方便又容易理解。但是,对于JSR-356运行时要特别注意。
Java WebSocket API (JSR-356)提供了两种部署机制。第一个在启动时的Servlet容器类路径扫描(Servlet 3特性)。另一个是注册API,用于Servlet容器初始化。这两种机制都不能为所有HTTP处理(包括WebSocket握手和所有其他HTTP请求)使用单个“前端控制器”,比如Spring MVC的DispatcherServlet。
这是JSR-356的一个重要限制,即使在JSR-356运行中运行时,Spring的WebSocket也支持特定于服务器的RequestUpgradeStrategy
实现。目前,这种策略适用于Tomcat、Jetty、GlassFish、WebLogic、WebSphere和Undertow(以及WildFly)。
第二个需要考虑的是,具有JSR-356支持的Servlet容器将执行ServletContainerInitializer
(SCI)扫描,这可能会减慢应用程序的启动速度——在某些情况下会很明显。如果在升级到支持JSR-356的Servlet容器版本后观察到明显的影响,可以通过web.xml中使用< absolelt - ordered />元素有选择地启用或禁用web片段(和SCI扫描)。如下例所示:
<web-app xmlns="http://java.sun.com/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://java.sun.com/xml/ns/javaee
https://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
version="3.0">
<absolute-ordering/>
</web-app>
您也可以根据名称选择性地启用web片段,例如Spring自己的SpringServletContainerInitializer
,它提供了对Servlet 3 Java初始化API的支持。下面的例子说明了如何做到这一点:
<web-app xmlns="http://java.sun.com/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://java.sun.com/xml/ns/javaee
https://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
version="3.0">
<absolute-ordering>
<name>spring_web</name>
</absolute-ordering>
</web-app>
2.4 服务器配置
每个底层WebSocket引擎都公开控制运行时特性的配置属性,例如消息缓冲区大小、空闲超时等。
对于Tomcat、WildFly和GlassFish,您可以在WebSocket 配置中添加一个ServletServerContainerFactory
Bean,如下面的示例所示:
- Java:
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
@Bean
public ServletServerContainerFactoryBean createWebSocketContainer() {
ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
container.setMaxTextMessageBufferSize(8192);
container.setMaxBinaryMessageBufferSize(8192);
return container;
}
}
- xml:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:websocket="http://www.springframework.org/schema/websocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/websocket
https://www.springframework.org/schema/websocket/spring-websocket.xsd">
<bean class="org.springframework...ServletServerContainerFactoryBean">
<property name="maxTextMessageBufferSize" value="8192"/>
<property name="maxBinaryMessageBufferSize" value="8192"/>
</bean>
</beans>
对于客户端WebSocket配置,应该使用
WebSocketContainerFactoryBean
(XML)或ContainerProvider.getWebSocketContainer()
(Java配置)。
对于Jetty,您需要提供预先配置好的Jetty WebSocketServerFactory,并通过WebSocket Java配置将其插入Spring的DefaultHandshakeHandler。
- Java:
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(echoWebSocketHandler(),
"/echo").setHandshakeHandler(handshakeHandler());
}
@Bean
public DefaultHandshakeHandler handshakeHandler() {
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
policy.setInputBufferSize(8192);
policy.setIdleTimeout(600000);
return new DefaultHandshakeHandler(
new JettyRequestUpgradeStrategy(new WebSocketServerFactory(policy)));
}
}
- xml:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:websocket="http://www.springframework.org/schema/websocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/websocket
https://www.springframework.org/schema/websocket/spring-websocket.xsd">
<websocket:handlers>
<websocket:mapping path="/echo" handler="echoHandler"/>
<websocket:handshake-handler ref="handshakeHandler"/>
</websocket:handlers>
<bean id="handshakeHandler" class="org.springframework...DefaultHandshakeHandler">
<constructor-arg ref="upgradeStrategy"/>
</bean>
<bean id="upgradeStrategy" class="org.springframework...JettyRequestUpgradeStrategy">
<constructor-arg ref="serverFactory"/>
</bean>
<bean id="serverFactory" class="org.eclipse.jetty...WebSocketServerFactory">
<constructor-arg>
<bean class="org.eclipse.jetty...WebSocketPolicy">
<constructor-arg value="SERVER"/>
<property name="inputBufferSize" value="8092"/>
<property name="idleTimeout" value="600000"/>
</bean>
</constructor-arg>
</bean>
</beans>
2.5 Allowed Origins
从Spring Framework 4.1.5起,WebSocket和SockJS的默认行为是只接受同源请求。也可以允许所有或指定的源列表。这个检查主要是为浏览器客户端设计的。没有任何东西可以阻止其他类型的客户机修改源头值.
这三种可能的行为是:
- 只允许同源请求(默认): 在这种模式下,当启用SockJS时,Iframe HTTP响应头X-Frame-Options被设置为SAMEORIGIN, JSONP传输被禁用,因为它不允许检查请求的起源。因此,当启用此模式时,不支持IE6和IE7。
- 允许指定的来源列表: 每个被允许的源必须以http://或https://开头。在此模式下,启用SockJS,将禁用IFrame传输。因此,当启用此模式时,不支持IE6到IE9。
- 允许所有原点:要启用此模式,您应该提供*作为允许的原点值。在此模式下,所有传输都可用。
您可以配置WebSocket和SockJS允许的起源,如下例所示:
- Java:
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(myHandler(), "/myHandler").setAllowedOrigins("https://mydomain.com");
}
@Bean
public WebSocketHandler myHandler() {
return new MyHandler();
}
}
- xml:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:websocket="http://www.springframework.org/schema/websocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/websocket
https://www.springframework.org/schema/websocket/spring-websocket.xsd">
<websocket:handlers allowed-origins="https://mydomain.com">
<websocket:mapping path="/myHandler" handler="myHandler" />
</websocket:handlers>
<bean id="myHandler" class="org.springframework.samples.MyHandler"/>
</beans>
3. SockJS Fallback
3.1 简介
SockJS的目标是让应用程序使用WebSocket API,但在运行时需要时返回到非WebSocket替代方案,而不需要更改应用程序代码。
SockJS包括:
- SockJS协议。
- SockJS JavaScript client : 用于浏览器的客户端库。
- SockJS服务器实现,包括Spring框架中的一个Spring -websocket模块。
- spring-websocket模块中的SockJS Java客户端(自4.1版以来)。
SockJS是为浏览器设计的。它使用多种技术来支持多种浏览器版本。传输分为三大类:WebSocket、HTTP流和HTTP长轮询。
SockJS客户端首先发送GET /info从服务器获取基本信息。然后,它必须决定使用什么传输工具。如果可能,使用WebSocket。如果没有,在大多数浏览器中,至少有一个HTTP流选项。如果没有,则使用HTTP (long)轮询。
所有的传输请求都有以下URL结构:
http://host:port/myApp/myEndpoint/{server-id}/{session-id}/{transport}
- {server-id}: 用于路由集群中的请求,但不用于其他用途。
- {session-id}: 关联属于SockJS会话的HTTP请求。
- {transport}: 指示传输类型(例如,websocket、xhr-streaming等)。
WebSocket传输只需要一个HTTP请求就可以完成WebSocket握手。此后所有消息都在该套接字上交换。
SockJS添加了最小的消息帧。例如,服务器最初发送字母o(“打开”帧),消息以[“message1”、“message2”(json编码的数组)、字母h(“heartbeat”帧)如果25秒内没有消息流(默认情况下),以及字母c(“关闭”帧)来关闭会话。
SockJS客户机允许固定传输列表,因此可以一次查看每个传输。SockJS客户机还提供了调试标志,它在浏览器控制台中启用了有用的消息。在服务器端,您可以为org.springframework.web.socket启用跟踪日志记录。
3.2 启用 SockJS
- Java:
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(myHandler(), "/myHandler").withSockJS();
}
@Bean
public WebSocketHandler myHandler() {
return new MyHandler();
}
}
- xml:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:websocket="http://www.springframework.org/schema/websocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/websocket
https://www.springframework.org/schema/websocket/spring-websocket.xsd">
<websocket:handlers>
<websocket:mapping path="/myHandler" handler="myHandler"/>
<websocket:sockjs/>
</websocket:handlers>
<bean id="myHandler" class="org.springframework.samples.MyHandler"/>
</beans>
前面的示例用于Spring MVC应用程序,应该包含在DispatcherServlet的配置中。
3.3 IE8 和 IE9
3.4 心跳
SockJS协议要求服务器发送心跳消息,以防止代理将连接挂起。Spring SockJS配置有一个名为heartbeatTime的属性,您可以使用该属性定制频率。默认情况下,假设该连接上没有发送其他消息, 心跳每隔25秒发送一次。
当在WebSocket和SockJS上使用STOMP时,如果STOMP客户端和服务器定义了要交换的心跳,SockJS心跳将被禁用。
Spring SockJS支持还允许您配置TaskScheduler来调度心跳任务。任务调度程序由线程池支持,默认设置基于可用处理器的数量。您应该考虑根据您的特定需求定制设置。
3.5 客户端断开
HTTP流和HTTP长轮询SockJS传输要求连接保持比通常打开的时间更长。
在Servlet容器中,这是通过Servlet 3异步支持完成的,它允许退出Servlet容器线程,处理请求,并继续从另一个线程写入响应。
一个问题是Servlet API不为已经断开的客户机提供通知.但是,Servlet容器在随后尝试写入响应时引发异常。由于Spring的SockJS服务支持服务器发送的心跳(默认为每25秒一次),这意味着客户端断开连接通常在这段时间内检测到(如果有消息发送则更早)。
3.6 SockJS 和 CORS
如果允许跨域请求, SockJS协议使用CORS在XHR流传输和轮询传输中提供跨域支持。因此,除非检测到响应中存在CORS header,否则将自动添加CORS标头。因此,如果应用程序已经配置为提供CORS支持(例如,通过Servlet过滤器),Spring的SockJsService将跳过这一部分。
通过在Spring的SockJsService中设置suppressCors属性,还可以禁用添加这些CORS头。
SockJS希望的header和值:
- Access-Control-Allow-Origin:从源请求头的值初始化。
- Access-Control-Allow-Credentials: 总是设置为true。
- Access-Control-Request-Headers:Initialized from values from the equivalent request header.
- Access-Control-Allow-Methods: 传输支持的HTTP方法(请参阅TransportType enum)。
- Access-Control-Max-Age: 设置为31536000(1年)。
有关确切的实现,请参见AbstractSockJsService中的addcorsheader和源代码中的TransportType enum。
如果CORS配置允许,考虑排除带有SockJS端点前缀的url,从而让Spring的SockJsService处理它。
3.7 SockJsClient
Spring提供了一个SockJS Java客户端来连接到远程SockJS端点,而无需使用浏览器。
SockJS Java客户机支持websocket、xhr-streaming和xhr-polling传输。剩下的只有在浏览器中才有意义。
你可以配置WebSocketTransport:
- StandardWebSocketClient: 在JSR-356中运行。
- JettyWebSocketClient :使用Jetty 9+本机WebSocket API
- Spring的WebSocketClient的任何实现
根据定义,XhrTransport既支持xhr流,也支持xhr轮询,因为从客户机的角度看,除了用于连接到服务器的URL之外,没有什么区别。目前有两种实现方式:
- RestTemplateXhrTransport: 使用Spring的RestTemplate进行HTTP请求.
- JettyXhrTransport : 使用Jetty的HttpClient进行HTTP请求.
下面的示例展示了如何创建SockJS客户机并连接到SockJS端点:
List<Transport> transports = new ArrayList<>(2);
transports.add(new WebSocketTransport(new StandardWebSocketClient()));
transports.add(new RestTemplateXhrTransport());
SockJsClient sockJsClient = new SockJsClient(transports);
sockJsClient.doHandshake(new MyWebSocketHandler(), "ws://example.com:8080/sockjs");
SockJS对消息使用JSON格式的数组。默认情况下,使用Jackson 2,并且需要在类路径上。或者,您可以配置SockJsMessageCodec的自定义实现,并在SockJsClient上配置它。
要使用SockJsClient来模拟大量并发用户,您需要配置底层HTTP客户机(用于XHR传输),以允许足够数量的连接和线程。下面的例子展示了如何使用Jetty:
HttpClient jettyHttpClient = new HttpClient();
jettyHttpClient.setMaxConnectionsPerDestination(1000);
jettyHttpClient.setExecutor(new QueuedThreadPool(1000));
下面的示例显示了服务器端与sockjs相关的属性(详细信息请参阅javadoc),您还应该考虑定制这些属性:
@Configuration
public class WebSocketConfig extends WebSocketMessageBrokerConfigurationSupport {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/sockjs").withSockJS()
.setStreamBytesLimit(512 * 1024) //将streamBytesLimit属性设置为512KB(默认值为128KB - 128 * 1024)。
.setHttpMessageCacheSize(1000) //将httpMessageCacheSize属性设置为1,000(缺省值为100)。
.setDisconnectDelay(30 * 1000); //将disconnectDelay属性设置为30秒(默认为5秒- 5 * 1000)。
}
// ...
}
4. STOMP
Simple Text Oriented Messaging Protocol,简单文本定向消息协议.
WebSocket协议定义了两种类型的消息(文本和二进制),但是没有定义它们的内容。协议定义了一种机制,让客户机和服务器协商一个子协议(即更高级别的消息传递协议),以便在WebSocket之上使用它来定义每种消息可以发送什么类型的消息、格式是什么、每种消息的内容等等。子协议的使用是可选的,但无论如何,客户机和服务器都需要就定义消息内容的某些协议达成一致。
4.1 概述
STOMP(简单的面向文本的消息传递协议)最初是为脚本语言(如Ruby、Python和Perl)创建的,用于连接到企业消息代理。它被用于处理常用消息传递模式的最小子集。STOMP可以用于任何可靠的双向流网络协议,如TCP和WebSocket。虽然STOMP是一个面向文本的协议,但是消息有效负载可以是文本的,也可以是二进制的。
STOMP是一种基于框架的协议,它的框架是基于HTTP建模的。下面的清单显示了STOMP框架的结构:
COMMAND
header1:value1
header2:value2
Body^@
客户端可以使用SEND或SUBSCRIBE命令发送或订阅消息。这启用了一个简单的发布-订阅机制,您可以使用该机制通过代理向其他连接的客户机发送消息,或者向服务器发送消息以请求执行某些工作。
当您使用Spring的STOMP支持时,Spring WebSocket应用程序充当客户机的STOMP代理。消息被路由到@Controller消息处理方法或简单的内存代理,该代理跟踪订阅并向订阅的用户广播消息。您还可以将Spring配置为使用专用的STOMP代理(如RabbitMQ、ActiveMQ等)来实际广播消息。在这种情况下,Spring维护到代理的TCP连接,向代理传递消息,并将消息从代理向下传递到已连接的WebSocket客户机。因此,Spring web应用程序可以依赖统一的基于http的安全性、公共验证和熟悉的消息处理编程模型。
下面的例子显示了一个订阅接收股票报价的客户端,服务器可能会定期发出股票报价(例如,通过一个调度任务将消息通过SimpMessagingTemplate发送给代理):
SUBSCRIBE
id:sub-1
destination:/topic/price.stock.*
^@
下面的示例显示了发送交易请求的客户机,服务器可以通过@MessageMapping方法处理该请求:
SEND
destination:/queue/trade
content-type:application/json
content-length:44
{"action":"BUY","ticker":"MMM","shares",44}^@
执行之后,服务器可以向客户端广播交易确认消息和详细信息。
目的地的含义在STOMP规范中故意保持不透明。它可以是任何字符串,完全取决于STOMP服务器来定义它们支持的目标的语义和语法。
STOMP服务器可以使用MESSAGE命令向所有订阅者广播消息。下面的例子显示了服务器发送股票报价给订阅的客户端:
MESSAGE
message-id:nxahklf6-1
subscription:sub-1
destination:/topic/price.stock.MMM
{"ticker":"MMM","price":129.45}^@
服务器不能发送未经请求的消息。来自服务器的所有消息必须响应特定的客户机订阅,服务器消息的订阅id头必须匹配客户机订阅的id头。
4.2 好处
使用STOMP作为子协议使Spring框架和Spring安全性提供了比使用原始WebSockets更丰富的编程模型。同样的观点也适用于HTTP和原始TCP,以及它如何让Spring MVC和其他web框架提供丰富的功能。以下是一些好处:
- 不需要创建自定义消息传递协议和消息格式。
- 可以使用STOMP客户端,包括Spring框架中的Java客户端。
- 您可以(选择性地)使用消息代理(如RabbitMQ、ActiveMQ和其他)来管理订阅和广播消息。
- 可以在任意数量的@Controller实例中组织应用程序逻辑,并且可以根据STOMP目标头将消息路由到它们,而不是使用一个WebSocketHandler为给定连接处理原始WebSocket消息。
- 您可以使用Spring Security基于STOMP目的地和消息类型来保护消息。
4.3 启用STOMP
spring-messaging和spring-websocket模块中提供了对基于WebSocket的STOMP支持。一旦有了这些依赖项,就可以使用SockJS Fallback在WebSocket上公开STOMP端点,如下面的示例所示:
- Java
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
// /portfolio是WebSocket(或SockJS)客户端需要连接的端点的HTTP URL,用于WebSocket握手。
registry.addEndpoint("/portfolio").withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
// 以/app开头的消息被路由到@Controller类中的@MessageMapping方法。
config.setApplicationDestinationPrefixes("/app");
// 使用内置的message broker订阅和广播消息,并将目标头以/topic '或' /queue开头的消息路由到代理。
config.enableSimpleBroker("/topic", "/queue");
}
}
- xml:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:websocket="http://www.springframework.org/schema/websocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/websocket
https://www.springframework.org/schema/websocket/spring-websocket.xsd">
<websocket:message-broker application-destination-prefix="/app">
<websocket:stomp-endpoint path="/portfolio">
<websocket:sockjs/>
</websocket:stomp-endpoint>
<websocket:simple-broker prefix="/topic, /queue"/>
</websocket:message-broker>
</beans>
对于内置的简单代理,/topic和/queue前缀没有任何特殊意义。它们只是区分发布-订阅和点到点消息传递(即,许多订阅者和一个消费者)之间的约定。使用外部代理时,请检查代理的STOMP页面,以了解它支持哪种类型的STOMP目的地和前缀。
要从浏览器连接,对于SockJS,可以使用SockJS -client。对于STOMP,许多应用程序都使用了jmesnil/ STOMP -websocket库(也称为stomp.js),该库功能齐全,已在生产中使用多年,但不再维护。目前,JSteunou/webstomp-client是该库最积极维护和发展的继承者。下面的例子代码就是基于它的:
var socket = new SockJS("/spring-websocket-portfolio/portfolio");
var stompClient = webstomp.over(socket);
stompClient.connect({}, function(frame) {
}
如果您通过WebSocket连接(没有SockJS),可以使用以下代码:
var socket = new WebSocket("/spring-websocket-portfolio/portfolio");
var stompClient = Stomp.over(socket);
stompClient.connect({}, function(frame) {
}
注意,上面示例中的stompClient不需要指定login
和passcode
header.即使它这样做了,它们也会在服务器端被忽略(或者,更确切地说,被覆盖)。
4.4 WebSocket服务器
对于Jetty,你需要通过StompEndpointRegistry
设置HandshakeHandler
和WebSocketPolicy
:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/portfolio").setHandshakeHandler(handshakeHandler());
}
@Bean
public DefaultHandshakeHandler handshakeHandler() {
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
policy.setInputBufferSize(8192);
policy.setIdleTimeout(600000);
return new DefaultHandshakeHandler(
new JettyRequestUpgradeStrategy(new WebSocketServerFactory(policy)));
}
}
4.5 消息流
一旦公开了STOMP端点,Spring应用程序就成为连接客户机的STOMP代理。本节描述服务器端上的消息流。
spring-messaging
模块包含对消息传递应用程序的基本支持.下面的列表简要描述了一些可用的消息传递抽象:
- Message: 消息的简单表示,包括header和有效负载。
- MessageHandler: 处理消息.
- MessageChannel: 用于发送消息的契约,该消息支持生产者和消费者之间的松散耦合。
- SubscribableChannel: 带有MessageHandler订阅者的MessageChannel。
- ExecutorSubscribableChannel: 使用
Executor
传递消息的SubscribableChannel。
Java配置(即@EnableWebSocketMessageBroker)和XML名称空间配置(即<websocket:message-broker>)都使用前面的组件组装消息工作流。下图显示了启用简单内置message broker时使用的组件:
上图显示了三个消息通道:
- clientInboundChannel: 用于传递从WebSocket客户端接收到的消息。
- clientOutboundChannel:用于向WebSocket客户端发送消息。
- brokerChannel:用于从服务器端应用程序代码中向消息代理发送消息。
下图显示了配置外部代理(如RabbitMQ)来管理订阅和广播消息时使用的组件:
前面两个图之间的主要区别是使用“代理中继”将消息通过TCP传递到外部STOMP代理,并将消息从代理传递到订阅的客户机。
当从WebSocket连接接收到消息时,它们被解码为STOMP帧,转换为Spring消息表示,并发送到clientInboundChannel进行进一步处理。例如,目标头以/app开头的STOMP消息可以路由到控制器中的带@MessageMapping注释方法,而/topic和/queue消息可以直接路由到消息代理。
处理来自客户机的STOMP消息的带注释的@Controller可以通过brokerChannel向消息代理发送消息,代理通过clientOutboundChannel将消息广播给匹配的订阅者。相同的控制器也可以对HTTP请求执行相同的响应,因此客户机可以执行HTTP POST,然后@PostMapping方法可以向message broker发送消息,以便向订阅的客户机广播。
我们可以通过一个简单的例子来跟踪流程。考虑下面的例子,它设置了一个服务器:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/portfolio");
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.setApplicationDestinationPrefixes("/app");
registry.enableSimpleBroker("/topic");
}
}
@Controller
public class GreetingController {
@MessageMapping("/greeting") {
public String handle(String greeting) {
return "[" + getTimestamp() + ": " + greeting;
}
}
上面的例子支持以下流程:
- a. 客户机连接到http://localhost:8080/portfolio,一旦建立了WebSocket连接,STOMP帧就开始在上面流动。
- b. 客户端发送一个带有目标标题/主题/问候语的订阅框架。接收和解码后,消息被发送到clientInboundChannel,然后路由到存储客户机订阅的message broker。