Stomp on Spring WebSocket项目源码分析

1 Why WebSocket?

WebSocket是一种在TCP连接上进行全双工网络通信的协议,之所以在有了Http协议之后,仍旧诞生了WebSocket的原因在于Http的缺陷:通信只能由客户端发起

举例来说,我们想象一个在线客服聊天场景,任何的数据传输都有可能由双方中的任意一方发起。HTTP1.x 协议做不到服务器在最快时间将客户的上一句话发给客服,以及将客服的回复立刻向客户推送,每次获取对方的最新消息,都必须由我方发起一个HTTP请求。这使得HTTP在很多场景的应用上大打折扣,WebSocket因此应运而生。

1.1 满足推送场景的几种技术方案

在WebSocket出现之前,在Web 或者C/S架构上,不论是实现服务端向客户端推送实时消息或者实现双向通信,不得不基于以下几种方案:

Polling (轮询)

浏览器定期发送 HTTP 请求并接收响应,作为服务器推送。如果消息传递的确切时间已知,这是一个尚可的解决方案,每次轮询都可以获取到服务器最新的数据信息。

然而,实际应用中,实时数据的什么时候需要传输通常是不可预测的,这必然造成许多不必要的请求,因此,在低频率消息的情况下,许多连接被不必要地打开和关闭,这也就需要不停地建立连接并传输较长的Header信息,在不断请求的过程中这将产生资源的浪费。如果减少轮询频率,又会造成消息推送的不及时,轮询间隔很难把控。

Long-Polling (长轮询)

为了减少无效的连接次数,最简单方便的优化,就是拉长单个HTTP连接的保持时间。

长轮询是让服务器在接收到浏览器所送出 HTTP 请求后,服务器等待一段时间,若在这段时间里面服务器有新的消息,服务器会把最新的消息传回给浏览器,如果等待的时间到了之后也没有新的消息的话,服务器也会送一个回应给浏览器,告知浏览器消息没有更新。

长轮询可以在低频通信的情况下减少产生原本轮询造成网络带宽浪费的情况,但在实际应用中,频率往往是不可预测 的,因此长轮询实现的依然是伪即时。此外,开发一套双向通信应用如在线客服聊天, 服务端通过请求接受消息和客户端轮询必定是两套机制,处理收/发消息的机制是割裂的,这和我们对一个在线客服聊天的直观理解有所不同。

iframe流(streaming)或者Flash socket等插件内嵌长连接

iframe流方式是在页面中插入一个隐藏的iframe,利用其src属性在服务器和客户端之间创建一条长连接,服务器向iframe传输数据,来实时更新页面。客户端只请求一次,然而服务端却是源源不断向客户端发送数据。Flash socket则是使用了flash提供的一套协议,但客户端必须安装Flash插件(被淘汰的技术),无法自动穿越防火墙。 这些方案虽然在历史上广泛使用,但并不属于主流方案,部分浏览器对其的兼容较差,例如firefox使用iframe+streaming的实现方式,浏览器页面会一直显示loading,最新版本的Chrome也已经不再支持flash。

SSE(Server-sent Events)

SSE ( Server-sent Events )是 WebSocket 的一种轻量代替方案,使用 HTTP 协议。
Server-sent Events 规范是 HTML 5 通讯协议是基于纯文本的简单协议。服务器端的响应的内容类型是“text/event-stream”。响应文本的内容可以看成是一个事件流,由不同的事件所组成。
每个事件由类型和数据两部分组成,同时每个事件可以有一个可选的标识符。
SSE 是单向通道,只能服务器向客户端发送消息,如果客户端需要向服务器发送消息,则需要一个新的 HTTP 请求。也就是说,SSE是只解决了服务器实时推送,但未解决客户端发起的通信。

1.2 WebSocket的优势

