亲宝软件园·资讯

展开

SpringBoot WebSocket STOMP 广播配置

ITDragon龙 人气:0
[TOC] ## 1. 前言 WebSocket是一种在单个TCP连接上进行全双工通信的协议,常用于实时通信的场景。在没有使用高层级线路协议的情况下,直接使用WebSocket是很难实现发布订阅的功能。而STOMP是在WebSocket之上提供了一个基于帧的线路格式层,STOMP客户端可以同时作为生产者和消费者两种模式。为发布订阅的功能提供了基础。 ## 2. STOMP协议 > [STOMP](http://stomp.github.com/) is a simple text-orientated messaging protocol. It defines an [interoperable wire format](http://stomp.github.com/stomp-specification-1.1.html) so that any of the available STOMP clients can communicate with any STOMP message broker to provide easy and widespread messaging interoperability among languages and platforms (the STOMP web site has a [list of STOMP client and server implementations](http://stomp.github.com/implementations.html). 文档地址: ## 3. SpringBoot WebSocket集成 SpringBoot集成WebSocket非常方便,只需要简单的三个步骤:导包、配置、提供接口 ### 3.1 导入websocket包 ```groovy compile('org.springframework.boot:spring-boot-starter-websocket') ``` ### 3.2 配置WebSocket 第一步:创建WebSocketConfig类,通过@EnableWebSocketMessageBroker 启用代理支持的消息传递。 第二步:重写registerStompEndpoints和configureMessageBroker方法。 第三步:注册对外可访问的stomp端点、访问方式和连接跨域设置。 第四步:配置消息代理。可设置广播模式和点对点通讯。也可以添加订阅通道的前缀。 ```kotlin package com.itdragon.server.config import org.springframework.context.annotation.Configuration import org.springframework.messaging.simp.config.MessageBrokerRegistry import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker import org.springframework.web.socket.config.annotation.StompEndpointRegistry import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer @Configuration @EnableWebSocketMessageBroker class WebSocketConfig : WebSocketMessageBrokerConfigurer { override fun configureMessageBroker(config: MessageBrokerRegistry) { // 设置订阅Broker名称,/topic为广播模式 config.enableSimpleBroker("/topic") // 设置应用程序全局目标前缀 config.setApplicationDestinationPrefixes("/itdragon") } override fun registerStompEndpoints(registry: StompEndpointRegistry) { // 允许使用socketJs方式访问,访问端点为socket,并允许跨域 registry.addEndpoint("/socket").setAllowedOrigins("*").withSockJS() } } ``` 注意: 若使用了setApplicationDestinationPrefixes方法,则作用主要体现在@SubscribeMapping和@MessageMapping上。如控制层配置@MessageMapping("/sendToServer"),则客户端发送的地址是 /itdragon/sendToServer ### 3.3 对外暴露接口 第一步:创建WebSocket的控制层类,并注入用于发送消息的SimpMessagingTemplate。 第二步:配置通过@MessageMapping注解修饰的方法来接收客户端SEND的操作。 第三步:配置通过@SubscribeMapping注解修饰的方法来接收客户端SUBSCRIBE的操作。 第四步:配置通过@SendTo注解的方法来直接将消息推送的指定地址上。 ```kotlin package com.itdragon.server.api.rest import org.springframework.beans.factory.annotation.Autowired import org.springframework.messaging.handler.annotation.MessageMapping import org.springframework.messaging.handler.annotation.SendTo import org.springframework.messaging.simp.SimpMessagingTemplate import org.springframework.messaging.simp.annotation.SubscribeMapping import org.springframework.stereotype.Controller import org.springframework.web.bind.annotation.RequestMapping import java.time.Instant @Controller class WebSocketController { @Autowired lateinit var simpMessagingTemplate: SimpMessagingTemplate /** * 订阅广播,服务器主动推给连接的客户端 * 通过Http请求的方式触发订阅操作 */ @RequestMapping("/subscribeTopic") fun subscribeTopicByHttp() { while (true) { // 可以灵活设置成通道地址,实现发布订阅的功能 val channel = "/topic/subscribeTopic" simpMessagingTemplate.convertAndSend(channel, Instant.now()) Thread.sleep(10*1000) } } /** * 订阅广播,服务器主动推给连接的客户端 * 通过Websocket的subscribe操作触发订阅操作 */ @SubscribeMapping("/subscribeTopic") fun subscribeTopicByWebSocket(): Long { return Instant.now().toEpochMilli() } /** * 服务端接收客户端发送的消息,类似OnMessage方法 */ @MessageMapping("/sendToServer") fun handleMessage(message: String) { println("message:{$message}") } /** * 将客户端发送的消息广播出去 */ @MessageMapping("/sendToTopic") @SendTo("/topic/subscribeTopic") fun sendToTopic(message: String): String { return message } } ``` WebSocket的订阅功能,可以用@SubscribeMapping注解,也可以用HTTP的方式触发。[ITDragon龙](https://www.cnblogs.com/itdragon/) 比较倾向HTTP的方式,因为在实现身份验证的功能上会比较方便。在客户端发送订阅操作之前,先发送HTTP请求做身份验证,验证成功后再返回指定的订阅通道地址。 ## 4. 前端对接测试 在做消息通道对接的测试中,最常见的对话就是:连上了吗?没连上;收到了吗?没收到;收到了吗?收到了,后端报错....... 作为技术人员,我们有必要对各个领域的知识都有一定的了解。只有清楚明白了前端和移动端的开发思维,我们才能提供更合适的接口。 ### 4.1 前端代码 ```html WebSocket 发布订阅

