Browse Source

1、监控管理:兼容mqtt代码

dev
mh 4 months ago
parent
commit
b99cf1311c
  1. 19
      mh-admin/src/main/java/com/mh/web/controller/device/OperationController.java
  2. 63
      mh-admin/src/main/java/com/mh/web/controller/mqtt/MqttTopicController.java
  3. 4
      mh-admin/src/main/java/com/mh/web/controller/operate/AlarmCodeController.java
  4. 4
      mh-admin/src/main/java/com/mh/web/controller/operate/AlarmRulesController.java
  5. 137
      mh-admin/src/main/resources/application-dev.yml
  6. 137
      mh-admin/src/main/resources/application-prod.yml
  7. 137
      mh-admin/src/main/resources/application-test.yml
  8. 121
      mh-admin/src/main/resources/application.yml
  9. 15
      mh-common/pom.xml
  10. 99
      mh-common/src/main/java/com/mh/common/config/mqtt/MqttConfig.java
  11. 84
      mh-common/src/main/java/com/mh/common/config/mqtt/MqttInboundConfig.java
  12. 53
      mh-common/src/main/java/com/mh/common/config/mqtt/MqttMessageChannel.java
  13. 51
      mh-common/src/main/java/com/mh/common/config/mqtt/MqttOutboundConfig.java
  14. 59
      mh-common/src/main/java/com/mh/common/constant/ChannelName.java
  15. 31
      mh-common/src/main/java/com/mh/common/constant/TopicConst.java
  16. 108
      mh-common/src/main/java/com/mh/common/enums/MqttClientOptions.java
  17. 33
      mh-common/src/main/java/com/mh/common/enums/MqttProtocolEnum.java
  18. 16
      mh-common/src/main/java/com/mh/common/enums/MqttUseEnum.java
  19. 35
      mh-common/src/main/java/com/mh/common/model/request/CommonTopicReceiver.java
  20. 36
      mh-common/src/main/java/com/mh/common/model/response/CommonTopicResponse.java
  21. 20
      mh-common/src/main/java/com/mh/common/model/response/ServiceReply.java
  22. 49
      mh-system/src/main/java/com/mh/system/service/mqtt/IEventsService.java
  23. 41
      mh-system/src/main/java/com/mh/system/service/mqtt/IMqttMessageGateway.java
  24. 96
      mh-system/src/main/java/com/mh/system/service/mqtt/IMqttMsgSenderService.java
  25. 40
      mh-system/src/main/java/com/mh/system/service/mqtt/IMqttTopicService.java
  26. 71
      mh-system/src/main/java/com/mh/system/service/mqtt/impl/EventsServiceImpl.java
  27. 140
      mh-system/src/main/java/com/mh/system/service/mqtt/impl/MqttMsgSenderServiceImpl.java
  28. 41
      mh-system/src/main/java/com/mh/system/service/mqtt/impl/MqttTopicServiceImpl.java
  29. 25
      pom.xml

19
mh-admin/src/main/java/com/mh/web/controller/device/OperationController.java

@ -0,0 +1,19 @@
package com.mh.web.controller.device;
import com.mh.common.core.controller.BaseController;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author LJF
* @version 1.0
* @project EEMCS
* @description 设备控制类
* @date 2025-01-15 14:11:22
*/
@RestController
@RequestMapping("/device/operation")
public class OperationController extends BaseController {
}

63
mh-admin/src/main/java/com/mh/web/controller/mqtt/MqttTopicController.java

@ -0,0 +1,63 @@
package com.mh.web.controller.mqtt;
import com.mh.system.service.mqtt.IMqttMsgSenderService;
import com.mh.system.service.mqtt.IMqttTopicService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Arrays;
/**
* @author LJF
* @version 1.0
* @project springboot-mqtt-demo
* @description 主题消息类
* @date 2024-10-30 15:05:24
*/
@RestController
@RequestMapping("/topic")
public class MqttTopicController {
@Autowired
private IMqttTopicService mqttTopicService;
@Autowired
private IMqttMsgSenderService mqttMsgSenderService;
@GetMapping("/add")
public String add(String topic) {
mqttTopicService.subscribe(topic);
return "添加主题:" + topic + "成功";
}
@GetMapping("/del")
public String del(String topic) {
mqttTopicService.unsubscribe(topic);
return "删除主题:" + topic + "成功";
}
@GetMapping("/list")
public String list() {
return "已订阅的主题:" + String.join(",", mqttTopicService.getSubscribedTopics());
}
@GetMapping("/publish")
public String publish(String topic, String pushMessage) {
String[] subscribedTopics = mqttTopicService.getSubscribedTopics();
boolean exists = Arrays.stream(subscribedTopics).anyMatch(val -> val.equals(topic));
if (!exists) {
return "主题不存在";
}
mqttMsgSenderService.publish(topic, pushMessage);
return "发布主题:" + topic + "成功";
}
@GetMapping("/reply")
public String reply(String topic, String pushMessage) {
mqttMsgSenderService.publish(topic, pushMessage);
return "回复主题:" + topic + "成功";
}
}

4
mh-admin/src/main/java/com/mh/web/controller/operation/AlarmCodeController.java → mh-admin/src/main/java/com/mh/web/controller/operate/AlarmCodeController.java