可以看出以上场景的一些特点,websocket拥有以下优势,更好得切合了这些场景。

  1. 支持双向通信
  2. 很强的实时性
  3. 更好的二进制支持,对传输的内容格式不作限制。这就使得发送内容不受协议本身限制。
  4. 较少的控制开销。连接创建后,ws客户端、服务端进行数据交换时,协议控制的数据包头部较小。在不包含头部的情况下,服务端到客户端的包头只有2~10字节(取决于数据包长度),客户端到服务端的的话,需要加上额外的4字节的掩码。而HTTP协议每次通信都需要携带完整的头部。
  5. 支持扩展。ws协议定义了扩展,用户可以扩展协议,或者实现自定义的子协议。(比如支持自定义压缩算法等)

总结,几种实时获取服务端的技术比较:

Polling Long-Polling iframe流/Flash WebSocket SSE
通讯方式 HTTP HTTP 基于TCP的长连接通讯 基于TCP的长连接通讯 HTTP
触发方式 轮询 轮询 事件 事件 事件
优点 简单,兼容性好 相对短轮询资源占用少 实现真正的即时通信,而不是伪即时。 全双工通讯,性能好,安全(最新协议版本),扩展性 实现简单, 占用很少的资源实现推送
缺点 资源占用高 资源占用高 兼容性差 缺乏通用的子协议,传输资源要进行二次解析,一定的开发门槛 低版本浏览器不支持(Safari 10.1+, Edge 14+, IE不支持)
常见的适用范围 B/S服务中的推送 B/S服务中的推送 B/S或者内嵌Flash的任何应用 比较通用 仅推送

1.3 WebSocket适用场景

不只是在线客服聊天,决定手头的工作是否需要使用WebSocket技术的方法可以主要可以参考以下两点:

1.你的应用提供多方互相且频繁的数据传输吗?

2.你的应用是需要实时展示服务器端经常变动的数据吗?

类似于以下场景:

  • 社交 / 订阅
    微信朋友圈的实时更新提醒、点赞或评论的红点通知,比如qq的特别关注人的动态提醒,比如聊天信息的实时同步,比如新闻客户端的订阅通知等等。

  • 多玩家游戏
    对于在线实时的多人游戏,互动效率是非常重要的,你可不想在扣动扳机之后,你的对手却已经在10秒钟之前移动了位置。

  • 协同办公 / 编辑
    不同人在不同地点同时编辑同一份文档。

  • 股市基金报价
    金融界瞬息万变——几乎是每毫秒都在变化。过时的信息也只能导致损失。使用WebSocket可以流式更新这些数据变化而不需要等待。

  • 基于位置的应用
    越来越多的开发者借用移动设备的GPS功能来实现他们基于位置的网络应用。比如共享单车、共享汽车、地图GPS服务、疫情监控目标人的实时运动轨迹、运动员的轨迹分析。借用WebSocket连接可以让数据飞起来。

1.4 和几种直播协议比较

不过,如果只是基于刚刚提到的两个标准,多方互相且频繁的数据传输、实时展示服务器端经常变动的数据,除了在线客服聊天,直播或者视讯也是另外一个比较常见的场景。但直播往往不使用WebSocket协议,常用的直播协议有HLS、RTMP、HTTP-FLV等。

在线客服聊天不同,直播协议往往只需要面对相对稳定频率的消息,且消息格式固定(流媒体)。此外,直播对可靠的连接要求不高,更强调的是实时,因此不需要重传机制,网络抖动丢失部分数据不被看重,总体的连贯性、实时性则必须优先保证,直播协议也是为此而生,也专门为此服务。而在线客服聊天,每一条聊天信息及发送状态记录都必须完整保留,因此说直播协议和WebSocket的出发点和目标是不同的。

2 Why STOMP On WebSocket?

2.1 用WebSocket搭建一个应用还需要什么

当选择了使用WebSocket来搭建应用,来看看为什么STOMP或者其他替代子协议,是WebSocket的一个必要补充。我们着手在浏览器上应用WebSocket,WebSocket协议本身的不足就会体现出来。

