使用java 实现mqtt两种常用方式
houxian1103 人气:0前言
在开发MQTT时有两种方式一种是使用Paho Java 原生库来完成,一种是使用spring boot 来完成。
Paho Java 库实现
Eclipse Paho Java Client (opens new window)是用 Java 编写的 MQTT 客户端库(MQTT Java Client),可用于 JVM 或其他 Java 兼容平台(例如Android)。
Eclipse Paho Java Client 提供了MqttAsyncClient 和 MqttClient 异步和同步 API
- 通过 Maven 安装 Paho Java
<dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.2</version> </dependency>
- Paho Java 使用示例
Java 体系中 Paho Java 是比较稳定、广泛应用的 MQTT 客户端库,本示例包含 Java 语言的 Paho Java 连接 EMQX Broker,并进行消息收发完整代码:
package io.emqx; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; public class App { public static void main(String[] args) { String subTopic = "testtopic/#"; String pubTopic = "testtopic/1"; String content = "Hello World"; int qos = 2; String broker = "tcp://broker.emqx.io:1883"; String clientId = "emqx_test"; MemoryPersistence persistence = new MemoryPersistence(); try { MqttClient client = new MqttClient(broker, clientId, persistence); // MQTT 连接选项 MqttConnectOptions connOpts = new MqttConnectOptions(); connOpts.setUserName("emqx_test"); connOpts.setPassword("emqx_test_password".toCharArray()); // 保留会话 connOpts.setCleanSession(true); // 设置回调 client.setCallback(new PushCallback()); // 建立连接 System.out.println("Connecting to broker: " + broker); client.connect(connOpts); System.out.println("Connected"); System.out.println("Publishing message: " + content); // 订阅 client.subscribe(subTopic); // 消息发布所需参数 MqttMessage message = new MqttMessage(content.getBytes()); message.setQos(qos); client.publish(pubTopic, message); System.out.println("Message published"); client.disconnect(); System.out.println("Disconnected"); client.close(); System.exit(0); } catch (MqttException me) { System.out.println("reason " + me.getReasonCode()); System.out.println("msg " + me.getMessage()); System.out.println("loc " + me.getLocalizedMessage()); System.out.println("cause " + me.getCause()); System.out.println("excep " + me); me.printStackTrace(); } } }
回调消息处理类 OnMessageCallback.java
package io.emqx; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; public class OnMessageCallback implements MqttCallback { public void connectionLost(Throwable cause) { // 连接丢失后,一般在这里面进行重连 System.out.println("连接断开,可以做重连"); } public void messageArrived(String topic, MqttMessage message) throws Exception { // subscribe后得到的消息会执行到这里面 System.out.println("接收消息主题:" + topic); System.out.println("接收消息Qos:" + message.getQos()); System.out.println("接收消息内容:" + new String(message.getPayload())); } public void deliveryComplete(IMqttDeliveryToken token) { System.out.println("deliveryComplete---------" + token.isComplete()); } }
好的上述就实现了简单的 MQTT的连接和消息收发。
spring boot集成mqtt
spring boot 环境
spring-boot 版本 2.2.2 spring-integration的版本为:5.4.3 Spring Integration提供了入站适配器和出站适配器以支持MQTT协议。
Maven 依赖:
<!-- https://mvnrepository.com/artifact/org.springframework.integration/spring-integration-mqtt --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> <version>5.4.3</version> </dependency>
配置文件 application.yml:
spring: mqtt: username: password: url: tcp://ip:port clientId: clientId topic: default completionTimeout: 2000
核心代码
配置类
@Data @Configuration @ConfigurationProperties(prefix = "spring.mqtt") public class MqttConfiguration { private String username; private String password; private String url; private String clientId; private String topic = "TOPIC_DEFAULT"; private Integer completionTimeout = 2000; /** * 注册MQTT客户端工厂 * @return */ @Bean public MqttPahoClientFactory mqttClientFactory(){ DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); MqttConnectOptions options = new MqttConnectOptions(); //如果设置为 false,客户端和服务器将在客户端、服务器和连接重新启动时保持状态。随着状态的保持: // 即使客户端、服务器或连接重新启动,消息传递也将可靠地满足指定的 QOS。服务器将订阅视为持久的。 // 如果设置为 true,客户端和服务器将不会在客户端、服务器或连接重新启动时保持状态。 options.setCleanSession(true); //该值以秒为单位,必须>0,定义了客户端等待与 MQTT 服务器建立网络连接的最大时间间隔。 // 默认超时为 30 秒。值 0 禁用超时处理,这意味着客户端将等待直到网络连接成功或失败。 options.setConnectionTimeout(0); //此值以秒为单位,定义发送或接收消息之间的最大时间间隔,必须>0 options.setKeepAliveInterval(90); //自动重新连接 options.setAutomaticReconnect(true); options.setUserName(this.getUsername()); options.setPassword(this.getPassword().toCharArray()); options.setServerURIs(new String[]{this.getUrl()}); factory.setConnectionOptions(options); return factory; } }
@Slf4j @AllArgsConstructor @Configuration @IntegrationComponentScan public class MqttInboundConfiguration { private MqttConfiguration mqttConfig; private MqttPahoClientFactory factory; private MqttMessageReceiver mqttMessageReceiver; /** * 此处可以使用其他消息通道 * Spring Integration默认的消息通道,它允许将消息发送给一个订阅者,然后阻碍发送直到消息被接收。 * * @return */ @Bean public MessageChannel mqttInBoundChannel() { return new DirectChannel(); } /** * 适配器, 两个topic共用一个adapter * 客户端作为消费者,订阅主题,消费消息 * * @param * @param * @return */ @Bean public MessageProducerSupport mqttInbound() { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getClientId()+"-"+System.currentTimeMillis(), factory, mqttConfig.getTopic()); adapter.setCompletionTimeout(60000); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setRecoveryInterval(10000); adapter.setQos(0); adapter.setOutputChannel(mqttInBoundChannel()); return adapter; } /** * mqtt入站消息处理工具,对于指定消息入站通道接收到生产者生产的消息后处理消息的工具。 * * @return */ @Bean @ServiceActivator(inputChannel = "mqttInBoundChannel") public MessageHandler mqttMessageHandler() { return this.mqttMessageReceiver; } }
数据接收
@Slf4j @AllArgsConstructor @Component public class MqttMessageReceiver implements MessageHandler { @Override public void handleMessage(Message<?> message) throws MessagingException { try { MessageHeaders headers = message.getHeaders(); //获取消息Topic String receivedTopic = (String) headers.get(MqttHeaders.RECEIVED_TOPIC); log.info("[获取到的消息的topic :]{} ", receivedTopic); //获取消息体 String payload = (String) message.getPayload(); log.info("[获取到的消息的payload :]{} ", payload); //todo .... } catch (Exception e) { e.printStackTrace(); } } }
@Slf4j @AllArgsConstructor @Configuration public class MqttOutboundConfiguration { private MqttConfiguration mqttConfig; private MqttPahoClientFactory factory; @Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } @Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler mqttOutbound() { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler( mqttConfig.getClientId()+"-"+System.currentTimeMillis() + System.currentTimeMillis(), factory); messageHandler.setDefaultQos(0); //开启异步 messageHandler.setAsync(true); messageHandler.setDefaultTopic(mqttConfig.getTopic()); return messageHandler; } }
发送者
@Component @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") public interface MqttGateway { /** * 发送mqtt消息 * @param topic 主题 * @param payload 内容 * @return void */ void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload); /** * 发送包含qos的消息 * @param topic 主题 * @param qos 对消息处理的几种机制。 * * 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。<br> * * 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。<br> * * 2 多了一次去重的动作,确保订阅者收到的消息有一次。 * @param payload 消息体 * @return void */ void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload); /** * 发送包含qos的消息 * @param topic 主题 * @param qos 对消息处理的几种机制。 * * 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。<br> * * 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。<br> * * 2 多了一次去重的动作,确保订阅者收到的消息有一次。 * @param payload 消息体 * @return void */ void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload); }
@Component @AllArgsConstructor public class MqttMessageSender { private MqttGateway mqttGateway; /** * 发送mqtt消息 * @param topic 主题 * @param message 内容 * @return void */ public void send(String topic, String message) { mqttGateway.sendToMqtt(topic, message); } /** * 发送包含qos的消息 * @param topic 主题 * @param qos 质量 * @param messageBody 消息体 * @return void */ public void send(String topic, int qos, JSONObject messageBody){ mqttGateway.sendToMqtt(topic, qos, messageBody.toString()); } /** * 发送包含qos的消息 * @param topic 主题 * @param qos 质量 * @param message 消息体 * @return void */ public void send(String topic, int qos, byte[] message){ mqttGateway.sendToMqtt(topic, qos, message); } }
总结
综上所述上面就是我们经常用的到两种方式,希望对你有所帮助
加载全部内容