Spring Boot 整合 MQTT
李泰山 人气:0
首先,在 pom 文件里引入 MQTT 的依赖配置:
<!-- MQTT: Eclipse Paho v3 client library -->
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.4</version>
</dependency>
其次,在 Spring Boot 的 yml 配置文件中添加 MQTT 服务配置:
spring:
  mqtt:
    # Broker URI handed to the Paho client.
    url: tcp://127.0.0.1:1883
    # Client identifier used when connecting to the broker.
    client-id: niubility-tiger
    # Credentials are left empty here; fill them in if the broker requires authentication.
    username:
    password:
    # Topics subscribed to right after the client connects.
    topic:
      - /unify/test
创建 MqttProperties配置参数类
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

/**
 * Settings bound from the {@code spring.mqtt} section of the application configuration.
 *
 * <p>Accessors, {@code equals}/{@code hashCode} and {@code toString} are generated by Lombok's
 * {@code @Data}.
 */
@Data
@ConfigurationProperties(prefix = "spring.mqtt")
public class MqttProperties {

    /** Broker URI, for example {@code tcp://127.0.0.1:1883}. */
    private String url;

    /** Client identifier presented to the broker (bound from {@code client-id}). */
    private String clientId;

    /** User name for broker authentication; may be left blank. */
    private String username;

    /** Password for broker authentication; may be left blank. */
    private String password;

    /** Topics to subscribe to once the client is connected. */
    private String[] topic;
}
创建 MqttConfiguration 配置类
import org.eclipse.paho.client.mqttv3.IMqttClient; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springblade.core.tool.utils.Func; import org.springblade.ubw.listener.MqttSubscribeListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration @EnableConfigurationProperties({MqttProperties.class}) public class MqttConfiguration { private static final Logger log = LoggerFactory.getLogger(MqttConfiguration.class); @Autowired private MqttProperties mqttProperties; public MqttConfiguration() { } @Bean public MqttConnectOptions mqttConnectOptions() { MqttConnectOptions connectOptions = new MqttConnectOptions(); connectOptions.setServerURIs(new String[]{this.mqttProperties.getUrl()}); if (Func.isNotBlank(this.mqttProperties.getUrl())) { connectOptions.setUserName(this.mqttProperties.getUsername()); } if (Func.isNotBlank(this.mqttProperties.getPassword())) { connectOptions.setPassword(this.mqttProperties.getPassword().toCharArray()); } connectOptions.setKeepAliveInterval(60); return connectOptions; } @Bean public IMqttClient mqttClient(MqttConnectOptions options) throws MqttException { IMqttClient mqttClient = new MqttClient(this.mqttProperties.getUrl(), this.mqttProperties.getClientId()); mqttClient.connect(options); for(int i = 0; i< this.mqttProperties.getTopic().length; ++i) { mqttClient.subscribe(this.mqttProperties.getTopic()[i], new MqttSubscribeListener()); } return mqttClient; } }
创建 订阅事件类
import org.springframework.context.ApplicationEvent;

/**
 * Application event describing one message received from an MQTT subscription.
 *
 * <p>The message content travels as the event {@code source}; the topic it arrived on is kept
 * alongside it.
 */
public class UWBMqttSubscribeEvent extends ApplicationEvent {

    /** Topic the message was received on. */
    private final String topic;

    /**
     * @param topic  topic the message was received on
     * @param source message content, handed to {@link ApplicationEvent} as the event source
     */
    public UWBMqttSubscribeEvent(String topic, Object source) {
        super(source);
        this.topic = topic;
    }

    /**
     * @return the topic the message was received on
     */
    public String getTopic() {
        return topic;
    }
}
创建订阅事件监听器
import java.nio.charset.StandardCharsets;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springblade.core.tool.utils.SpringUtil;
import org.springblade.ubw.event.UWBMqttSubscribeEvent;

/**
 * Paho message listener that republishes every received MQTT message as a
 * {@link UWBMqttSubscribeEvent}.
 */
public class MqttSubscribeListener implements IMqttMessageListener {