因为HTTP本身是基于TCP连接的,所以,在浏览器中使用WebSocket时,往往是在HTTP协议的基础上做了一个简单的升级,即建立TCP连接后,浏览器发送请求时,附带以下Header:

1
2
3
4
GET /chat HTTP/1.1
Host: tech.yangqianguan.com
Upgrade: websocket
Connection: Upgrade

就表示客户端希望升级连接,变成长连接的WebSocket,服务器返回升级成功的响应:

1
2
3
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade

收到成功响应后表示WebSocket“握手”成功, 同时代表WebSocket的这个TCP连接将不会被服务器关闭,服务器可随时向浏览器推送消息,浏览器也可随时向服务器推送消息。

当完成建立连接这一步后,接下来呢?WebSocket几乎没有规定发送内容的格式,或者可以说是没有提供任何的帮助,消息可以是任意格式的文本、二进制。WebSocket协议虽然有一个“消息架构”,但并不强制使用任何特定的“消息协议”。从OSI七层模型上理解,与HTTP不同,工作于应用层上的 WebSocket,其作用也仅是将字节流转换为消息流(文本或二进制),并不是一个OSI七层模型中完整的应用层协议。虽然WebSocket的限制少了,但想要在此基础上搭建一个B/S应用还需要一套“消息协议”,将可以是任意格式的文本、二进制实现为“消息”——对于web应用来说,“消息”是约定好的路由或者处理方式。WebSocket的“消息架构”不足以满足“消息协议”的需求,留下了很多的空白空间。可以试想,假如我们直接用Servlet API+纯文本来编写一套应用,会有多么复杂。这也就是为什么我们需要STOMP协议或者类似的协议,作为WebSocket协议的补充的原因。

2.2 STOMP(Simple Text Orientated Messaging Protocol)——WebSocket语义方面的补充

基于刚刚所述的背景,WebSocket RFC 定义了一个概念 sub-protocols ——基于握手时的 Sec-WebSocket-Protocol Header,服务器可以和client达成共识,建立一个基于子协议的连接。Spring框架提供了对使用STOMP子协议的支持。

STOMP是一种简单的消息传递协议,初衷是为脚本语言(如 Ruby、 Python 和 Perl)和web框架创建一种基于文本的简单异步消息协议。相比于正式诞生于2011年的WebSocket,STOMP在此之前广泛使用了十年以上,并且得到了很多客户端(如stomp.js、Gozirra、stomp.py、stompngo等)、消息代理端(如ActiveMQ、RabbitMQ等)、工具库的支持,目前最新的协议版本为2012年1.2版本。

具体来说,STOMP是一种基于Frame的协议,每个Frame由一个命令Command、一组Headers和可选的正文Body组成,如下是一个STOMP frame的基本结构示例:

1
2
3
4
5
6
SEND   //作为COMMAND
USER-TOKEN:value1 //作为Headers
content-type:application/json //作为Headers

Hello, stomp. //消息内容,可以多行,直到^@为止 //作为Body
^@ //此Frame结束

可以看到STOMP本身的结构是非常简单明了的。STOMP同样有客户端和服务端的概念,STOMP的命令分为客户端可服务端两类,有如下几种:

客户端命令:

  • CONNECT:用于初始化信息流或TCP连接,是客户端第一个需要发送的命令
  • SEND:表示向目的地发送消息,必须要包含一个名为destination的Headers
  • SUBSCRIBE:用于注册监听一个目的地,必须包含一个名为destination的Headers
  • BEGIN:用于启动事务,必须包含一个名为transaction的Headers
  • COMMIT:用于提交事务,必须包含一个名为transaction的Headers
  • ABORT:用于回滚事务,必须包含一个名为transaction的Headers
  • DISCONNECT:告知服务端关闭连接

服务端命令:

  • CONNECTED:服务器响应客户的段的CONNECT请求,表示连接成功
  • MESSAGE:用于将订阅的消息发送给客户端,Headersdestination的值应与SEND frame中的相同,且必须包含一个名为message-id的Headers用于唯一标识这个消息
  • RECIPT:收据,表示服务器成功处理了一个客户端要求返回收据的消息,必须包含Headersmessage-id表明是哪个消息的收据
  • ERROR:出现异常时,服务端可能会发送该命令,通常在发送ERROR后将关闭连接