@ -1,4 +1,4 @@
package com.mh.web.controller.operation;
package com.mh.web.controller.operate;
import com.mh.common.annotation.Log;
import com.mh.common.core.controller.BaseController;
@ -22,7 +22,7 @@ import java.util.List;
* @date 2025-01-14 16:40:58
*/
@RestController
@RequestMapping("/operation/ac")
@RequestMapping("/operate/ac")
public class AlarmCodeController extends BaseController {
@Autowired

4
mh-admin/src/main/java/com/mh/web/controller/operation/AlarmRulesController.java → mh-admin/src/main/java/com/mh/web/controller/operate/AlarmRulesController.java

@ -1,4 +1,4 @@
package com.mh.web.controller.operation;
package com.mh.web.controller.operate;
import com.mh.common.annotation.Log;
import com.mh.common.core.controller.BaseController;
@ -22,7 +22,7 @@ import java.util.List;
* @date 2025-01-14 17:32:55
*/
@RestController
@RequestMapping("/operation/ar")
@RequestMapping("/operate/ar")
public class AlarmRulesController extends BaseController {
@Autowired

137
mh-admin/src/main/resources/application-dev.yml

@ -0,0 +1,137 @@
# 项目相关配置
mh:
# 名称
name: MH
# 版本
version: 1.0.0
# 版权年份
copyrightYear: 2024
# 文件路径 示例( Windows配置D:/mh/uploadPath,Linux配置 /home/mh/uploadPath)
profile: D:/mh/uploadPath
# 获取ip地址开关
addressEnabled: false
# 验证码类型 math 数字计算 char 字符验证
captchaType: math
# 开发环境配置
server:
# 服务器的HTTP端口,默认为8080
port: 8080
servlet:
# 应用的访问路径
context-path: /
tomcat:
# tomcat的URI编码
uri-encoding: UTF-8
# 连接数满后的排队数,默认为100
accept-count: 1000
threads:
# tomcat最大线程数,默认为200
max: 800
# Tomcat启动初始化的线程数,默认值10
min-spare: 100
# 日志配置
logging:
level:
com.mh: debug
org.springframework: warn
# Spring配置
spring:
# 资源信息
messages:
# 国际化资源文件路径
basename: i18n/messages
profiles:
active: druid
# 文件上传
servlet:
multipart:
# 单个文件大小
max-file-size: 10MB
# 设置总上传的文件大小
max-request-size: 20MB
# 服务模块
devtools:
restart:
# 热部署开关
enabled: true
data:
# redis 配置
redis:
# 地址
host: localhost
# 端口,默认为6379
port: 6379
# 数据库索引
database: 0
# 密码
password:
# 连接超时时间
timeout: 10s
lettuce:
pool:
# 连接池中的最小空闲连接
min-idle: 0
# 连接池中的最大空闲连接
max-idle: 8
# 连接池的最大数据库连接数
max-active: 8
# #连接池最大阻塞等待时间(使用负值表示没有限制)
max-wait: -1ms
# MyBatis配置
mybatis-plus:
# 搜索指定包别名
type-aliases-package: com.mh.**.domain
# 配置mapper的扫描,找到所有的mapper.xml映射文件
mapper-locations: classpath*:mapper/**/*Mapper.xml
# 加载全局的配置文件
config-location: classpath:mybatis/mybatis-config.xml
configuration:
map-underscore-to-camel-case: true
# Springdoc配置
springdoc:
api-docs:
path: /v3/api-docs
swagger-ui:
enabled: true
path: /swagger-ui.html
tags-sorter: alpha
group-configs:
- group: 'default'
display-name: '测试模块'
paths-to-match: '/**'
packages-to-scan: com.mh.web.controller.tool
# 防止XSS攻击
xss:
# 过滤开关
enabled: true
# 排除链接(多个用逗号分隔)
excludes: /system/notice
# 匹配链接
urlPatterns: /system/*,/monitor/*,/tool/*
mqttSpring:
# BASIC parameters are required.
BASIC:
protocol: MQTT
host: 127.0.0.1
port: 2883
username: mh
password: mhtech@803
client-id: mqtt_mz_producer_dev
# If the protocol is ws/wss, this value is required.
path:
# Topics that need to be subscribed when initially connecting to mqtt, multiple topics are divided by ",".
inbound-topic: A/cmd/ctl/receive1
# 此部分是提供给后端生成token返回给前端,让前端使用websocket方式去和MQTT实现交互的,笔者此文的案例中并没有去实现
# 无人机远程控制模式(drone remote control)
DRC:
protocol: WS
host: 127.0.0.1
port: 8083
path: /mqtt

137
mh-admin/src/main/resources/application-prod.yml

@ -0,0 +1,137 @@
# 项目相关配置
mh:
# 名称
name: MH
# 版本
version: 1.0.0
# 版权年份
copyrightYear: 2024
# 文件路径 示例( Windows配置D:/mh/uploadPath,Linux配置 /home/mh/uploadPath)
profile: D:/mh/uploadPath
# 获取ip地址开关
addressEnabled: false
# 验证码类型 math 数字计算 char 字符验证
captchaType: math
# 开发环境配置
server:
# 服务器的HTTP端口,默认为8080
port: 8080
servlet:
# 应用的访问路径
context-path: /
tomcat:
# tomcat的URI编码
uri-encoding: UTF-8
# 连接数满后的排队数,默认为100
accept-count: 1000
threads:
# tomcat最大线程数,默认为200
max: 800
# Tomcat启动初始化的线程数,默认值10
min-spare: 100
# 日志配置
logging:
level:
com.mh: debug
org.springframework: warn
# Spring配置
spring:
# 资源信息
messages:
# 国际化资源文件路径
basename: i18n/messages
profiles:
active: druid
# 文件上传
servlet:
multipart:
# 单个文件大小
max-file-size: 10MB
# 设置总上传的文件大小
max-request-size: 20MB
# 服务模块
devtools:
restart:
# 热部署开关
enabled: true
data:
# redis 配置
redis:
# 地址
host: localhost
# 端口,默认为6379
port: 6379
# 数据库索引
database: 0
# 密码
password:
# 连接超时时间
timeout: 10s
lettuce:
pool:
# 连接池中的最小空闲连接
min-idle: 0
# 连接池中的最大空闲连接
max-idle: 8
# 连接池的最大数据库连接数
max-active: 8
# #连接池最大阻塞等待时间(使用负值表示没有限制)
max-wait: -1ms
# MyBatis配置
mybatis-plus:
# 搜索指定包别名
type-aliases-package: com.mh.**.domain
# 配置mapper的扫描,找到所有的mapper.xml映射文件
mapper-locations: classpath*:mapper/**/*Mapper.xml
# 加载全局的配置文件
config-location: classpath:mybatis/mybatis-config.xml
configuration:
map-underscore-to-camel-case: true
# Springdoc配置
springdoc:
api-docs:
path: /v3/api-docs
swagger-ui:
enabled: true
path: /swagger-ui.html
tags-sorter: alpha
group-configs:
- group: 'default'
display-name: '测试模块'
paths-to-match: '/**'
packages-to-scan: com.mh.web.controller.tool
# 防止XSS攻击
xss:
# 过滤开关
enabled: true
# 排除链接(多个用逗号分隔)
excludes: /system/notice
# 匹配链接
urlPatterns: /system/*,/monitor/*,/tool/*
mqttSpring:
# BASIC parameters are required.
BASIC:
protocol: MQTT
host: 127.0.0.1
port: 2883
username: mh
password: mhtech@803
client-id: mqtt_mz_producer_prod
# If the protocol is ws/wss, this value is required.
path:
# Topics that need to be subscribed when initially connecting to mqtt, multiple topics are divided by ",".
inbound-topic: A/cmd/ctl/receive1
# 此部分是提供给后端生成token返回给前端,让前端使用websocket方式去和MQTT实现交互的,笔者此文的案例中并没有去实现
# 无人机远程控制模式(drone remote control)
DRC:
protocol: WS
host: 127.0.0.1
port: 8083
path: /mqtt

137
mh-admin/src/main/resources/application-test.yml

@ -0,0 +1,137 @@
# 项目相关配置
mh:
# 名称
name: MH
# 版本
version: 1.0.0
# 版权年份
copyrightYear: 2024
# 文件路径 示例( Windows配置D:/mh/uploadPath,Linux配置 /home/mh/uploadPath)
profile: D:/mh/uploadPath
# 获取ip地址开关
addressEnabled: false
# 验证码类型 math 数字计算 char 字符验证
captchaType: math
# 开发环境配置
server:
# 服务器的HTTP端口,默认为8080
port: 8080
servlet:
# 应用的访问路径
context-path: /
tomcat:
# tomcat的URI编码
uri-encoding: UTF-8
# 连接数满后的排队数,默认为100
accept-count: 1000
threads:
# tomcat最大线程数,默认为200
max: 800
# Tomcat启动初始化的线程数,默认值10
min-spare: 100
# 日志配置
logging:
level:
com.mh: debug
org.springframework: warn
# Spring配置
spring:
# 资源信息
messages:
# 国际化资源文件路径
basename: i18n/messages
profiles:
active: druid
# 文件上传
servlet:
multipart:
# 单个文件大小
max-file-size: 10MB
# 设置总上传的文件大小
max-request-size: 20MB
# 服务模块
devtools:
restart:
# 热部署开关
enabled: true
data:
# redis 配置
redis:
# 地址
host: localhost
# 端口,默认为6379
port: 6379
# 数据库索引
database: 0
# 密码
password:
# 连接超时时间
timeout: 10s
lettuce:
pool:
# 连接池中的最小空闲连接
min-idle: 0
# 连接池中的最大空闲连接
max-idle: 8
# 连接池的最大数据库连接数
max-active: 8
# #连接池最大阻塞等待时间(使用负值表示没有限制)
max-wait: -1ms
# MyBatis配置
mybatis-plus:
# 搜索指定包别名
type-aliases-package: com.mh.**.domain
# 配置mapper的扫描,找到所有的mapper.xml映射文件
mapper-locations: classpath*:mapper/**/*Mapper.xml
# 加载全局的配置文件
config-location: classpath:mybatis/mybatis-config.xml
configuration:
map-underscore-to-camel-case: true
# Springdoc配置
springdoc:
api-docs:
path: /v3/api-docs
swagger-ui:
enabled: true
path: /swagger-ui.html
tags-sorter: alpha
group-configs:
- group: 'default'
display-name: '测试模块'
paths-to-match: '/**'
packages-to-scan: com.mh.web.controller.tool
# 防止XSS攻击
xss:
# 过滤开关
enabled: true
# 排除链接(多个用逗号分隔)
excludes: /system/notice
# 匹配链接
urlPatterns: /system/*,/monitor/*,/tool/*
mqttSpring:
# BASIC parameters are required.
BASIC:
protocol: MQTT
host: 127.0.0.1
port: 2883
username: mh
password: mhtech@803
client-id: mqtt_mz_test_dev
# If the protocol is ws/wss, this value is required.
path:
# Topics that need to be subscribed when initially connecting to mqtt, multiple topics are divided by ",".
inbound-topic: A/cmd/ctl/receive1
# 此部分是提供给后端生成token返回给前端,让前端使用websocket方式去和MQTT实现交互的,笔者此文的案例中并没有去实现
# 无人机远程控制模式(drone remote control)
DRC:
protocol: WS
host: 127.0.0.1
port: 8083
path: /mqtt

121
mh-admin/src/main/resources/application.yml

@ -1,41 +1,6 @@
# 项目相关配置
mh:
# 名称
name: MH
# 版本
version: 1.0.0
# 版权年份
copyrightYear: 2024
# 文件路径 示例( Windows配置D:/mh/uploadPath,Linux配置 /home/mh/uploadPath)
profile: D:/mh/uploadPath
# 获取ip地址开关
addressEnabled: false
# 验证码类型 math 数字计算 char 字符验证
captchaType: math
# 开发环境配置
server:
# 服务器的HTTP端口,默认为8080
port: 8080
servlet:
# 应用的访问路径
context-path: /
tomcat:
# tomcat的URI编码
uri-encoding: UTF-8
# 连接数满后的排队数,默认为100
accept-count: 1000
threads:
# tomcat最大线程数,默认为200
max: 800
# Tomcat启动初始化的线程数,默认值10
min-spare: 100
# 日志配置
logging:
level:
com.mh: debug
org.springframework: warn
spring:
profiles:
active: dev
# 用户配置
user:
@ -45,50 +10,6 @@ user:
# 密码锁定时间(默认10分钟)
lockTime: 10
# Spring配置
spring:
# 资源信息
messages:
# 国际化资源文件路径
basename: i18n/messages
profiles:
active: druid
# 文件上传
servlet:
multipart:
# 单个文件大小
max-file-size: 10MB
# 设置总上传的文件大小
max-request-size: 20MB
# 服务模块
devtools:
restart:
# 热部署开关
enabled: true
data:
# redis 配置
redis:
# 地址
host: localhost
# 端口,默认为6379
port: 6379
# 数据库索引
database: 0
# 密码
password:
# 连接超时时间
timeout: 10s
lettuce:
pool:
# 连接池中的最小空闲连接
min-idle: 0
# 连接池中的最大空闲连接
max-idle: 8
# 连接池的最大数据库连接数
max-active: 8
# #连接池最大阻塞等待时间(使用负值表示没有限制)
max-wait: -1ms
# token配置
token:
# 令牌自定义标识
@ -98,43 +19,9 @@ token:
# 令牌有效期(默认30分钟)
expireTime: 30
# MyBatis配置
mybatis-plus:
# 搜索指定包别名
type-aliases-package: com.mh.**.domain
# 配置mapper的扫描,找到所有的mapper.xml映射文件
mapper-locations: classpath*:mapper/**/*Mapper.xml
# 加载全局的配置文件
config-location: classpath:mybatis/mybatis-config.xml
configuration:
map-underscore-to-camel-case: true
# PageHelper分页插件
pagehelper:
reasonable: true
params: count=countSql
support-methods-arguments: true
helper-dialect: postgresql
# Springdoc配置
springdoc:
api-docs:
path: /v3/api-docs
swagger-ui:
enabled: true
path: /swagger-ui.html
tags-sorter: alpha
group-configs:
- group: 'default'
display-name: '测试模块'
paths-to-match: '/**'
packages-to-scan: com.mh.web.controller.tool
# 防止XSS攻击
xss:
# 过滤开关
enabled: true
# 排除链接(多个用逗号分隔)
excludes: /system/notice
# 匹配链接
urlPatterns: /system/*,/monitor/*,/tool/*
helper-dialect: postgresql

15
mh-common/pom.xml

@ -140,6 +140,21 @@
<artifactId>core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-messaging</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
</project>

99
mh-common/src/main/java/com/mh/common/config/mqtt/MqttConfig.java

@ -0,0 +1,99 @@
package com.mh.common.config.mqtt;
import com.mh.common.enums.MqttClientOptions;
import com.mh.common.enums.MqttProtocolEnum;
import com.mh.common.enums.MqttUseEnum;
import lombok.Data;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.util.StringUtils;
import java.util.Map;
/**
* @author LJF
* @version 1.0
* @project springboot-mqtt-demo
* @description mqtt连接配置
* @date 2024-10-29 14:44:51
*/
@Configuration
@Data
@ConfigurationProperties
public class MqttConfig {
private static Map<MqttUseEnum, MqttClientOptions> mqttSpring;
public void setMqttSpring(Map<MqttUseEnum, MqttClientOptions> mqtt) {
MqttConfig.mqttSpring = mqtt;
}
/**
* 获取mqtt基本配置
* @return
*/
static MqttClientOptions getBasicMqttClientOptions() {
if (!mqttSpring.containsKey(MqttUseEnum.BASIC)) {
throw new Error("请先配置MQTT的基本连接参数,否则无法启动项目");
}
return mqttSpring.get(MqttUseEnum.BASIC);
}
/**
* 拼接获取对应mqtt的连接地址
* @param options
* @return
*/
public static String getMqttAddress(MqttClientOptions options) {
StringBuilder addr = new StringBuilder();
addr.append(options.getProtocol().getProtocolAddr())
.append(options.getHost().trim())
.append(":")
.append(options.getPort());
if ((options.getProtocol() == MqttProtocolEnum.WS || options.getProtocol() == MqttProtocolEnum.WSS)
&& StringUtils.hasText(options.getPath())) {
addr.append(options.getPath());
}
return addr.toString();
}
public static String getBasicMqttAddress() {
return getMqttAddress(getBasicMqttClientOptions());
}
/**
* 获取连接参数注入到spring中
* @return
*/
@Bean
public MqttConnectOptions mqttConnectionOptions() {
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
MqttClientOptions customizeOptions = getBasicMqttClientOptions();
String basicMqttAddress = getBasicMqttAddress();
mqttConnectOptions.setServerURIs(new String[]{basicMqttAddress});
mqttConnectOptions.setUserName(StringUtils.hasText(customizeOptions.getUsername()) ?
customizeOptions.getUsername() : "");
mqttConnectOptions.setPassword(StringUtils.hasText(customizeOptions.getPassword()) ?
customizeOptions.getPassword().toCharArray() : new char[0]);
// 直接进行自动连接
mqttConnectOptions.setAutomaticReconnect(true);
// 时间间隔时间10s
mqttConnectOptions.setKeepAliveInterval(10);
return mqttConnectOptions;
}
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(mqttConnectionOptions());
return factory;
}
}

84
mh-common/src/main/java/com/mh/common/config/mqtt/MqttInboundConfig.java

@ -0,0 +1,84 @@
package com.mh.common.config.mqtt;
import com.mh.common.constant.ChannelName;
import com.mh.common.enums.MqttClientOptions;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
/**
* @author LJF
* @version 1.0
* @project springboot-mqtt-demo
* @description 入站配置
* @date 2024-10-29 16:22:10
*/
@Slf4j
@Configuration
@IntegrationComponentScan
public class MqttInboundConfig {
@Autowired
private MqttPahoClientFactory mqttClientFactory;
@Resource(name = ChannelName.INBOUND)
private MessageChannel inboundChannel;
/**
* 入站适配器
* @return
*/
@Bean(name = "adapter")
public MessageProducerSupport mqttInbound() {
MqttClientOptions options = MqttConfig.getBasicMqttClientOptions();
// 此处初始化的时候,默认订阅了配置文件中已经写好的topic
// 如果需要订阅多个,可以自己手动订阅,会写一个addTopic()进行添加订阅
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
options.getClientId() + "_consumer_" + System.currentTimeMillis(),
mqttClientFactory,
options.getInboundTopic().split(","));
DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
// 统一是字节处理
converter.setPayloadAsBytes(true);
// 设置消息转换器
adapter.setConverter(converter);
// 设置qos(quality of service)
// 0:最多一次传输(消息会丢失),
// 1:至少一次传输(消息会重复),
// 2:只有当消息发送成功时才确认(消息不回丢,但延迟高)。
adapter.setQos(0);
// 设置在接收已经订阅的主题信息后,发送给哪个通道,具体的发送方法需要翻上层的抽象类
adapter.setOutputChannel(inboundChannel);
return adapter;
}
/**
* 默认声明一个消息处理器用于处理无效的消息
* @return
*/
@Bean
@ServiceActivator(inputChannel = ChannelName.DEFAULT)
public MessageHandler handler() {
return message -> {
log.info("The default channel does not handle messages." +
"\nTopic: {}" +
"\nPayload: {}",
message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC),
message.getPayload());
};
}
}