    /**
     * Decodes the payload as text and publishes it, together with its topic, as an event.
     *
     * @param s           topic the message arrived on
     * @param mqttMessage the received message
     */
    @Override
    public void messageArrived(String s, MqttMessage mqttMessage) {
        // Decode with an explicit charset; new String(byte[]) would use the platform default,
        // so non-ASCII payloads could decode differently from one host to another.
        String content = new String(mqttMessage.getPayload(), StandardCharsets.UTF_8);
        UWBMqttSubscribeEvent event = new UWBMqttSubscribeEvent(s, content);
        SpringUtil.publishEvent(event);
    }
}
创建mqtt消息事件异步处理监听器
import com.baomidou.mybatisplus.core.toolkit.StringPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springblade.core.tool.utils.Func; import org.springblade.ubw.config.MqttProperties; import org.springblade.ubw.event.UWBMqttSubscribeEvent; import org.springblade.ubw.service.MqttService; import org.springframework.context.annotation.Configuration; import org.springframework.context.event.EventListener; import org.springframework.scheduling.annotation.Async; import javax.annotation.Resource; import java.util.Arrays; import java.util.List; @Configuration public class MqttEventListener { private static final Logger log = LoggerFactory.getLogger(MqttEventListener.class); @Resource private MqttProperties mqttProperties; @Resource private MqttService mqttService; private String processTopic (String topic) { List<String> topics = Arrays.asList(mqttProperties.getTopic()); for (String wild : topics) { wild = wild.replace(StringPool.HASH, StringPool.EMPTY); if (topic.startsWith(wild)) { return topic.replace(wild, StringPool.EMPTY); } } return StringPool.EMPTY; } @Async @EventListener(UWBMqttSubscribeEvent.class) public void listen (UWBMqttSubscribeEvent event) { String topic = processTopic(event.getTopic()); Object source = event.getSource(); if (Func.isEmpty(source)) { return; } mqttService.issue(topic,source); // log.info("mqtt接收到 通道 {} 的信息为:{}",topic,source); } }
创建MqttService 数据处理服务类
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springblade.core.tool.utils.Func; import org.springblade.ubw.area.entity.WorkArea; import org.springblade.ubw.area.entity.WorkSite; import org.springblade.ubw.area.entity.WorkSiteNeighbourInfo; import org.springblade.ubw.area.entity.WorkSitePassInfo; import org.springblade.ubw.area.service.WorkAreaService; import org.springblade.ubw.area.service.WorkSiteNeighbourInfoService; import org.springblade.ubw.area.service.WorkSitePassInfoService; import org.springblade.ubw.area.service.WorkSiteService; import org.springblade.ubw.constant.UbwConstant; import org.springblade.ubw.history.entity.HistoryLocusInfo; import org.springblade.ubw.history.entity.HistoryOverTimeSosAlarmInfo; import org.springblade.ubw.history.service.HistoryLocusInfoService; import org.springblade.ubw.history.service.HistoryOverTimeSosAlarmInfoService; import org.springblade.ubw.loc.entity.LocStatusInfo; import org.springblade.ubw.loc.entity.LocStatusInfoHistory; import org.springblade.ubw.loc.service.LocStatusInfoHistoryService; import org.springblade.ubw.loc.service.LocStatusInfoService; import org.springblade.ubw.msg.entity.*; import org.springblade.ubw.msg.service.*; import org.springblade.ubw.system.entity.*; import org.springblade.ubw.system.service.*; import org.springblade.ubw.system.wrapper.MqttWrapper; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.List; import java.util.stream.Collectors; @Service public class MqttService { private static final Logger log = LoggerFactory.getLogger(MqttService.class); @Resource private EmployeeAndDepartmentService employeeAndDepartmentService; @Resource private VehicleInfoService vehicleInfoService; @Resource private WorkSiteService workSiteService; @Resource private LocStatusInfoService locStatusInfoService; @Resource private LocStatusInfoHistoryService locStatusInfoHistoryService; @Resource private LocOverTimeSosAlarminfoService 
locOverTimeSosAlarminfoService; @Resource private LocAreaOverSosAlarminfoService locAreaOverSosAlarmInfoService; @Resource private LocSosAlarminfoService locSosAlarminfoService; @Resource private AttendanceInfoService attendanceInfoService; @Resource private HistoryLocusInfoService historyLocusInfoService; @Resource private WorkSitePassInfoService workSitePassInfoService; @Resource private EnvironmentalMonitorInfoService environmentalMonitorInfoService; @Resource private TrAlertService trAlertService; @Resource private AddEvacuateInfoService addEvacuateInfoService; @Resource private CancelEvacuateInfoService cancelEvacuateInfoService; @Resource private WorkSiteNeighbourInfoService workSiteNeighbourInfoService; @Resource private LinkMsgAlarmInfoService linkMsgAlarmInfoService; @Resource private LeaderEmployeeInfoService leaderEmployeeInfoService; @Resource private ElectricMsgInfoService electricMsgInfoService; @Resource private WorkAreaService workAreaService; @Resource private HistoryOverTimeSosAlarmInfoService historyOverTimeSosAlarmInfoService; @Resource private SpecialWorksService specialWorksService; @Resource private AttendanceLocusInfoService attendanceLocusInfoService; @Resource private WorkTypeService workTypeService; @Resource private OfficePositionService officePositionService; @Resource private ClassTeamService classTeamService; /** * 方法描述: 消息分发 * * @param topic * @param source * @author liwenbin * @date 2021年12月14日 14:14:09 */ public void issue(String topic,Object source){ switch(topic){ case UbwConstant.TOPIC_EMP : //人员和部门信息 employeeAndDepartmentService.saveBatch(source); break; case UbwConstant.TOPIC_VEHICLE : //车辆信息 List<VehicleInfo> vehicleInfos = MqttWrapper.build().toEntityList(source,new VehicleInfo()); vehicleInfoService.deleteAll(); vehicleInfoService.saveBatch(vehicleInfos); break; case UbwConstant.TOPIC_WORK_SITE : //基站信息 List<WorkSite> workSites = MqttWrapper.build().toEntityList(source,new WorkSite()); workSiteService.deleteAll(); 
workSiteService.saveBatch(workSites); break; case UbwConstant.TOPIC_LOC_STATUS: //井下车辆人员实时 List<LocStatusInfo> locStatusInfos = MqttWrapper.build().toEntityList(source,new LocStatusInfo()); if (Func.isEmpty(locStatusInfos)){ break; } locStatusInfoService.deleteAll(); //筛选入井人员列表 List<LocStatusInfo> inWellList = locStatusInfos.stream().filter(s -> s.getIsInWell() == 1).collect(Collectors.toList()); locStatusInfoService.saveBatch(inWellList); //人员历史数据入库 List<LocStatusInfoHistory> locStatusInfoHistorys = MqttWrapper.build().toEntityList(source,new LocStatusInfoHistory()); locStatusInfoHistoryService.saveBatch(locStatusInfoHistorys); break; case UbwConstant.TOPIC_LOC_OVER_TIME: //超时报警信息 List<LocOverTimeSosAlarminfo> locOverTimeSosAlarmInfos = MqttWrapper.build().toEntityList(source,new LocOverTimeSosAlarminfo()); locOverTimeSosAlarminfoService.saveBatch(locOverTimeSosAlarmInfos); break; case UbwConstant.TOPIC_LOC_OVER_AREA: //超员报警信息 List<LocAreaOverSosAlarminfo> locAreaOverSosAlarmInfos = MqttWrapper.build().toEntityList(source,new LocAreaOverSosAlarminfo()); locAreaOverSosAlarmInfoService.saveBatch(locAreaOverSosAlarmInfos); break; case UbwConstant.TOPIC_LOC_SOS: //求救报警信息 List<LocSosAlarminfo> locSosAlarmInfos = MqttWrapper.build().toEntityList(source,new LocSosAlarminfo()); locSosAlarminfoService.saveBatch(locSosAlarmInfos); break; case UbwConstant.TOPIC_ATTEND: //考勤信息 List<AttendanceInfo> attendanceInfos = MqttWrapper.build().toEntityList(source,new AttendanceInfo()); attendanceInfoService.saveBatch(attendanceInfos); break; case UbwConstant.TOPIC_HISTORY_LOCUS: //精确轨迹信息 List<HistoryLocusInfo> historyLocusInfos = MqttWrapper.build().toEntityList(source,new HistoryLocusInfo()); historyLocusInfoService.saveBatch(historyLocusInfos); break; case UbwConstant.TOPIC_WORK_SITE_PASS: //基站经过信息 List<WorkSitePassInfo> workSitePassInfos = MqttWrapper.build().toEntityList(source,new WorkSitePassInfo()); workSitePassInfoService.saveBatch(workSitePassInfos); break; case 
UbwConstant.TOPIC_ENV_MON: //环境监测信息 List<EnvironmentalMonitorInfo> environmentalMonitorInfos = MqttWrapper.build().toEntityList(source,new EnvironmentalMonitorInfo()); environmentalMonitorInfoService.saveBatch(environmentalMonitorInfos); break; case UbwConstant.TOPIC_TR_ALERT: //环境监测报警信息 List<TrAlert> trAlerts = MqttWrapper.build().toEntityList(source,new TrAlert()); trAlertService.saveBatch(trAlerts); break; case UbwConstant.TOPIC_ADD_EVA: //下发撤离信息 List<AddEvacuateInfo> addEvacuateInfos = MqttWrapper.build().toEntityList(source,new AddEvacuateInfo()); addEvacuateInfoService.saveBatch(addEvacuateInfos); break; case UbwConstant.TOPIC_CANCEL_EVA: //取消撤离信息 List<CancelEvacuateInfo> cancelEvacuateInfos = MqttWrapper.build().toEntityList(source,new CancelEvacuateInfo()); cancelEvacuateInfoService.saveBatch(cancelEvacuateInfos); break; case UbwConstant.TOPIC_WORK_SITE_NEI: //相邻基站关系信息 workSiteNeighbourInfoService.deleteAll(); List<WorkSiteNeighbourInfo> workSiteNeighbourInfos = MqttWrapper.build().toEntityList(source,new WorkSiteNeighbourInfo()); workSiteNeighbourInfoService.saveBatch(workSiteNeighbourInfos); break; case UbwConstant.TOPIC_LINK_MSG: //基站链路信息 linkMsgAlarmInfoService.deleteAll(); List<LinkMsgAlarmInfo> linkMsgAlarmInfos = MqttWrapper.build().toEntityList(source,new LinkMsgAlarmInfo()); linkMsgAlarmInfoService.saveBatch(linkMsgAlarmInfos); break; case UbwConstant.TOPIC_LEADER_EMP: //带班领导信息 leaderEmployeeInfoService.deleteAll(); List<LeaderEmployeeInfo> leaderEmployeeInfos = MqttWrapper.build().toEntityList(source,new LeaderEmployeeInfo()); leaderEmployeeInfoService.saveBatch(leaderEmployeeInfos); break; case UbwConstant.TOPIC_ELE_MSG: //低电报警信息 List<ElectricMsgInfo> electricMsgInfos = MqttWrapper.build().toEntityList(source,new ElectricMsgInfo()); electricMsgInfoService.saveBatch(electricMsgInfos); break; case UbwConstant.TOPIC_WORK_AREA: //区域信息 workAreaService.deleteAll(); List<WorkArea> workAreas = MqttWrapper.build().toEntityList(source,new WorkArea()); 
workAreaService.saveBatch(workAreas); break; case UbwConstant.TOPIC_HIS_OVER_TIME_SOS: //历史超时报警信息 List<HistoryOverTimeSosAlarmInfo> historyOverTimeSosAlarmInfos = MqttWrapper.build().toEntityList(source,new HistoryOverTimeSosAlarmInfo()); historyOverTimeSosAlarmInfoService.saveBatch(historyOverTimeSosAlarmInfos); break; case UbwConstant.TOPIC_SPECIAL_WORK: //特种人员预设线路信息 specialWorksService.deleteAll(); List<SpecialWorks> specialWorks = MqttWrapper.build().toEntityList(source,new SpecialWorks()); specialWorksService.saveBatch(specialWorks); break; case UbwConstant.TOPIC_ATTEND_LOC: //历史考勤轨迹信息 List<AttendanceLocusInfo> attendanceLocusInfos = MqttWrapper.build().toEntityList(source,new AttendanceLocusInfo()); attendanceLocusInfoService.saveBatch(attendanceLocusInfos); break; case UbwConstant.TOPIC_WORK_TYPE: //工种信息 workTypeService.deleteAll(); List<WorkType> workTypes = MqttWrapper.build().toEntityList(source,new WorkType()); workTypeService.saveBatch(workTypes); break; case UbwConstant.TOPIC_OFFICE_POS: //职务信息 officePositionService.deleteAll(); List<OfficePosition> officePositions = MqttWrapper.build().toEntityList(source,new OfficePosition()); officePositionService.saveBatch(officePositions); break; case UbwConstant.TOPIC_CLASS_TEAM: //班组信息 classTeamService.deleteAll(); List<ClassTeam> classTeams = MqttWrapper.build().toEntityList(source,new ClassTeam()); classTeamService.saveBatch(classTeams); break; default : //可选 break; } } }
完结。小伙伴们可以根据这个 demo 改造自己的 MQTT 服务处理!
加载全部内容