/topic/subscribeTopic 订阅广播通道;/sendToServer 向服务端推送消息;/sendToTopic 将消息广播出去






``` ### 4.2 测试效果 简单测试了发布和订阅功能 ![GIF](https://img2020.cnblogs.com/blog/806956/202003/806956-20200308162225867-2072531815.gif) ## 5. 原生WebSocket配置 有的特殊场景需要检测WebSocket的生命周期,还是会用到原生的WebSocket配置,这里记录一下对应的坑。 ### 5.1 配置类注册Bean 在任意一个配置类中添加ServerEndpointExporter的Bean配置 ```kotlin @Bean fun serverEndpointExporter(): ServerEndpointExporter { return ServerEndpointExporter() } ``` 问题: * 1)添加后单元测试启动失败,服务可以正常启动。网上说可以移除代码,由SpringBoot管理。可是移除后websocket链接会出现问题。解决方法目前未找到。 ### 5.2 创建WebSocketServer 第一步:通过@ServerEndpoint注解修饰类,表示该类是WebSocket的Server,并对外暴露连接地址。 第二步:通过@OnOpen、@OnClose、@OnMessage、@OnError注解修饰方法,监控WebSocket的生命周期。 第三步:通过静态、私有、ConcurrentHashMap 修饰的变量管理客户端。 第四步:为程序其他类提供发送消息的方法。 ```kotlin package com.itdragon.server.config import org.slf4j.LoggerFactory import org.springframework.stereotype.Component import java.io.IOException import java.util.concurrent.ConcurrentHashMap import javax.websocket.* import javax.websocket.server.PathParam import javax.websocket.server.ServerEndpoint @ServerEndpoint("/nativeSocket/{clientKey}") @Service class WebSocketServer { private var logger = LoggerFactory.getLogger(WebSocketServer::class.java) private var session: Session? = null private var clientKey = "" @OnOpen fun onOpen(session: Session, @PathParam("clientKey") clientKey: String) { this.session = session this.clientKey = clientKey if (webSocketMap.containsKey(clientKey)) { webSocketMap.remove(clientKey) webSocketMap[clientKey] = this } else { webSocketMap[clientKey] = this } logger.info("客户端:$clientKey 连接成功") } @OnClose fun onClose() { if (webSocketMap.containsKey(clientKey)) { webSocketMap.remove(clientKey) } logger.warn("客户端:$clientKey 连接关闭") } @OnMessage fun onMessage(message: String, session: Session) { logger.info("客户端:$clientKey 收到消息:$message") } @OnError fun onError(session: Session, error: Throwable) { logger.error("WebSocket客户端(${this.clientKey})错误: ${error.message}") } @Throws(IOException::class) fun sendMessage(message: String) { this.session!!.basicRemote.sendText(message) } companion object { private val webSocketMap = ConcurrentHashMap() @Throws(IOException::class) fun sendMessage(clientKey: String, message: String) { webSocketMap[clientKey]?.sendMessage(message) } fun getStatus(clientKey: String): Boolean? { return webSocketMap[clientKey]?.session?.isOpen } } } ``` 问题: * 1)WebSocketServer类的 Bean注入会报错,[解决方法点击连接跳转](https://blog.csdn.net/moshowgame/articlehttps://img.qb5200.com/download-x/details/83415545) ### 5.3 前端测试 ```html WebSocket 简单通讯


``` 通过stomp客户端发起的认证操作可以看一下这篇文章:

加载全部内容

相关教程
猜你喜欢
用户评论