53
mh-common/src/main/java/com/mh/common/config/mqtt/MqttMessageChannel.java

@ -0,0 +1,53 @@
package com.mh.common.config.mqtt;
import com.mh.common.constant.ChannelName;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.ExecutorChannel;
import org.springframework.messaging.MessageChannel;
import java.util.concurrent.Executor;
/**
* @author LJF
* @version 1.0
* @project springboot-mqtt-demo
* @description 声明所有通道的定义类
* @date 2024-10-29 16:23:32
*/
@Slf4j
@Configuration
public class MqttMessageChannel {
@Autowired
private Executor threadPool;
@Bean(name = ChannelName.INBOUND)
public MessageChannel inboundChannel() {
return new ExecutorChannel(threadPool);
}
@Bean(name = ChannelName.EVENTS_UPLOAD_INBOUND)
public MessageChannel eventsUploadInbound() {
return new DirectChannel();
}
@Bean(name = ChannelName.EVENTS_COLLECTION_INBOUND)
public MessageChannel eventsCollectionInbound() {
return new DirectChannel();
}
@Bean(name = ChannelName.EVENTS_CONTROL_INBOUND)
public MessageChannel eventsControlInbound() {
return new DirectChannel();
}
@Bean(name = ChannelName.EVENTS_SEND_INBOUND)
public MessageChannel eventsSendInbound() {
return new DirectChannel();
}
}

