SpringBoot WebSocket STOMP 广播配置
ITDragon龙 人气:0
[TOC]
## 1. 前言
WebSocket是一种在单个TCP连接上进行全双工通信的协议,常用于实时通信的场景。在没有使用高层级线路协议的情况下,直接使用WebSocket是很难实现发布订阅的功能。而STOMP是在WebSocket之上提供了一个基于帧的线路格式层,STOMP客户端可以同时作为生产者和消费者两种模式。为发布订阅的功能提供了基础。
## 2. STOMP协议
> [STOMP](http://stomp.github.com/) is a simple text-orientated messaging protocol. It defines an [interoperable wire format](http://stomp.github.com/stomp-specification-1.1.html) so that any of the available STOMP clients can communicate with any STOMP message broker to provide easy and widespread messaging interoperability among languages and platforms (the STOMP web site has a [list of STOMP client and server implementations](http://stomp.github.com/implementations.html).
文档地址:
## 3. SpringBoot WebSocket集成
SpringBoot集成WebSocket非常方便,只需要简单的三个步骤:导包、配置、提供接口
### 3.1 导入websocket包
```groovy
compile('org.springframework.boot:spring-boot-starter-websocket')
```
### 3.2 配置WebSocket
第一步:创建WebSocketConfig类,通过@EnableWebSocketMessageBroker 启用代理支持的消息传递。
第二步:重写registerStompEndpoints和configureMessageBroker方法。
第三步:注册对外可访问的stomp端点、访问方式和连接跨域设置。
第四步:配置消息代理。可设置广播模式和点对点通讯。也可以添加订阅通道的前缀。
```kotlin
package com.itdragon.server.config
import org.springframework.context.annotation.Configuration
import org.springframework.messaging.simp.config.MessageBrokerRegistry
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker
import org.springframework.web.socket.config.annotation.StompEndpointRegistry
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer
@Configuration
@EnableWebSocketMessageBroker
class WebSocketConfig : WebSocketMessageBrokerConfigurer {
override fun configureMessageBroker(config: MessageBrokerRegistry) {
// 设置订阅Broker名称,/topic为广播模式
config.enableSimpleBroker("/topic")
// 设置应用程序全局目标前缀
config.setApplicationDestinationPrefixes("/itdragon")
}
override fun registerStompEndpoints(registry: StompEndpointRegistry) {
// 允许使用socketJs方式访问,访问端点为socket,并允许跨域
registry.addEndpoint("/socket").setAllowedOrigins("*").withSockJS()
}
}
```
注意:
若使用了setApplicationDestinationPrefixes方法,则作用主要体现在@SubscribeMapping和@MessageMapping上。如控制层配置@MessageMapping("/sendToServer"),则客户端发送的地址是 /itdragon/sendToServer
### 3.3 对外暴露接口
第一步:创建WebSocket的控制层类,并注入用于发送消息的SimpMessagingTemplate。
第二步:配置通过@MessageMapping注解修饰的方法来接收客户端SEND的操作。
第三步:配置通过@SubscribeMapping注解修饰的方法来接收客户端SUBSCRIBE的操作。
第四步:配置通过@SendTo注解的方法来直接将消息推送的指定地址上。
```kotlin
package com.itdragon.server.api.rest
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.messaging.handler.annotation.MessageMapping
import org.springframework.messaging.handler.annotation.SendTo
import org.springframework.messaging.simp.SimpMessagingTemplate
import org.springframework.messaging.simp.annotation.SubscribeMapping
import org.springframework.stereotype.Controller
import org.springframework.web.bind.annotation.RequestMapping
import java.time.Instant
@Controller
class WebSocketController {
@Autowired
lateinit var simpMessagingTemplate: SimpMessagingTemplate
/**
* 订阅广播,服务器主动推给连接的客户端
* 通过Http请求的方式触发订阅操作
*/
@RequestMapping("/subscribeTopic")
fun subscribeTopicByHttp() {
while (true) {
// 可以灵活设置成通道地址,实现发布订阅的功能
val channel = "/topic/subscribeTopic"
simpMessagingTemplate.convertAndSend(channel, Instant.now())
Thread.sleep(10*1000)
}
}
/**
* 订阅广播,服务器主动推给连接的客户端
* 通过Websocket的subscribe操作触发订阅操作
*/
@SubscribeMapping("/subscribeTopic")
fun subscribeTopicByWebSocket(): Long {
return Instant.now().toEpochMilli()
}
/**
* 服务端接收客户端发送的消息,类似OnMessage方法
*/
@MessageMapping("/sendToServer")
fun handleMessage(message: String) {
println("message:{$message}")
}
/**
* 将客户端发送的消息广播出去
*/
@MessageMapping("/sendToTopic")
@SendTo("/topic/subscribeTopic")
fun sendToTopic(message: String): String {
return message
}
}
```
WebSocket的订阅功能,可以用@SubscribeMapping注解,也可以用HTTP的方式触发。[ITDragon龙](https://www.cnblogs.com/itdragon/) 比较倾向HTTP的方式,因为在实现身份验证的功能上会比较方便。在客户端发送订阅操作之前,先发送HTTP请求做身份验证,验证成功后再返回指定的订阅通道地址。
## 4. 前端对接测试
在做消息通道对接的测试中,最常见的对话就是:连上了吗?没连上;收到了吗?没收到;收到了吗?收到了,后端报错....... 作为技术人员,我们有必要对各个领域的知识都有一定的了解。只有清楚明白了前端和移动端的开发思维,我们才能提供更合适的接口。
### 4.1 前端代码
```html
WebSocket 发布订阅
```
### 4.2 测试效果
简单测试了发布和订阅功能
![GIF](https://img2020.cnblogs.com/blog/806956/202003/806956-20200308162225867-2072531815.gif)
## 5. 原生WebSocket配置
有的特殊场景需要检测WebSocket的生命周期,还是会用到原生的WebSocket配置,这里记录一下对应的坑。
### 5.1 配置类注册Bean
在任意一个配置类中添加ServerEndpointExporter的Bean配置
```kotlin
@Bean
fun serverEndpointExporter(): ServerEndpointExporter {
return ServerEndpointExporter()
}
```
问题:
* 1)添加后单元测试启动失败,服务可以正常启动。网上说可以移除代码,由SpringBoot管理。可是移除后websocket链接会出现问题。解决方法目前未找到。
### 5.2 创建WebSocketServer
第一步:通过@ServerEndpoint注解修饰类,表示该类是WebSocket的Server,并对外暴露连接地址。
第二步:通过@OnOpen、@OnClose、@OnMessage、@OnError注解修饰方法,监控WebSocket的生命周期。
第三步:通过静态、私有、ConcurrentHashMap 修饰的变量管理客户端。
第四步:为程序其他类提供发送消息的方法。
```kotlin
package com.itdragon.server.config
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component
import java.io.IOException
import java.util.concurrent.ConcurrentHashMap
import javax.websocket.*
import javax.websocket.server.PathParam
import javax.websocket.server.ServerEndpoint
@ServerEndpoint("/nativeSocket/{clientKey}")
@Service
class WebSocketServer {
private var logger = LoggerFactory.getLogger(WebSocketServer::class.java)
private var session: Session? = null
private var clientKey = ""
@OnOpen
fun onOpen(session: Session, @PathParam("clientKey") clientKey: String) {
this.session = session
this.clientKey = clientKey
if (webSocketMap.containsKey(clientKey)) {
webSocketMap.remove(clientKey)
webSocketMap[clientKey] = this
} else {
webSocketMap[clientKey] = this
}
logger.info("客户端:$clientKey 连接成功")
}
@OnClose
fun onClose() {
if (webSocketMap.containsKey(clientKey)) {
webSocketMap.remove(clientKey)
}
logger.warn("客户端:$clientKey 连接关闭")
}
@OnMessage
fun onMessage(message: String, session: Session) {
logger.info("客户端:$clientKey 收到消息:$message")
}
@OnError
fun onError(session: Session, error: Throwable) {
logger.error("WebSocket客户端(${this.clientKey})错误: ${error.message}")
}
@Throws(IOException::class)
fun sendMessage(message: String) {
this.session!!.basicRemote.sendText(message)
}
companion object {
private val webSocketMap = ConcurrentHashMap()
@Throws(IOException::class)
fun sendMessage(clientKey: String, message: String) {
webSocketMap[clientKey]?.sendMessage(message)
}
fun getStatus(clientKey: String): Boolean? {
return webSocketMap[clientKey]?.session?.isOpen
}
}
}
```
问题:
* 1)WebSocketServer类的 Bean注入会报错,[解决方法点击连接跳转](https://blog.csdn.net/moshowgame/articlehttps://img.qb5200.com/download-x/details/83415545)
### 5.3 前端测试
```html
WebSocket 简单通讯
```
通过stomp客户端发起的认证操作可以看一下这篇文章:
/topic/subscribeTopic 订阅广播通道;/sendToServer 向服务端推送消息;/sendToTopic 将消息广播出去
加载全部内容