总结为一句话:STOMP提供了发送消息、订阅消息的语义,同时还能够支持事务的处理。

有了STOMP,起码服务器能够知道报文中的哪部分是消息体本身,另一部分只是代表“我收到了消息”或者代表了“我离线了”。这对于了WebSocket协议构建应用时,是个有效的补充。更关键的在于,STOMP得到了Spring Framework的支持,这使得搭建WebSocket 应用时的成本大大降低。

2.3 STOMP On Spring WebSocket —— Spring推出的完整解决方案

Spring Framework 从 4.0.0 加入了spring-websocket 和 spring-messaging 两大模块。

Spring文档的篇幅、提供的应用样例以及spring-boot-starter-websocket直接引入了spring-websocket 和 spring-messaging模块(包含了STOMP等相关内容)等各种情况,不难看出基于STOMP做为其消息交互协议的方式,是spring主推的完整的websocket解决方案即 STOMP On Spring WebSocket,即使用STOMP也是spring框架的选择。相比较使用其他的STOMP on WebSocket架构,使用 spring-boot-starter-websocket 来构建 WebSocket 应用的优势又在于:

  1. 能够用得上Spring能够提供更加丰富的编程模型,和其他的Spring应用集成方便,风格一致

  2. 无需自定义消息交互协议和消息格式(协议用Stomp,反序列化Converter支持,和Spring MVC一致)

  3. 无需手动管理Session,Session的生命周期管理由Spring-WebSocket托管

  4. 能够使用消息队列来代为管理消息,可以简单的直接交给rabbit-mq 托管消息,这个从下边要演示的配置类可以看出,可以直接使用rabbit-mq的监控和管理体系。此外整合了同样使用了spring-integration的spring-boot-starter-amqp 了后,对于消息管理的定制化非常方便。

  5. 与Spring Security集成良好,可以基于WebSocket CONNECT、SEND、SUBSCRIBE等不同命令进行鉴权

总的来说,STOMP On Spring WebSocket既满足了快速搭建WebSocket应用的需求,又可以和已有的Spring其他项目深度结合,当下热度较高。

3 快速理解STOMP On Spring WebSocket

3.1 Pipes-and-Filters模型

消息分发和路由作为STOMP On Spring WebSocket的重中之重,是通过spring-messaging这个由Spring Integration抽象发展而来的模块来实现。Spring对于它的完整运作机制描写在了Spring Integration的文档中官方文档

简单来说,Spring Integration是一套消息控制框架,核心思想源自Enterprise Integration Pattern一书,根据文档,Spring解释构建此框架的核心目标,是通过消息将各个子系统解耦分离,为企业复杂系统集成提供一种便捷的解决方案。Enterprise Integration Pattern一书中提出了Pipes-and-Filters模型,定义了Pipe(管道)过滤器(Filters)消息(Message)

Pipe and Filters

spring-messaging中定义了以下三个Interface,就是依据Pipes-and-Filters模型

  • Message: 同Pipes-and-Filters模型中的消息(Message),包括消息Headers和负载,是对消息的抽象封装
  • MessageHandler:即Pipes-and-Filters模型中的过滤器(Filters),它是消息的消费端,接收消息并执行操作。
  • MessageChannel:即Pipes-and-Filters模型中的Pipe(管道),用于传递消息,解耦生产者和消费者。在应用层面,MessageChannel的消息可以连接一个或者多个MessageHandler,给channel发消息即是请求这些Handlers处理该消息。

spring-messaging中几乎所有主要实体,都是这三个Interface之一的实现类,可以说Pipes-and-Filters模型就是spring-messaging的整体设计的灵魂,spring-messaging就是依照Pipes-and-Filters模型,将下面要介绍的MessageHandler、MessageChannel的各种实现类,拼成了一个完整的消息处理和流转过程