51
mh-common/src/main/java/com/mh/common/config/mqtt/MqttOutboundConfig.java

@ -0,0 +1,51 @@
package com.mh.common.config.mqtt;
import com.mh.common.constant.ChannelName;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageHandler;
/**
* @author LJF
* @version 1.0
* @project springboot-mqtt-demo
* @description 入站配置
* @date 2024-10-29 16:22:10
*/
@Slf4j
@Configuration
@IntegrationComponentScan
public class MqttOutboundConfig {
@Autowired
private MqttPahoClientFactory mqttClientFactory;
/**
* 默认声明一个出站处理器用于处理无效的消息
* @return
*/
@Bean
@ServiceActivator(inputChannel = ChannelName.OUTBOUND)
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
MqttConfig.getBasicMqttClientOptions().getClientId() + "_producer_" + System.currentTimeMillis(),
mqttClientFactory);
DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
// use byte types uniformly
converter.setPayloadAsBytes(true);
messageHandler.setAsync(true);
messageHandler.setDefaultQos(0);
messageHandler.setConverter(converter);
return messageHandler;
}
}

59
mh-common/src/main/java/com/mh/common/constant/ChannelName.java

@ -0,0 +1,59 @@
package com.mh.common.constant;
/**
* @author LJF
* @version 1.0
* @project springboot-mqtt-demo
* @description 声明所有通道
* @date 2024-10-29 16:04:19
*/
public class ChannelName {
/**
* 默认通道名称防止出错
*/
public static final String DEFAULT = "default";
/**
* 主动上报入站
*/
public static final String INBOUND = "inbound";
/**
* 出站
*/
public static final String OUTBOUND = "outbound";
/**
* 入站主动上报
*/
public static final String EVENTS_UPLOAD_INBOUND = "events_upload_inbound";
/**
* 入站主动采集
*/
public static final String EVENTS_COLLECTION_INBOUND = "events_collection_inbound";
/**
* 入站主动控制
*/
public static final String EVENTS_CONTROL_INBOUND = "events_control_inbound";
/**
* 默认进站处理
*/
public static final String EVENTS_DEFAULT_INBOUND = "events_default_inbound";
public static final String REPLY_EVENTS_OUTBOUND = "reply_events_outbound";
/**
* 新珠江收到的信息
*/
public static final String EVENTS_RECEIVE_INBOUND = "events_receive_inbound";
/**
* 接收服务端的数据报文
*/
public static final String EVENTS_SEND_INBOUND = "events_send_inbound";
}

