Sending Broadcast and Point-to-Point Messages with Spring Boot WebSocket, and Receiving Them on Android
宁惊蛰
1. Spring Boot WebSocket
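Spring Boot's WebSocket messaging support does not exchange bare WebSocket frames. It layers a messaging sub-protocol called STOMP on top of the standard WebSocket connection.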
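1.1 About the STOMP protocol
STOMP (Simple Text Oriented Messaging Protocol, also expanded as Streaming Text Oriented Messaging Protocol) is a simple text-based protocol designed for MOM (Message Oriented Middleware).
It provides an interoperable wire format so that any STOMP client can talk to any STOMP message broker. It plays a role similar to OpenWire, which is a binary protocol.
Because the design is simple, clients are easy to write, and STOMP is widely used across many languages and platforms. One of the most popular STOMP brokers is Apache ActiveMQ.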
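To make "simple text protocol" concrete, here is a minimal sketch of what a STOMP frame looks like on the wire: a command line, header lines, a blank line, the body, and a NUL terminator. The class and method names (StompFrameSketch, encode, decodeHeaders) are made up for illustration and are not part of Spring or of the Android library used later; header escaping and content-length handling are left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Minimal STOMP frame encoder/decoder, for illustration only. */
final class StompFrameSketch {

    /** Serializes a frame: command line, header lines, blank line, body, NUL terminator. */
    static String encode(String command, Map<String, String> headers, String body) {
        StringBuilder sb = new StringBuilder(command).append('\n');
        for (Map.Entry<String, String> h : headers.entrySet()) {
            sb.append(h.getKey()).append(':').append(h.getValue()).append('\n');
        }
        return sb.append('\n').append(body).append('\0').toString();
    }

    /** Extracts the headers of an encoded frame (no header-value unescaping). */
    static Map<String, String> decodeHeaders(String frame) {
        Map<String, String> headers = new LinkedHashMap<>();
        // Everything before the first blank line is the command plus the headers.
        String head = frame.substring(0, frame.indexOf("\n\n"));
        String[] lines = head.split("\n");
        for (int i = 1; i < lines.length; i++) { // line 0 is the command
            int colon = lines[i].indexOf(':');
            headers.put(lines[i].substring(0, colon), lines[i].substring(colon + 1));
        }
        return headers;
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("destination", "/app/welcome");
        headers.put("content-type", "application/json");
        String frame = encode("SEND", headers, "{\"name\":\"lincoln\"}");
        // Replace the NUL terminator so it is visible when printed.
        System.out.println(frame.replace("\0", "^@"));
        System.out.println(decodeHeaders(frame));
    }
}
```

In practice the client library and Spring build and parse these frames for you; the sketch only shows why writing a client is not hard.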
1.2 Setup
I built the Spring Boot WebSocket project in IntelliJ IDEA and used Maven rather than the Gradle I am more familiar with.
The project consists of the following files.
pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.drawthink</groupId>
    <artifactId>websocketdemo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>webSocketdemo</name>
    <description>webSocketDemo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.3.6.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-thymeleaf</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
Application:
package com.drawthink;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class WebSocketdemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(WebSocketdemoApplication.class, args);
    }
}
WebSocketConfig
package com.drawthink.websocket;

import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.AbstractWebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;

/**
 * Created by lincoln on 16-10-25
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry stompEndpointRegistry) {
        // Register the /hello endpoint with SockJS support and allow cross-origin access
        stompEndpointRegistry.addEndpoint("/hello").setAllowedOrigins("*").withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // Destination prefixes served by the simple broker (clients subscribe under these)
        registry.enableSimpleBroker("/topic", "/user");
        // Prefix for destinations routed to @MessageMapping methods (clients send to /app/...)
        registry.setApplicationDestinationPrefixes("/app/");
        // Prefix for point-to-point (user) destinations; defaults to /user/ when not set
        //registry.setUserDestinationPrefix("/user/");
    }
}
WebSocketController
package com.drawthink.websocket.controller;

import com.drawthink.message.ClientMessage;
import com.drawthink.message.ServerMessage;
import com.drawthink.message.ToUserMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Controller;

/**
 * Created by lincoln on 16-10-25
 */
@Controller
public class WebSocketController {

    // Injected SimpMessagingTemplate, used to send point-to-point messages
    @Autowired
    private SimpMessagingTemplate messagingTemplate;

    // Broadcast test: @SendTo publishes the return value to the given broker destination
    @MessageMapping("/welcome")
    @SendTo("/topic/getResponse")
    public ServerMessage say(ClientMessage clientMessage) {
        System.out.println("clientMessage.getName() = " + clientMessage.getName());
        return new ServerMessage("Welcome , " + clientMessage.getName() + " !");
    }

    // Point-to-point test: the message is published to /user/{userId}/message.
    // /user/ is the default prefix; to change it, call setUserDestinationPrefix in the config.
    @MessageMapping("/cheat")
    public void cheatTo(ToUserMessage toUserMessage) {
        System.out.println("toUserMessage.getMessage() = " + toUserMessage.getMessage());
        System.out.println("toUserMessage.getUserId() = " + toUserMessage.getUserId());
        messagingTemplate.convertAndSendToUser(toUserMessage.getUserId(), "/message", toUserMessage.getMessage());
    }
}
VO
package com.drawthink.message;

/**
 * Created by lincoln on 16-10-25
 */
public class ClientMessage {

    private String name;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}

package com.drawthink.message;

/**
 * Created by lincoln on 16-10-25
 */
public class ServerMessage {

    private String responseMessage;

    public ServerMessage(String responseMessage) {
        this.responseMessage = responseMessage;
    }

    public String getResponseMessage() {
        return responseMessage;
    }

    public void setResponseMessage(String responseMessage) {
        this.responseMessage = responseMessage;
    }
}

package com.drawthink.message;

/**
 * Created by lincoln on 16-10-25
 */
public class ToUserMessage {

    private String userId;
    private String message;

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }
}
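Android client
Android has no built-in STOMP implementation, so you have to supply one yourself. The good news is that the open-source community has already written a STOMP client for Android, so we only need to use it.
Address: StompProtocolAndroid_jb51.rar
Setup
build.gradle (app)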
apply plugin: 'com.android.application'

android {
    compileSdkVersion 24
    buildToolsVersion "24.0.3"
    defaultConfig {
        applicationId "com.drawthink.websocket"
        minSdkVersion 16
        targetSdkVersion 24
        versionCode 1
        versionName "1.0"
        testInstrumentationRunner "android.support.test.runner.AndroidJUnitRunner"
    }
    buildTypes {
        release {
            minifyEnabled false
            proguardFiles getDefaultProguardFile('proguard-android.txt'), 'proguard-rules.pro'
        }
    }
}

dependencies {
    compile fileTree(include: ['*.jar'], dir: 'libs')
    androidTestCompile('com.android.support.test.espresso:espresso-core:2.2.2', {
        exclude group: 'com.android.support', module: 'support-annotations'
    })
    compile 'com.android.support:appcompat-v7:24.2.1'
    testCompile 'junit:junit:4.12'
    // Android implementation of the STOMP protocol
    compile 'com.github.NaikSoftware:StompProtocolAndroid:1.1.1'
    // StompProtocolAndroid depends on a standard WebSocket implementation
    compile 'org.java-websocket:Java-WebSocket:1.3.0'
}
Receiving broadcasts:
package com.drawthink.websocket;

import android.content.Intent;
import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;
import android.view.View;
import android.widget.Button;
import android.widget.EditText;
import android.widget.TextView;
import android.widget.Toast;

import org.java_websocket.WebSocket;

import rx.Subscriber;
import rx.functions.Action1;
import ua.naiksoftware.stomp.LifecycleEvent;
import ua.naiksoftware.stomp.Stomp;
import ua.naiksoftware.stomp.client.StompClient;
import ua.naiksoftware.stomp.client.StompMessage;

public class MainActivity extends AppCompatActivity {

    // Log tag for this activity
    private static final String TAG = "MainActivity";

    private TextView serverMessage;
    private Button start;
    private Button stop;
    private Button send;
    private EditText editText;
    private StompClient mStompClient;
    private Button cheat;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        bindView();

        start.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                // Create the client instance
                createStompClient();
                // Subscribe to messages
                registerStompTopic();
            }
        });

        send.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                // Nothing to send on until Start has created the client
                if (mStompClient == null) {
                    toast("Not connected");
                    return;
                }
                mStompClient.send("/app/welcome", "{\"name\":\"" + editText.getText() + "\"}")
                        .subscribe(new Subscriber<Void>() {
                            @Override
                            public void onCompleted() {
                                toast("Sent");
                            }

                            @Override
                            public void onError(Throwable e) {
                                e.printStackTrace();
                                toast("Send failed");
                            }

                            @Override
                            public void onNext(Void aVoid) {
                            }
                        });
            }
        });

        stop.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                if (mStompClient != null) {
                    mStompClient.disconnect();
                }
            }
        });

        cheat.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                startActivity(new Intent(MainActivity.this, CheatActivity.class));
                if (mStompClient != null) {
                    mStompClient.disconnect();
                }
                finish();
            }
        });
    }

    private void showMessage(final StompMessage stompMessage) {
        runOnUiThread(new Runnable() {
            @Override
            public void run() {
                serverMessage.setText("stomp command is --->" + stompMessage.getStompCommand()
                        + " body is --->" + stompMessage.getPayload());
            }
        });
    }

    // Create the client instance
    private void createStompClient() {
        mStompClient = Stomp.over(WebSocket.class, "ws://192.168.0.46:8080/hello/websocket");
        mStompClient.connect();
        Toast.makeText(MainActivity.this, "Connecting to 192.168.0.46:8080", Toast.LENGTH_SHORT).show();
        mStompClient.lifecycle().subscribe(new Action1<LifecycleEvent>() {
            @Override
            public void call(LifecycleEvent lifecycleEvent) {
                switch (lifecycleEvent.getType()) {
                    case OPENED:
                        Log.d(TAG, "Stomp connection opened");
                        toast("Connection opened");
                        break;
                    case ERROR:
                        Log.e(TAG, "Stomp Error", lifecycleEvent.getException());
                        toast("Connection error");
                        break;
                    case CLOSED:
                        Log.d(TAG, "Stomp connection closed");
                        toast("Connection closed");
                        break;
                }
            }
        });
    }

    // Subscribe to messages
    private void registerStompTopic() {
        mStompClient.topic("/topic/getResponse").subscribe(new Action1<StompMessage>() {
            @Override
            public void call(StompMessage stompMessage) {
                Log.e(TAG, "call: " + stompMessage.getPayload());
                showMessage(stompMessage);
            }
        });
    }

    private void toast(final String message) {
        runOnUiThread(new Runnable() {
            @Override
            public void run() {
                Toast.makeText(MainActivity.this, message, Toast.LENGTH_SHORT).show();
            }
        });
    }

    private void bindView() {
        serverMessage = (TextView) findViewById(R.id.serverMessage);
        start = (Button) findViewById(R.id.start);
        stop = (Button) findViewById(R.id.stop);
        send = (Button) findViewById(R.id.send);
        editText = (EditText) findViewById(R.id.clientMessage);
        cheat = (Button) findViewById(R.id.cheat);
    }
}
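The send handler builds its JSON body by string concatenation, so a double quote or backslash typed into the text field would produce invalid JSON. The following is a minimal sketch of one way to guard against that, assuming you want to keep building the string by hand. JsonPayloadSketch, escape and welcomePayload are hypothetical names, not part of the demo project. On Android, org.json.JSONObject is another option.

```java
/** Builds the demo's small JSON payload without breaking on quotes or backslashes. */
final class JsonPayloadSketch {

    /** Escapes the characters that must be escaped inside a JSON string. */
    static String escape(String raw) {
        StringBuilder sb = new StringBuilder();
        for (char c : raw.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n");  break;
                case '\r': sb.append("\\r");  break;
                case '\t': sb.append("\\t");  break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters use the \\uXXXX form
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    /** Payload shape expected by ClientMessage on the server: {"name":"..."} */
    static String welcomePayload(String name) {
        return "{\"name\":\"" + escape(name) + "\"}";
    }

    public static void main(String[] args) {
        System.out.println(welcomePayload("Lin \"coln\""));
    }
}
```

With such a helper, the send call would pass welcomePayload(editText.getText().toString()) instead of the concatenated string.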
Point-to-point
package com.drawthink.websocket;

import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;
import android.view.View;
import android.widget.Button;
import android.widget.EditText;
import android.widget.LinearLayout;
import android.widget.TextView;
import android.widget.Toast;

import org.java_websocket.WebSocket;

import rx.Subscriber;
import rx.functions.Action1;
import ua.naiksoftware.stomp.LifecycleEvent;
import ua.naiksoftware.stomp.Stomp;
import ua.naiksoftware.stomp.client.StompClient;
import ua.naiksoftware.stomp.client.StompMessage;

public class CheatActivity extends AppCompatActivity {

    // Log tag for this activity
    private static final String TAG = "CheatActivity";

    private EditText cheat;
    private Button send;
    private LinearLayout message;
    private StompClient mStompClient;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_cheat);
        bindView();
        createStompClient();
        registerStompTopic();

        send.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                // Send JSON data to /app/cheat. The message is addressed to userId "lincoln",
                // while this client subscribes as "xiaoli" in registerStompTopic().
                mStompClient.send("/app/cheat", "{\"userId\":\"lincoln\",\"message\":\"" + cheat.getText() + "\"}")
                        .subscribe(new Subscriber<Void>() {
                            @Override
                            public void onCompleted() {
                                toast("Sent");
                            }

                            @Override
                            public void onError(Throwable e) {
                                e.printStackTrace();
                                toast("Send failed");
                            }

                            @Override
                            public void onNext(Void aVoid) {
                            }
                        });
            }
        });
    }

    private void bindView() {
        cheat = (EditText) findViewById(R.id.cheat);
        send = (Button) findViewById(R.id.send);
        message = (LinearLayout) findViewById(R.id.message);
    }

    private void createStompClient() {
        mStompClient = Stomp.over(WebSocket.class, "ws://192.168.0.46:8080/hello/websocket");
        mStompClient.connect();
        Toast.makeText(CheatActivity.this, "Connecting to 192.168.0.46:8080", Toast.LENGTH_SHORT).show();
        mStompClient.lifecycle().subscribe(new Action1<LifecycleEvent>() {
            @Override
            public void call(LifecycleEvent lifecycleEvent) {
                switch (lifecycleEvent.getType()) {
                    case OPENED:
                        Log.d(TAG, "Stomp connection opened");
                        toast("Connection opened");
                        break;
                    case ERROR:
                        Log.e(TAG, "Stomp Error", lifecycleEvent.getException());
                        toast("Connection error");
                        break;
                    case CLOSED:
                        Log.d(TAG, "Stomp connection closed");
                        toast("Connection closed");
                        break;
                }
            }
        });
    }

    // Receive messages published to /user/xiaoli/message
    private void registerStompTopic() {
        mStompClient.topic("/user/xiaoli/message").subscribe(new Action1<StompMessage>() {
            @Override
            public void call(StompMessage stompMessage) {
                Log.e(TAG, "call: " + stompMessage.getPayload());
                showMessage(stompMessage);
            }
        });
    }

    private void showMessage(final StompMessage stompMessage) {
        runOnUiThread(new Runnable() {
            @Override
            public void run() {
                TextView text = new TextView(CheatActivity.this);
                text.setLayoutParams(new LinearLayout.LayoutParams(
                        LinearLayout.LayoutParams.MATCH_PARENT,
                        LinearLayout.LayoutParams.WRAP_CONTENT));
                text.setText(System.currentTimeMillis() + " body is --->" + stompMessage.getPayload());
                message.addView(text);
            }
        });
    }

    private void toast(final String message) {
        runOnUiThread(new Runnable() {
            @Override
            public void run() {
                Toast.makeText(CheatActivity.this, message, Toast.LENGTH_SHORT).show();
            }
        });
    }
}
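The code is a bit messy, so here are some notes.
1. With STOMP, the key is the publish/subscribe relationship. Anyone who has used a message queue such as RabbitMQ should find it easy to follow.
On the server, WebSocketConfig.java controls how the publish and subscribe destinations relate.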
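2. The WebSocket URL. This example connects to ws://192.168.0.46:8080/hello/websocket. The /hello part is set in WebSocketConfig by stompEndpointRegistry.addEndpoint("/hello").setAllowedOrigins("*").withSockJS(); and the trailing /websocket is the raw WebSocket transport path that SockJS exposes under the endpoint. If you register several endpoints, the URL changes to match the one you connect to.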
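As a small illustration of how the connection URL is put together, the sketch below derives it from the host, port, and registered endpoint. EndpointUrlSketch and rawWebSocketUrl are hypothetical names introduced here; the sketch assumes an unencrypted ws:// connection to a SockJS-enabled endpoint, as in this demo.

```java
/** Derives the raw WebSocket URL for a SockJS-enabled STOMP endpoint (illustrative helper). */
final class EndpointUrlSketch {

    /** endpoint is the path passed to addEndpoint(...), for example "/hello". */
    static String rawWebSocketUrl(String host, int port, String endpoint) {
        // Tolerate an endpoint given without its leading slash
        String path = endpoint.startsWith("/") ? endpoint : "/" + endpoint;
        // SockJS serves its raw WebSocket transport under {endpoint}/websocket
        return "ws://" + host + ":" + port + path + "/websocket";
    }

    public static void main(String[] args) {
        System.out.println(rawWebSocketUrl("192.168.0.46", 8080, "/hello"));
    }
}
```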
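3. Publish destinations
The destination a client publishes to is the combination of setApplicationDestinationPrefixes("/app/"); in WebSocketConfig and @MessageMapping("/welcome") in the controller.
For example, a broadcast message is sent to /app/welcome.
For example, a point-to-point message is sent to /app/cheat.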
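4. Subscription destinations
The brokers a client can subscribe to come from registry.enableSimpleBroker("/topic","/user"); in WebSocketConfig, which opens two broker prefixes. The concrete subscription destination is given either by @SendTo("/topic/getResponse") in the controller or by the destination passed to SimpMessagingTemplate. (Note: the server and the client have to agree on the subscription destinations.)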
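The relationship between publish destinations and subscription destinations can be sketched with a toy in-memory model. This is not how Spring implements routing; DestinationRoutingSketch and its methods are hypothetical, and the sketch only mirrors the demo's wiring: a send to /app/welcome is broadcast on /topic/getResponse, and a send to /app/cheat is delivered on /user/{userId}/message.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Toy in-memory model of the demo's destinations; not Spring's implementation. */
final class DestinationRoutingSketch {

    static final String APP_PREFIX = "/app";

    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

    /** Client side: subscribe to a broker destination such as /topic/getResponse. */
    void subscribe(String destination, Consumer<String> listener) {
        subscribers.computeIfAbsent(destination, d -> new ArrayList<>()).add(listener);
    }

    /** Broker side: deliver a payload to every listener of exactly this destination. */
    private void publish(String destination, String payload) {
        List<Consumer<String>> listeners =
                subscribers.getOrDefault(destination, Collections.<Consumer<String>>emptyList());
        for (Consumer<String> listener : listeners) {
            listener.accept(payload);
        }
    }

    /**
     * Client side: send to an application destination.
     * userId is only used for /app/cheat; text is the name or the message body.
     */
    void send(String destination, String userId, String text) {
        if (destination.equals(APP_PREFIX + "/welcome")) {
            // Mirrors @MessageMapping("/welcome") + @SendTo("/topic/getResponse")
            publish("/topic/getResponse", "Welcome , " + text + " !");
        } else if (destination.equals(APP_PREFIX + "/cheat")) {
            // Mirrors convertAndSendToUser(userId, "/message", message)
            publish("/user/" + userId + "/message", text);
        }
    }

    public static void main(String[] args) {
        DestinationRoutingSketch router = new DestinationRoutingSketch();
        router.subscribe("/topic/getResponse", m -> System.out.println("broadcast: " + m));
        router.subscribe("/user/xiaoli/message", m -> System.out.println("xiaoli got: " + m));
        router.send("/app/welcome", null, "lincoln");
        router.send("/app/cheat", "xiaoli", "hello");
    }
}
```

The model also shows why both sides must agree on destinations: a message sent to user "lincoln" never reaches a listener subscribed to /user/xiaoli/message.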
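5. Heartbeats
Heartbeats are simple in a publish/subscribe model: the client sends a heartbeat to an agreed destination, the server handles it, and the server sends its own heartbeat back to the client on an agreed subscription destination. Because the application does not manage a raw socket itself, it only needs to record whether the link is up, and reconnecting is left to the client.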
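The bookkeeping described here can be sketched as a small tracker that records when the last heartbeat arrived and reports whether the peer still counts as connected. Everything in it is an assumption for illustration: the class name HeartbeatSketch, the two destination constants, and the timeout value do not appear in the demo project.

```java
/** Tracks whether a peer still counts as connected, based on heartbeat arrival times. */
final class HeartbeatSketch {

    // Hypothetical destinations; the demo project does not define any heartbeat paths.
    static final String CLIENT_BEAT_DESTINATION = "/app/heartbeat";
    static final String SERVER_BEAT_DESTINATION = "/topic/heartbeat";

    private final long timeoutMillis;
    private long lastBeatMillis;

    /** startMillis is treated as the time of the first heartbeat. */
    HeartbeatSketch(long timeoutMillis, long startMillis) {
        this.timeoutMillis = timeoutMillis;
        this.lastBeatMillis = startMillis;
    }

    /** Call whenever a heartbeat message arrives from the peer. */
    void onHeartbeat(long nowMillis) {
        lastBeatMillis = nowMillis;
    }

    /** The peer counts as connected while the last heartbeat is within the timeout. */
    boolean isConnected(long nowMillis) {
        return nowMillis - lastBeatMillis <= timeoutMillis;
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        HeartbeatSketch tracker = new HeartbeatSketch(3000L, start);
        System.out.println("connected now: " + tracker.isConnected(start));
        System.out.println("connected after 5 s of silence: " + tracker.isConnected(start + 5000L));
    }
}
```

On the client, a periodic check of isConnected could trigger a reconnect when it turns false; the server can keep one tracker per client in the same way.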
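I am still a beginner, so there are surely things I have not fully understood. If you spot a mistake, corrections are welcome.
Code download: blogRepository_jb51.rar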