3.3 理解STOMP On Spring WebSocket 所提供的过滤器

MessageHandler

STOMP On Spring WebSocket 所提供的过滤器都是MessageHandler的子类,
展开解释一个比较有特点的过滤器

  • 直接对接client消息的双向格式转换器Handler——SubProtocolWebSocketHandler:

该过滤器实现了 WebSocketHandlerWebSocketHandler 是spring-websocket项目定义的源格式消息(即WebSocketMessage message)的过滤器,因此就有了对源消息的流入的支持。 此外该过滤器又实现了spring-messaging的`MessageHandler`接口, 这代表它又可以处理特定子协议格式的消息即Message message,用来处理消息流出。

因此SubProtocolWebSocketHandler的作用可以概括为,对流入的源格式的WebSocketMessage message,将其根据子协议配置,流转入对应的SubProtocolHandler子类(例如使用Stomp协议就是StompSubProtocolHandler);而发消息时,又可以将Message message 处理为源格式的文本消息发出。这就承担了双向格式转换器的角色(就像一头为Type-C,一头为micro-USB接口的充电线,每一头都可以用来插电源或者设备),靠近client端都是源格式消息(即WebSocketMessage message),对下层则是子协议格式的消息即Message message。

  • 具体核心代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56

/**
* Handle an inbound message from a WebSocket client.
*/
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
//Session管理器,后续会提到
WebSocketSessionHolder holder = this.sessions.get(session.getId());
if (holder != null) {
session = holder.getSession();
}
//websocket使用Header Sec-WebSocket-Protocol来定义子协议类型
SubProtocolHandler protocolHandler = findProtocolHandler(session);
//将message根据子协议封装
protocolHandler.handleMessageFromClient(session, message, this.clientInboundChannel);
if (holder != null) {
holder.setHasHandledMessages();
}
//默认60秒无心跳关闭check, 此处并非唯一的心跳check,这里主要是为了关闭建立子协议连接就过慢的session。
checkSessions();
}

/**
* Handle an outbound Spring Message to a WebSocket client.
*/
@Override
public void handleMessage(Message<?> message) throws MessagingException {
String sessionId = resolveSessionId(message);
if (sessionId == null) {
if (logger.isErrorEnabled()) {
logger.error("Couldn't find session id in " + message);
}
return;
}

WebSocketSessionHolder holder = this.sessions.get(sessionId);
if (holder == null) {
if (logger.isDebugEnabled()) {
// The broker may not have removed the session yet
logger.debug("No session for " + message);
}
return;
}

WebSocketSession session = holder.getSession();
try {
//和上边方法的handleMessageFromClient正好做相反的处理
findProtocolHandler(session).handleMessageToClient(session, message);
}
catch (SessionLimitExceededException ex) {
//省略
}
catch (Exception ex) {
//省略
}
}

3.4 STOMP On Spring WebSocket 的管道

MessageChannel

管道过滤器相比的最大不同,是其不对消息本身做处理,只负责对接不同的过滤器,接受、发送或者拦截消息。这和中文意义上的管道不太一样,更像一个“电报收发员”的角色,这在MessageChannel(STOMP On Spring WebSocket 的管道都基于此)被使用最多的一个实现ExecutorSubscribableChannel上就可以看出。

具体来说,AbstractSubscribableChannel 作为一个管道 ,主要用来找到对应的订阅者handler并发送消息,不承担消息处理任务。核心代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Override
public boolean sendInternal(Message<?> message, long timeout) {
for (MessageHandler handler : getSubscribers()) {
SendTask sendTask = new SendTask(message, handler);
if (this.executor == null) {
sendTask.run();
}
else {
this.executor.execute(sendTask);
}
}
return true;
}

其中getSubscribers() 维护了一个Set ,每一个订阅者对应一个MessageHandler,从而同时可以实现广播或者指定发送。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

public Set<MessageHandler> getSubscribers() {
return Collections.<MessageHandler>unmodifiableSet(this.handlers);
}