31
mh-common/src/main/java/com/mh/common/constant/TopicConst.java

@ -0,0 +1,31 @@
package com.mh.common.constant;
/**
* All the topics that need to be used in the project.
*
* @author sean.zhou
* @version 0.1
* @date 2021/11/10
*/
public class TopicConst {
public static final String MH_UPLOAD = "mh_upload/";
public static final String EVENTS_UPLOAD = "events_upload/";
public static final String MH_COLLECTION = "mh_collection/";
public static final String EVENTS_COLLECTION = "events_collection/";
public static final String MH_CONTROL = "mh_control/";
public static final String EVENTS_CONTROL = "events_control/";
public static final String REGEX_SN = "[A-Za-z0-9]+";
public static final String THING_MODEL_PRE = "thing/";
public static final String PRODUCT = "product/";
public static final String SERVICES_SUF = "/services";
}

108
mh-common/src/main/java/com/mh/common/enums/MqttClientOptions.java

@ -0,0 +1,108 @@
package com.mh.common.enums;
/**
* @author LJF
* @version 1.0
* @project springboot-mqtt-demo
* @description mqtt连接的参数
* @date 2024-10-29 14:46:24
*/
public class MqttClientOptions {
private MqttProtocolEnum protocol;
private String host;
private Integer port;
private String username;
private String password;
private String clientId;
private String path;
/**
* 客户端连接的时候订阅的主题
*/
private String inboundTopic;
public MqttProtocolEnum getProtocol() {
return protocol;
}
public void setProtocol(MqttProtocolEnum protocol) {
this.protocol = protocol;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public Integer getPort() {
return port;
}
public void setPort(Integer port) {
this.port = port;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public String getClientId() {
return clientId;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
public String getPath() {
return path;
}
public void setPath(String path) {
this.path = path;
}
public String getInboundTopic() {
return inboundTopic;
}
public void setInboundTopic(String inboundTopic) {
this.inboundTopic = inboundTopic;
}
@Override
public String toString() {
return "MqttClientOptions{" +
"protocol=" + protocol +
", host='" + host + '\'' +
", port=" + port +
", username='" + username + '\'' +
", password='" + password + '\'' +
", clientId='" + clientId + '\'' +
", path='" + path + '\'' +
", inboundTopic='" + inboundTopic + '\'' +
'}';
}
}

33
mh-common/src/main/java/com/mh/common/enums/MqttProtocolEnum.java

@ -0,0 +1,33 @@
package com.mh.common.enums;
/**
* @author LJF
* @version 1.0
* @project springboot-mqtt-demo
* @description 采用哪种协议进行数据交互
* @date 2024-10-29 15:21:07
*/
public enum MqttProtocolEnum {
MQTT("tcp"),
MQTTS("tcp"),
WS("ws"),
WSS("wss");
final String protocol;
MqttProtocolEnum(String protocol) {
this.protocol = protocol;
}
public String getProtocolAddr() {
return protocol + "://";
}
public String getProtocol() {
return protocol;
}
}

16
mh-common/src/main/java/com/mh/common/enums/MqttUseEnum.java

@ -0,0 +1,16 @@
package com.mh.common.enums;
/**
* @author LJF
* @version 1.0
* @project springboot-mqtt-demo
* @description mqtt选择模式
* @date 2024-10-29 15:19:21
*/
public enum MqttUseEnum {
BASIC,
DRC
}

35
mh-common/src/main/java/com/mh/common/model/request/CommonTopicReceiver.java

@ -0,0 +1,35 @@
package com.mh.common.model.request;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Data;
/**
* Unified topic receiving format.
*
* @author sean.zhou
* @version 0.1
* @date 2021/11/10
*/
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class CommonTopicReceiver<T> {
/**
* The command is sent and the response is matched by the tid and bid fields in the message,
* and the reply should keep the tid and bid the same.
*/
private String tid;
private String bid;
private String method;
private Long timestamp;
private T data;
private String gateway;
private Integer needReply;
}

36
mh-common/src/main/java/com/mh/common/model/response/CommonTopicResponse.java

@ -0,0 +1,36 @@
package com.mh.common.model.response;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* Unified Topic response format
*
* @author sean.zhou
* @version 0.1
* @date 2021/11/15
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
@JsonIgnoreProperties(ignoreUnknown = true)
public class CommonTopicResponse<T> {
/**
* The command is sent and the response is matched by the tid and bid fields in the message,
* and the reply should keep the tid and bid the same.
*/
private String tid;
private String bid;
private String method;
private T data;
private Long timestamp;
}

20
mh-common/src/main/java/com/mh/common/model/response/ServiceReply.java

@ -0,0 +1,20 @@
package com.mh.common.model.response;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Data;
/**
* @author sean.zhou
* @version 0.1
* @date 2021/11/22
*/
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class ServiceReply<T> {
private Integer result;
private T info;
private T output;
}

49
mh-system/src/main/java/com/mh/system/service/mqtt/IEventsService.java

@ -0,0 +1,49 @@
package com.mh.system.service.mqtt;
import org.springframework.messaging.MessageHeaders;
import java.io.IOException;
/**
* @author LJF
* @version 1.0
* @project springboot-mqtt-demo
* @description 通道处理类
* @date 2024-11-05 11:30:26
*/
public interface IEventsService {
/**
* 处理主动上报
* @param receiver
* @param headers
* @return
*/
void handleInboundUpload(byte[] receiver, MessageHeaders headers) throws IOException;
/**
* 处理服务器主动采集上报
* @param receiver
* @param headers
* @return
*/
void handleInboundCollection(byte[] receiver, MessageHeaders headers) throws IOException;
/**
* 处理控制上报
* @param receiver
* @param headers
* @return
*/
void handleInboundControl(byte[] receiver, MessageHeaders headers) throws IOException;
/**
* 处理接收上报
* @param receiver
* @param headers
* @throws IOException
*/
void handleInboundReceive(byte[] receiver, MessageHeaders headers) throws IOException;
}

41
mh-system/src/main/java/com/mh/system/service/mqtt/IMqttMessageGateway.java

@ -0,0 +1,41 @@
package com.mh.system.service.mqtt;
import com.mh.common.constant.ChannelName;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
/**
* @author LJF
* @version 1.0
* @project springboot-mqtt-demo
* @description 消息网关
* @date 2024-10-30 14:43:55
*/
@Component
@MessagingGateway(defaultRequestChannel = ChannelName.OUTBOUND)
public interface IMqttMessageGateway {
/**
* 发送消息
* @param topic
* @param payload
*/
void publish(@Header(MqttHeaders.TOPIC) String topic, String payload, @Header(MqttHeaders.QOS) int qos);
/**
* 发送消息
* @param topic
* @param payload
*/
void publish(@Header(MqttHeaders.TOPIC) String topic, byte[] payload);
/**
* 发送消息并带上qos
* @param topic
* @param payload
* @param qos
*/
void publish(@Header(MqttHeaders.TOPIC) String topic, byte[] payload, @Header(MqttHeaders.QOS) int qos);
}

96
mh-system/src/main/java/com/mh/system/service/mqtt/IMqttMsgSenderService.java

@ -0,0 +1,96 @@
package com.mh.system.service.mqtt;
import com.fasterxml.jackson.core.type.TypeReference;
import com.mh.user.model.response.CommonTopicResponse;
import com.mh.user.model.response.ServiceReply;
/**
* @author LJF
* @version 1.0
* @project springboot-mqtt-demo
* @description Mqtt发送消息
* @date 2024-10-29 17:31:40
*/
public interface IMqttMsgSenderService {
void publish(String topic, String pushMessage);
/**
* 发布消息
* @param topic target
* @param response message
*/
void publish(String topic, CommonTopicResponse response);
/**
* 使用qos发布消息
*
* @param topic target
* @param qos qos
* @param response message
*/
void publish(String topic, int qos, CommonTopicResponse response);
/**
* 发送消息并同时接收响应
* @param clazz
* @param topic
* @param response 通知启动是否成功
* @return
*/
<T> T publishWithReply(Class<T> clazz, String topic, CommonTopicResponse response);
/**
* 发送消息并同时接收响应
* @param clazz
* @param topic
* @param response
* @param retryTime
* @param <T>
* @return
*/
<T> T publishWithReply(Class<T> clazz, String topic, CommonTopicResponse response, int retryTime);
/**
* 专门用于发送服务消息
* @param clazz The generic class for ServiceReply.
* @param sn
* @param method
* @param data
* @param bid
* @param <T>
* @return
*/
<T> ServiceReply<T> publishServicesTopic(TypeReference<T> clazz, String sn, String method, Object data, String bid);
/**
* 仅用于为服务发送消息不设置接收到的子类型
* @param sn
* @param method
* @param data
* @param bid
* @return
*/
ServiceReply publishServicesTopic(String sn, String method, Object data, String bid);
/**
* 专门用于发送服务消息
* @param clazz The generic class for ServiceReply.
* @param sn
* @param method
* @param data
* @param <T>
* @return
*/
<T> ServiceReply<T> publishServicesTopic(TypeReference<T> clazz, String sn, String method, Object data);
/**
* 仅用于为服务发送消息不设置接收到的子类型
* @param sn
* @param method
* @param data
* @return
*/
ServiceReply publishServicesTopic(String sn, String method, Object data);
}

40
mh-system/src/main/java/com/mh/system/service/mqtt/IMqttTopicService.java

@ -0,0 +1,40 @@
package com.mh.system.service.mqtt;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
/**
* @author LJF
* @version 1.0
* @project springboot-mqtt-demo
* @description mqtt订阅主题
* @date 2024-10-29 17:29:52
*/
public interface IMqttTopicService {
/**
* 订阅主题
* @param topic
*/
void subscribe(@Header(MqttHeaders.TOPIC) String topic);
/**
* 订阅主题并设置qos
* @param topic
* @param qos
*/
void subscribe(@Header(MqttHeaders.TOPIC) String topic, int qos);
/**
* 解绑主题
* @param topic
*/
void unsubscribe(@Header(MqttHeaders.TOPIC) String topic);
/**
* 获取已订阅的主题
* @return
*/
String[] getSubscribedTopics();
}

71
mh-system/src/main/java/com/mh/system/service/mqtt/impl/EventsServiceImpl.java

@ -0,0 +1,71 @@
package com.mh.system.service.mqtt.impl;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.mh.common.constant.ChannelName;
import com.mh.common.core.redis.RedisCache;
import com.mh.common.model.request.CommonTopicReceiver;
import com.mh.system.service.mqtt.IEventsService;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
/**
* @author LJF
* @version 1.0
* @project springboot-mqtt-demo
* @description 通道消息处理
* @date 2024-11-05 11:42:30
*/
@Slf4j
@Service
public class EventsServiceImpl implements IEventsService {
@Autowired
private ObjectMapper mapper;
@Autowired
private RedisCache redisCache;
@ServiceActivator(inputChannel = ChannelName.EVENTS_UPLOAD_INBOUND)
@Override
public void handleInboundUpload(byte[] receiver, MessageHeaders headers) {
handleInboundData(receiver, "主动上报数据");
}
@ServiceActivator(inputChannel = ChannelName.EVENTS_COLLECTION_INBOUND)
@Override
public void handleInboundCollection(byte[] receiver, MessageHeaders headers) {
handleInboundData(receiver, "主动下发采集数据");
}
@ServiceActivator(inputChannel = ChannelName.EVENTS_CONTROL_INBOUND)
@Override
public void handleInboundControl(byte[] receiver, MessageHeaders headers) {
handleInboundData(receiver, "控制指令下发");
}
@ServiceActivator(inputChannel = ChannelName.EVENTS_RECEIVE_INBOUND)
@Override
public void handleInboundReceive(byte[] receiver, MessageHeaders headers) {
String receiveStr = new String(receiver, CharsetUtil.UTF_8);
log.info("接收到控制指令返回=>{}", receiveStr);
// 设置成redis,15秒时间响应
redisCache.setCacheObject("receiveCmd", receiveStr, 15, TimeUnit.SECONDS);
}
private void handleInboundData(byte[] receiver, String logMessage) {
try {
CommonTopicReceiver commonTopicReceiver = mapper.readValue(receiver, CommonTopicReceiver.class);
log.info("{}: {}", logMessage, commonTopicReceiver);
} catch (IOException e) {
log.error("处理数据时发生错误: ", e);
}
}
}

140
mh-system/src/main/java/com/mh/system/service/mqtt/impl/MqttMsgSenderServiceImpl.java

@ -0,0 +1,140 @@
package com.mh.system.service.mqtt.impl;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.mh.common.constant.TopicConst;
import com.mh.user.model.response.CommonTopicResponse;
import com.mh.user.model.response.ServiceReply;
import com.mh.user.service.mqtt.IMqttMessageGateway;
import com.mh.user.service.mqtt.IMqttMsgSenderService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import java.util.Objects;
import java.util.UUID;
/**
* @author LJF
* @version 1.0
* @project springboot-mqtt-demo
* @description mqtt不同类型发送消息
* @date 2024-10-30 14:30:03
*/
@Slf4j
@Service
public class MqttMsgSenderServiceImpl implements IMqttMsgSenderService {
@Autowired
private IMqttMessageGateway mqttMessageGateway;
@Autowired
private ObjectMapper mapper;
/**
* 发布默认qos为0非持久化
*
* @param pushMessage
* @param topic
*/
@Override
public void publish(String topic, String pushMessage) {
synchronized (this) {//注意:这里一定要同步,否则,在多线程publish的情况下,线程会发生死锁,分析见文章最后补充
try {
mqttMessageGateway.publish(topic, pushMessage, 0);
log.info("发送主题:{},消息:{}", topic, pushMessage);
} catch (Exception e) {
log.error("发送主题异常:{},消息:{}", topic, pushMessage, e);
}
}
}
@Override
public void publish(String topic, CommonTopicResponse response) {
this.publish(topic, 1, response);
}
@Override
public void publish(String topic, int qos, CommonTopicResponse response) {
try {
log.info("发送主题:{},消息:{}", topic, response.toString());
mqttMessageGateway.publish(topic, mapper.writeValueAsBytes(response), qos);
} catch (JsonProcessingException e) {
log.error("发送主题:{},消息:{}", topic, response.toString(), e);
throw new RuntimeException(e);
}
}
@Override
public <T> T publishWithReply(Class<T> clazz, String topic, CommonTopicResponse response) {
return this.publishWithReply(clazz, topic, response, 2);
}
@Override
public <T> T publishWithReply(Class<T> clazz, String topic, CommonTopicResponse response, int retryTime) {
// AtomicInteger time = new AtomicInteger(0);
// // Retry three times
// while (time.getAndIncrement() <= retryTime) {
// this.publish(topic, response);
//
// Chan<CommonTopicReceiver<T>> chan = Chan.getInstance();
// // If the message is not received in 0.5 seconds then resend it again.
// CommonTopicReceiver<T> receiver = chan.get(response.getTid());
//
// // Need to match tid and bid.
// if (Objects.nonNull(receiver) && receiver.getTid().equals(response.getTid()) &&
// receiver.getBid().equals(response.getBid())) {
// if (clazz.isAssignableFrom(receiver.getData().getClass())) {
// return receiver.getData();
// }
// throw new TypeMismatchException(receiver.getData(), clazz);
// }
// // It must be guaranteed that the tid and bid of each message are different.
// response.setBid(UUID.randomUUID().toString());
// response.setTid(UUID.randomUUID().toString());
// }
// throw new RuntimeException("没有消息接收到");
return null;
}
@Override
public <T> ServiceReply<T> publishServicesTopic(TypeReference<T> clazz, String sn, String method, Object data, String bid) {
String topic = TopicConst.THING_MODEL_PRE + TopicConst.PRODUCT + sn + TopicConst.SERVICES_SUF;
ServiceReply reply = this.publishWithReply(ServiceReply.class, topic,
CommonTopicResponse.builder()
.tid(UUID.randomUUID().toString())
.bid(StringUtils.hasText(bid) ? bid : UUID.randomUUID().toString())
.timestamp(System.currentTimeMillis())
.method(method)
.data(Objects.requireNonNull(data, ""))
.build());
if (Objects.isNull(clazz)) {
return reply;
}
// put together in "output"
if (Objects.nonNull(reply.getInfo())) {
reply.setOutput(mapper.convertValue(reply.getInfo(), clazz));
}
if (Objects.nonNull(reply.getOutput())) {
reply.setOutput(mapper.convertValue(reply.getOutput(), clazz));
}
return reply;
}
@Override
public ServiceReply publishServicesTopic(String sn, String method, Object data, String bid) {
return this.publishServicesTopic(null, sn, method, data, bid);
}
@Override
public <T> ServiceReply<T> publishServicesTopic(TypeReference<T> clazz, String sn, String method, Object data) {
return this.publishServicesTopic(clazz, sn, method, data, null);
}
@Override
public ServiceReply publishServicesTopic(String sn, String method, Object data) {
return this.publishServicesTopic(null, sn, method, data, null);
}
}

41
mh-system/src/main/java/com/mh/system/service/mqtt/impl/MqttTopicServiceImpl.java

@ -0,0 +1,41 @@
package com.mh.system.service.mqtt.impl;
import com.mh.user.service.mqtt.IMqttTopicService;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
/**
* @author LJF
* @version 1.0
* @project springboot-mqtt-demo
* @description 订阅主题实现类
* @date 2024-10-29 17:36:31
*/
@Service
public class MqttTopicServiceImpl implements IMqttTopicService {
@Resource
private MqttPahoMessageDrivenChannelAdapter adapter;
@Override
public void subscribe(String topic) {
adapter.addTopic(topic);
}
@Override
public void subscribe(String topic, int qos) {
adapter.addTopic(topic, qos);
}
@Override
public void unsubscribe(String topic) {
adapter.removeTopic(topic);
}
@Override
public String[] getSubscribedTopics() {
return adapter.getTopic();
}
}

25
pom.xml

@ -36,6 +36,9 @@
<jakarta.version>6.0.0</jakarta.version>
<springdoc.version>2.6.0</springdoc.version>
<qrcode.version>3.5.3</qrcode.version>
<mqtt.version>6.4.1</mqtt.version>
<lombok.version>1.18.36</lombok.version>
<message.version>6.2.1</message.version>
</properties>
<!-- 依赖声明 -->
@ -236,6 +239,28 @@
</dependency>
<!--qrcode end-->
<!-- spring-integration-mqtt -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>${mqtt.version}</version>
</dependency>
<!-- lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</dependency>
<!-- spring-messaging -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-messaging</artifactId>
<version>${message.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

Loading…
Cancel
Save