@Override
public boolean subscribe(MessageHandler handler) {
boolean result = this.handlers.add(handler);
if (result) {
if (logger.isDebugEnabled()) {
logger.debug(getBeanName() + " added " + handler);
}
}
return result;
}

@Override
public boolean unsubscribe(MessageHandler handler) {
boolean result = this.handlers.remove(handler);
if (result) {
if (logger.isDebugEnabled()) {
logger.debug(getBeanName() + " removed " + handler);
}
}
return result;
}

SendTask 则是一个MessageHandlingRunnable线程类,用以通过线程池异步实现发送操作。总结一下,就AbstractSubscribableChannel来说,作为一个“电报收发员”,他负责找人、确定是要广播或者单发,然后交给异步线程发出,完全不负责内容。
这里从实践角度上看,“电报收发员”确实也是确实如描述一般关键,ExecutorSubscribableChannel的调优、重写是整个系统性能优化的重点之一。

3.5 三种通信模式

Stomp on Spring WebSocket 消息流在一系列大同小异的过滤器管道拼接下,支持了WebSocket应用,以下三种模式进行消息通信:

端到Topic的通信:用户可以订阅地址`topic/*,服务器和用户都可以向/topic/*发送消息,所有订阅了该topic的用户都可收到消息。

客户端与服务器之间类似HTTP API式的通信: 客户端直接发送请求给服务器,例如默认情况下,对于发送到/app/*的消息将被路由到@Controller注解的类中注解了@MessageMapping的方法上,与@RequestMapping非常相似,发送到/app/greeting的请求将由@MessageMapping(“/greeting”)下的方法处理。

端到Topic下指定用户通信: 给特定的用户发送消息。例如默认的/user前缀下,客户端可以订阅/user/topic/chat表明监听一个只会发给自己消息的地址/topic/chat,在服务端可以发送到形如/user/{username}/topic/*的目的地或者通过调用方法来给目标用户发送消息,spring将自动解析转化为用户会话唯一的目的地(如/topic/chat-user {session-id}),保证与其他用户不冲突。

3.6 详解完整的消息流转和处理过程

STOMP

上图是基于Pipes-and-Filters,用形形色色的过滤器管道所搭建出来的完整流程图,步骤依次是:
  • 客户端消息的流入:Spring-WebSocket的消息的源头为StandardWebSocketHandlerAdapter,该类继承了javax.websocket.Endpoint,可以看出虽然经过了层层封装,但是源头依然是基于原生Java API,从web容器中获取请求。

  • 消息封装为WebSocketMessage: 数据被封装为WebSocketMessage的子类后被发送给SubProtocolWebSocketHandler,SubProtocolWebSocketHandler通过传入的消息的类型,获取到对应的子协议处理器,即StompSubProtocolHandler,将WebSocketMessage交由其处理。

  • 从WebSocketMessage到Stomp Message: StompSubProtocolHandler解析消息,并将其封装为一条或者多条org.springframework.messaging.Message,同时设置对应的Header信息包括User、SessionId等等,封装完成后将消息发送给ClientInboundChannel处理。

  • ClientInboundChannel 将三种模式下的消息的流入转到对应的Handler: WebSocketAnnotationMethodMessageHandler处理客户端与服务器之间类似HTTP API式的通信,负责解析转换用户地址; SimpleBrokerMessageHandler处理端到Topic的通信, 负责发送响应消息以及记录订阅状况。UserDestinationMessageHandler处理端到Topic下指定用户通信,负责解析转换用户地址。不过,UserDestinationMessageHandler稍微特殊,其仅负责地址转换,转换完成后会重新将消息发送到ClientInboundChannel,再由SimpleBrokerMessageHandler将消息发送给ClientOutboundChannel。ClientInboundChannel 在图上的示例中自身是一个AbstractSubscribableChannel。

  • ClientOutboundChannel 统一负责将流出的消息转到SubProtocolWebSocketHandler:消息的统一出口是ClientOutboundChannel,其对应的Handler为SubProtocolWebSocketHandler,它保存了SessionId到Session的映射,根据Message Header的SessionId信息获取到对应的Session后,交由StompSubProtocolHandler最终执行消息的发送。此外SubProtocolWebSocketHandler也通过SimpMessageTemplate向外暴露,实际代码中是可以在其他场景下,直接向用户推送消息。同样ClientInboundChannel 在图上的示例中自身是一个AbstractSubscribableChannel。

    4 总结

比起传统方式,WebSocket协议本身有条件帮助我们的应用,更加高效的提供多方互相且频繁的数据传输,实时展示服务器端经常变动的数据。

在此基础上,STOMP on Spring WebSocket是Spring主推的websocket应用解决方案,选择使用STOMP协议,补充了WebSocket协议在语义方面的不足。框架spring-boot-starter-websocket抽象程度很高易理解,功能齐全。最关键的是其和Spring其他项目整合非常方便,且保持了一贯的Spring代码风格。虽然目前相关文档较少,但框架本身清晰易懂,便于分析。

本文从WebSocket协议要解决的痛点出发,再到使用STOMP协议来补充WebSocket协议搭建应用时的不足,引出本文所介绍的重点STOMP on Spring WebSocket解决方案,阐述了STOMP on Spring WebSocket解决了什么,及其为什么是个比较不错的方案。

最后部分,本文秉承设计原型——Pipes-and-Filters模型的思路,帮助大家快速理解STOMP on Spring WebSocket消息流转和处理过程。希望本文能够对想要使用WebSocket来搭建应用的朋友们有所帮助。

附录:配置类详解

演示配置类的部分代码,参考了Spring官方的引导样例(同时也提供了前端client),从此配置类可以配置STOMP On Spring WebSocket消息模型支持的几种模式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
@Configuration
@EnableWebSocketMessageBroker
public class StompWebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Autowired
private YqgHandshakeInterceptor yqgHandshakeInterceptor;

@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableSimpleBroker("/topic");
config.setApplicationDestinationPrefixes("/app");
config.setUserDestinationPrefix("/user");

//以下代码为使用RabbitMQ做消息代理"STOMP broker relay"处理所有消息将消息发送到外部的消息代理
config.enableStompBrokerRelay("/exchange", "/topic", "/queue", "/amq/queue")
.setVirtualHost("rabbit_mq_host") //对应rabbitmq virtual host
.setRelayHost("localhost")
.setClientLogin("root")
.setClientPasscode("root")
.setSystemLogin("root")
.setSystemPasscode("root")
.setSystemHeartbeatSendInterval(5000)
.setSystemHeartbeatReceiveInterval(4000);

}

}

@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/establish-websocket-sockjs").withSockJS();
registry.addEndpoint("/establish-websocket")
.setAllowedOrigins("*") //设置白名单
.setHandshakeHandler(handshakeHandler()) //重写handShake逻辑,集成鉴权,如SpringSecurity
.addInterceptors(yqgHandshakeInterceptor); //也可以在handShake前后拦截集成鉴权,如SpringSecurity
}


/**
* Inbound消息配置。可以配置拦截器进行具体Command级别的鉴权,例如判断是否有Subscribe一个Topic的权限;
* 可以自定义配置线程池大小。
*
* @param registration
*/
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.setInterceptors(webSocketInterceptor);
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(100);
executor.setQueueCapacity(1000);
executor.setKeepAliveSeconds(1000);
executor.setThreadNamePrefix("yqg");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
registration.taskExecutor(executor)
}

/**
* Outbound消息配置
*
* @param registration
*/
@Override
public void configureClientOutboundChannel(ChannelRegistration registration) {
}

/**
* 下面的几个方法和 SpringMvc中的WebMvcConfigurationSupport极度相似,作用也是一样的,不详细展开。
*
*/
@Override
public void addArgumentResolvers(List<HandlerMethodArgumentResolver> argumentResolvers) {
}

@Override
public void addReturnValueHandlers(List<HandlerMethodReturnValueHandler> returnValueHandlers) {
}

@Override
public boolean configureMessageConverters(List<MessageConverter> messageConverters) {
return true;
}
}
  1. @EnableWebSocketMessageBroker 的主要作用就是引入一个配置类DelegatingWebSocketMessageBrokerConfiguration,这个配置类的作用是遍历、汇总所有实现WebSocketMessageBrokerConfigurer的配置类,方便统一引入用户自定义的配置。具体来说,DelegatingWebSocketMessageBrokerConfiguration在注入时发现并收集Spring容器中的所有实现WebSocketMessageBrokerConfigurer的配置类,代码如下:

    1
    2
    3
    4
    5
    6
    7
    private final List<WebSocketMessageBrokerConfigurer> configurers = new ArrayList<>();
    @Autowired(required = false)
    public void setConfigurers(List<WebSocketMessageBrokerConfigurer> configurers) {
    if (!CollectionUtils.isEmpty(configurers)) {
    this.configurers.addAll(configurers);
    }
    }

    DelegatingWebSocketMessageBrokerConfiguration本身也实现WebSocketMessageBrokerConfigurer,每个方法做的事情都如下,即遍历获取所有WebSocketMessageBrokerConfigurer 类并分别调用对应的配置方法,从而一次给应用中所有的WebSocketMessageBrokerConfigurer引入用户自定义的配置。

    1
    2
    3
    4
    5
    6
    @Override
    protected void registerStompEndpoints(StompEndpointRegistry registry) {
    for (WebSocketMessageBrokerConfigurer configurer : this.configurers) {
    configurer.registerStompEndpoints(registry);
    }
    }
  2. 刚刚提到的WebSocketMessageBrokerConfigurer,目的是配置Message Broker(消息队列服务器)的各种参数。Message Broker(消息队列服务器)为Spring已经预置好了的发送消息、订阅消息的处理模型。在这种模型下,客户端可以订阅某个广播地址(广播)或传输通道地址(单播),订阅之后将会接收发布在该地址上的消息,同时客户端也可以主动给某个地址发送消息。 在上面的引导样例 中,WebSocketMessageBrokerConfigurer配置了:

  • MessageBrokerRegistry#enableSimpleBroker 用于配置topic传输消息的地址前缀,例如配置了前缀为/diyTopic,则可以订阅地址/diyTopic/*,一端(服务器或者客户端皆可)向/diyTopic/*发送消息,全部订阅者可以收到消息。

  • MessageBrokerRegistry#setApplicationDestinationPrefixes用于配置客户端与服务器之间类似HTTP API式的通信中发送给服务器执行处理的全局地址前缀,例如配置了前缀为/abc/, 则对于发送到/abc/*的消息将被路由到@Controller注解的类中注解了@MessageMapping的方法上,这与与@RequestMapping非常相似。

  • MessageBrokerRegistry#setUserDestinationPrefix用以配置端到Topic下指定用户通信模式下的用户地址前缀。例如配置了前缀为/onlyMe在默认的/onlyMe前缀情况下,客户端可以订阅/onlyMe/topic/chat表明监听一个只会发给自己消息的地址/topic/chat,在服务端可以发送到形如/onlyMe/{username}/topic/*的目的地,或者通过调用SimpMessagingTemplate的convertAndSendToUser(String user, String destination, Object payload,
    Map<String, Object> headers)来给目标用户发送消息,spring将自动解析转化为用户会话唯一的目的地(如/topic/chat-user {session-id}),保证与其他用户不冲突。

  • StompEndpointRegistry#addEndpoint添加建立WebSocket连接的地址,后续的所有消息都依托于此连接。此外选择是否开启SockJs模式以兼容不支持WebSocket协议的浏览器;集成鉴权、配置Origin白名单等。

  • 此外,还有类似于SpringMvc中的WebMvcConfigurationSupport的各种拦截器、解析器配置、线程池配置。