diff --git a/mh-admin/src/main/java/com/mh/web/controller/device/OperationController.java b/mh-admin/src/main/java/com/mh/web/controller/device/OperationController.java new file mode 100644 index 0000000..54eab16 --- /dev/null +++ b/mh-admin/src/main/java/com/mh/web/controller/device/OperationController.java @@ -0,0 +1,19 @@ +package com.mh.web.controller.device; + +import com.mh.common.core.controller.BaseController; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +/** + * @author LJF + * @version 1.0 + * @project EEMCS + * @description 设备控制类 + * @date 2025-01-15 14:11:22 + */ +@RestController +@RequestMapping("/device/operation") +public class OperationController extends BaseController { + + +} diff --git a/mh-admin/src/main/java/com/mh/web/controller/mqtt/MqttTopicController.java b/mh-admin/src/main/java/com/mh/web/controller/mqtt/MqttTopicController.java new file mode 100644 index 0000000..07a755f --- /dev/null +++ b/mh-admin/src/main/java/com/mh/web/controller/mqtt/MqttTopicController.java @@ -0,0 +1,63 @@ +package com.mh.web.controller.mqtt; + +import com.mh.system.service.mqtt.IMqttMsgSenderService; +import com.mh.system.service.mqtt.IMqttTopicService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.Arrays; + +/** + * @author LJF + * @version 1.0 + * @project springboot-mqtt-demo + * @description 主题消息类 + * @date 2024-10-30 15:05:24 + */ +@RestController +@RequestMapping("/topic") +public class MqttTopicController { + + @Autowired + private IMqttTopicService mqttTopicService; + + @Autowired + private IMqttMsgSenderService mqttMsgSenderService; + + @GetMapping("/add") + public String add(String topic) { + mqttTopicService.subscribe(topic); + return "添加主题:" + topic + "成功"; + } + + @GetMapping("/del") + public String del(String topic) { + mqttTopicService.unsubscribe(topic); + return "删除主题:" + topic + "成功"; + } + + @GetMapping("/list") + public String list() { + return "已订阅的主题:" + String.join(",", mqttTopicService.getSubscribedTopics()); + } + + @GetMapping("/publish") + public String publish(String topic, String pushMessage) { + String[] subscribedTopics = mqttTopicService.getSubscribedTopics(); + boolean exists = Arrays.stream(subscribedTopics).anyMatch(val -> val.equals(topic)); + if (!exists) { + return "主题不存在"; + } + mqttMsgSenderService.publish(topic, pushMessage); + return "发布主题:" + topic + "成功"; + } + + @GetMapping("/reply") + public String reply(String topic, String pushMessage) { + mqttMsgSenderService.publish(topic, pushMessage); + return "回复主题:" + topic + "成功"; + } + +} diff --git a/mh-admin/src/main/java/com/mh/web/controller/operation/AlarmCodeController.java b/mh-admin/src/main/java/com/mh/web/controller/operate/AlarmCodeController.java similarity index 97% rename from mh-admin/src/main/java/com/mh/web/controller/operation/AlarmCodeController.java rename to mh-admin/src/main/java/com/mh/web/controller/operate/AlarmCodeController.java index 48ec9cd..52d5983 100644 --- a/mh-admin/src/main/java/com/mh/web/controller/operation/AlarmCodeController.java +++ b/mh-admin/src/main/java/com/mh/web/controller/operate/AlarmCodeController.java @@ -1,4 +1,4 @@ -package com.mh.web.controller.operation; +package com.mh.web.controller.operate; import com.mh.common.annotation.Log; import com.mh.common.core.controller.BaseController; @@ -22,7 +22,7 @@ import java.util.List; * @date 2025-01-14 16:40:58 */ @RestController -@RequestMapping("/operation/ac") +@RequestMapping("/operate/ac") public class AlarmCodeController extends BaseController { @Autowired diff --git a/mh-admin/src/main/java/com/mh/web/controller/operation/AlarmRulesController.java b/mh-admin/src/main/java/com/mh/web/controller/operate/AlarmRulesController.java similarity index 97% rename from mh-admin/src/main/java/com/mh/web/controller/operation/AlarmRulesController.java rename to mh-admin/src/main/java/com/mh/web/controller/operate/AlarmRulesController.java index 221446e..bc114a1 100644 --- a/mh-admin/src/main/java/com/mh/web/controller/operation/AlarmRulesController.java +++ b/mh-admin/src/main/java/com/mh/web/controller/operate/AlarmRulesController.java @@ -1,4 +1,4 @@ -package com.mh.web.controller.operation; +package com.mh.web.controller.operate; import com.mh.common.annotation.Log; import com.mh.common.core.controller.BaseController; @@ -22,7 +22,7 @@ import java.util.List; * @date 2025-01-14 17:32:55 */ @RestController -@RequestMapping("/operation/ar") +@RequestMapping("/operate/ar") public class AlarmRulesController extends BaseController { @Autowired diff --git a/mh-admin/src/main/resources/application-dev.yml b/mh-admin/src/main/resources/application-dev.yml new file mode 100644 index 0000000..358af37 --- /dev/null +++ b/mh-admin/src/main/resources/application-dev.yml @@ -0,0 +1,137 @@ +# 项目相关配置 +mh: + # 名称 + name: MH + # 版本 + version: 1.0.0 + # 版权年份 + copyrightYear: 2024 + # 文件路径 示例( Windows配置D:/mh/uploadPath,Linux配置 /home/mh/uploadPath) + profile: D:/mh/uploadPath + # 获取ip地址开关 + addressEnabled: false + # 验证码类型 math 数字计算 char 字符验证 + captchaType: math + +# 开发环境配置 +server: + # 服务器的HTTP端口,默认为8080 + port: 8080 + servlet: + # 应用的访问路径 + context-path: / + tomcat: + # tomcat的URI编码 + uri-encoding: UTF-8 + # 连接数满后的排队数,默认为100 + accept-count: 1000 + threads: + # tomcat最大线程数,默认为200 + max: 800 + # Tomcat启动初始化的线程数,默认值10 + min-spare: 100 + +# 日志配置 +logging: + level: + com.mh: debug + org.springframework: warn + +# Spring配置 +spring: + # 资源信息 + messages: + # 国际化资源文件路径 + basename: i18n/messages + profiles: + active: druid + # 文件上传 + servlet: + multipart: + # 单个文件大小 + max-file-size: 10MB + # 设置总上传的文件大小 + max-request-size: 20MB + # 服务模块 + devtools: + restart: + # 热部署开关 + enabled: true + data: + # redis 配置 + redis: + # 地址 + host: localhost + # 端口,默认为6379 + port: 6379 + # 数据库索引 + database: 0 + # 密码 + password: + # 连接超时时间 + timeout: 10s + lettuce: + pool: + # 连接池中的最小空闲连接 + min-idle: 0 + # 连接池中的最大空闲连接 + max-idle: 8 + # 连接池的最大数据库连接数 + max-active: 8 + # #连接池最大阻塞等待时间(使用负值表示没有限制) + max-wait: -1ms + +# MyBatis配置 +mybatis-plus: + # 搜索指定包别名 + type-aliases-package: com.mh.**.domain + # 配置mapper的扫描,找到所有的mapper.xml映射文件 + mapper-locations: classpath*:mapper/**/*Mapper.xml + # 加载全局的配置文件 + config-location: classpath:mybatis/mybatis-config.xml + configuration: + map-underscore-to-camel-case: true + +# Springdoc配置 +springdoc: + api-docs: + path: /v3/api-docs + swagger-ui: + enabled: true + path: /swagger-ui.html + tags-sorter: alpha + group-configs: + - group: 'default' + display-name: '测试模块' + paths-to-match: '/**' + packages-to-scan: com.mh.web.controller.tool + +# 防止XSS攻击 +xss: + # 过滤开关 + enabled: true + # 排除链接(多个用逗号分隔) + excludes: /system/notice + # 匹配链接 + urlPatterns: /system/*,/monitor/*,/tool/* + +mqttSpring: + # BASIC parameters are required. + BASIC: + protocol: MQTT + host: 127.0.0.1 + port: 2883 + username: mh + password: mhtech@803 + client-id: mqtt_mz_producer_dev + # If the protocol is ws/wss, this value is required. + path: + # Topics that need to be subscribed when initially connecting to mqtt, multiple topics are divided by ",". + inbound-topic: A/cmd/ctl/receive1 + # 此部分是提供给后端生成token返回给前端,让前端使用websocket方式去和MQTT实现交互的,笔者此文的案例中并没有去实现 + # 无人机远程控制模式(drone remote control) + DRC: + protocol: WS + host: 127.0.0.1 + port: 8083 + path: /mqtt \ No newline at end of file diff --git a/mh-admin/src/main/resources/application-prod.yml b/mh-admin/src/main/resources/application-prod.yml new file mode 100644 index 0000000..b6b4c74 --- /dev/null +++ b/mh-admin/src/main/resources/application-prod.yml @@ -0,0 +1,137 @@ +# 项目相关配置 +mh: + # 名称 + name: MH + # 版本 + version: 1.0.0 + # 版权年份 + copyrightYear: 2024 + # 文件路径 示例( Windows配置D:/mh/uploadPath,Linux配置 /home/mh/uploadPath) + profile: D:/mh/uploadPath + # 获取ip地址开关 + addressEnabled: false + # 验证码类型 math 数字计算 char 字符验证 + captchaType: math + +# 开发环境配置 +server: + # 服务器的HTTP端口,默认为8080 + port: 8080 + servlet: + # 应用的访问路径 + context-path: / + tomcat: + # tomcat的URI编码 + uri-encoding: UTF-8 + # 连接数满后的排队数,默认为100 + accept-count: 1000 + threads: + # tomcat最大线程数,默认为200 + max: 800 + # Tomcat启动初始化的线程数,默认值10 + min-spare: 100 + +# 日志配置 +logging: + level: + com.mh: debug + org.springframework: warn + +# Spring配置 +spring: + # 资源信息 + messages: + # 国际化资源文件路径 + basename: i18n/messages + profiles: + active: druid + # 文件上传 + servlet: + multipart: + # 单个文件大小 + max-file-size: 10MB + # 设置总上传的文件大小 + max-request-size: 20MB + # 服务模块 + devtools: + restart: + # 热部署开关 + enabled: true + data: + # redis 配置 + redis: + # 地址 + host: localhost + # 端口,默认为6379 + port: 6379 + # 数据库索引 + database: 0 + # 密码 + password: + # 连接超时时间 + timeout: 10s + lettuce: + pool: + # 连接池中的最小空闲连接 + min-idle: 0 + # 连接池中的最大空闲连接 + max-idle: 8 + # 连接池的最大数据库连接数 + max-active: 8 + # #连接池最大阻塞等待时间(使用负值表示没有限制) + max-wait: -1ms + +# MyBatis配置 +mybatis-plus: + # 搜索指定包别名 + type-aliases-package: com.mh.**.domain + # 配置mapper的扫描,找到所有的mapper.xml映射文件 + mapper-locations: classpath*:mapper/**/*Mapper.xml + # 加载全局的配置文件 + config-location: classpath:mybatis/mybatis-config.xml + configuration: + map-underscore-to-camel-case: true + +# Springdoc配置 +springdoc: + api-docs: + path: /v3/api-docs + swagger-ui: + enabled: true + path: /swagger-ui.html + tags-sorter: alpha + group-configs: + - group: 'default' + display-name: '测试模块' + paths-to-match: '/**' + packages-to-scan: com.mh.web.controller.tool + +# 防止XSS攻击 +xss: + # 过滤开关 + enabled: true + # 排除链接(多个用逗号分隔) + excludes: /system/notice + # 匹配链接 + urlPatterns: /system/*,/monitor/*,/tool/* + +mqttSpring: + # BASIC parameters are required. + BASIC: + protocol: MQTT + host: 127.0.0.1 + port: 2883 + username: mh + password: mhtech@803 + client-id: mqtt_mz_producer_prod + # If the protocol is ws/wss, this value is required. + path: + # Topics that need to be subscribed when initially connecting to mqtt, multiple topics are divided by ",". + inbound-topic: A/cmd/ctl/receive1 + # 此部分是提供给后端生成token返回给前端,让前端使用websocket方式去和MQTT实现交互的,笔者此文的案例中并没有去实现 + # 无人机远程控制模式(drone remote control) + DRC: + protocol: WS + host: 127.0.0.1 + port: 8083 + path: /mqtt \ No newline at end of file diff --git a/mh-admin/src/main/resources/application-test.yml b/mh-admin/src/main/resources/application-test.yml new file mode 100644 index 0000000..e5683a1 --- /dev/null +++ b/mh-admin/src/main/resources/application-test.yml @@ -0,0 +1,137 @@ +# 项目相关配置 +mh: + # 名称 + name: MH + # 版本 + version: 1.0.0 + # 版权年份 + copyrightYear: 2024 + # 文件路径 示例( Windows配置D:/mh/uploadPath,Linux配置 /home/mh/uploadPath) + profile: D:/mh/uploadPath + # 获取ip地址开关 + addressEnabled: false + # 验证码类型 math 数字计算 char 字符验证 + captchaType: math + +# 开发环境配置 +server: + # 服务器的HTTP端口,默认为8080 + port: 8080 + servlet: + # 应用的访问路径 + context-path: / + tomcat: + # tomcat的URI编码 + uri-encoding: UTF-8 + # 连接数满后的排队数,默认为100 + accept-count: 1000 + threads: + # tomcat最大线程数,默认为200 + max: 800 + # Tomcat启动初始化的线程数,默认值10 + min-spare: 100 + +# 日志配置 +logging: + level: + com.mh: debug + org.springframework: warn + +# Spring配置 +spring: + # 资源信息 + messages: + # 国际化资源文件路径 + basename: i18n/messages + profiles: + active: druid + # 文件上传 + servlet: + multipart: + # 单个文件大小 + max-file-size: 10MB + # 设置总上传的文件大小 + max-request-size: 20MB + # 服务模块 + devtools: + restart: + # 热部署开关 + enabled: true + data: + # redis 配置 + redis: + # 地址 + host: localhost + # 端口,默认为6379 + port: 6379 + # 数据库索引 + database: 0 + # 密码 + password: + # 连接超时时间 + timeout: 10s + lettuce: + pool: + # 连接池中的最小空闲连接 + min-idle: 0 + # 连接池中的最大空闲连接 + max-idle: 8 + # 连接池的最大数据库连接数 + max-active: 8 + # #连接池最大阻塞等待时间(使用负值表示没有限制) + max-wait: -1ms + +# MyBatis配置 +mybatis-plus: + # 搜索指定包别名 + type-aliases-package: com.mh.**.domain + # 配置mapper的扫描,找到所有的mapper.xml映射文件 + mapper-locations: classpath*:mapper/**/*Mapper.xml + # 加载全局的配置文件 + config-location: classpath:mybatis/mybatis-config.xml + configuration: + map-underscore-to-camel-case: true + +# Springdoc配置 +springdoc: + api-docs: + path: /v3/api-docs + swagger-ui: + enabled: true + path: /swagger-ui.html + tags-sorter: alpha + group-configs: + - group: 'default' + display-name: '测试模块' + paths-to-match: '/**' + packages-to-scan: com.mh.web.controller.tool + +# 防止XSS攻击 +xss: + # 过滤开关 + enabled: true + # 排除链接(多个用逗号分隔) + excludes: /system/notice + # 匹配链接 + urlPatterns: /system/*,/monitor/*,/tool/* + +mqttSpring: + # BASIC parameters are required. + BASIC: + protocol: MQTT + host: 127.0.0.1 + port: 2883 + username: mh + password: mhtech@803 + client-id: mqtt_mz_test_dev + # If the protocol is ws/wss, this value is required. + path: + # Topics that need to be subscribed when initially connecting to mqtt, multiple topics are divided by ",". + inbound-topic: A/cmd/ctl/receive1 + # 此部分是提供给后端生成token返回给前端,让前端使用websocket方式去和MQTT实现交互的,笔者此文的案例中并没有去实现 + # 无人机远程控制模式(drone remote control) + DRC: + protocol: WS + host: 127.0.0.1 + port: 8083 + path: /mqtt \ No newline at end of file diff --git a/mh-admin/src/main/resources/application.yml b/mh-admin/src/main/resources/application.yml index 35ee359..c3ae7c3 100644 --- a/mh-admin/src/main/resources/application.yml +++ b/mh-admin/src/main/resources/application.yml @@ -1,41 +1,6 @@ -# 项目相关配置 -mh: - # 名称 - name: MH - # 版本 - version: 1.0.0 - # 版权年份 - copyrightYear: 2024 - # 文件路径 示例( Windows配置D:/mh/uploadPath,Linux配置 /home/mh/uploadPath) - profile: D:/mh/uploadPath - # 获取ip地址开关 - addressEnabled: false - # 验证码类型 math 数字计算 char 字符验证 - captchaType: math - -# 开发环境配置 -server: - # 服务器的HTTP端口,默认为8080 - port: 8080 - servlet: - # 应用的访问路径 - context-path: / - tomcat: - # tomcat的URI编码 - uri-encoding: UTF-8 - # 连接数满后的排队数,默认为100 - accept-count: 1000 - threads: - # tomcat最大线程数,默认为200 - max: 800 - # Tomcat启动初始化的线程数,默认值10 - min-spare: 100 - -# 日志配置 -logging: - level: - com.mh: debug - org.springframework: warn +spring: + profiles: + active: dev # 用户配置 user: @@ -45,50 +10,6 @@ user: # 密码锁定时间(默认10分钟) lockTime: 10 -# Spring配置 -spring: - # 资源信息 - messages: - # 国际化资源文件路径 - basename: i18n/messages - profiles: - active: druid - # 文件上传 - servlet: - multipart: - # 单个文件大小 - max-file-size: 10MB - # 设置总上传的文件大小 - max-request-size: 20MB - # 服务模块 - devtools: - restart: - # 热部署开关 - enabled: true - data: - # redis 配置 - redis: - # 地址 - host: localhost - # 端口,默认为6379 - port: 6379 - # 数据库索引 - database: 0 - # 密码 - password: - # 连接超时时间 - timeout: 10s - lettuce: - pool: - # 连接池中的最小空闲连接 - min-idle: 0 - # 连接池中的最大空闲连接 - max-idle: 8 - # 连接池的最大数据库连接数 - max-active: 8 - # #连接池最大阻塞等待时间(使用负值表示没有限制) - max-wait: -1ms - # token配置 token: # 令牌自定义标识 @@ -98,43 +19,9 @@ token: # 令牌有效期(默认30分钟) expireTime: 30 -# MyBatis配置 -mybatis-plus: - # 搜索指定包别名 - type-aliases-package: com.mh.**.domain - # 配置mapper的扫描,找到所有的mapper.xml映射文件 - mapper-locations: classpath*:mapper/**/*Mapper.xml - # 加载全局的配置文件 - config-location: classpath:mybatis/mybatis-config.xml - configuration: - map-underscore-to-camel-case: true - # PageHelper分页插件 pagehelper: reasonable: true params: count=countSql support-methods-arguments: true - helper-dialect: postgresql - -# Springdoc配置 -springdoc: - api-docs: - path: /v3/api-docs - swagger-ui: - enabled: true - path: /swagger-ui.html - tags-sorter: alpha - group-configs: - - group: 'default' - display-name: '测试模块' - paths-to-match: '/**' - packages-to-scan: com.mh.web.controller.tool - -# 防止XSS攻击 -xss: - # 过滤开关 - enabled: true - # 排除链接(多个用逗号分隔) - excludes: /system/notice - # 匹配链接 - urlPatterns: /system/*,/monitor/*,/tool/* + helper-dialect: postgresql \ No newline at end of file diff --git a/mh-common/pom.xml b/mh-common/pom.xml index 8eb1725..6769ec2 100644 --- a/mh-common/pom.xml +++ b/mh-common/pom.xml @@ -140,6 +140,21 @@ core + + org.springframework.integration + spring-integration-mqtt + + + + org.springframework + spring-messaging + + + + org.projectlombok + lombok + + \ No newline at end of file diff --git a/mh-common/src/main/java/com/mh/common/config/mqtt/MqttConfig.java b/mh-common/src/main/java/com/mh/common/config/mqtt/MqttConfig.java new file mode 100644 index 0000000..d06a3cc --- /dev/null +++ b/mh-common/src/main/java/com/mh/common/config/mqtt/MqttConfig.java @@ -0,0 +1,99 @@ +package com.mh.common.config.mqtt; + +import com.mh.common.enums.MqttClientOptions; +import com.mh.common.enums.MqttProtocolEnum; +import com.mh.common.enums.MqttUseEnum; +import lombok.Data; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +import org.springframework.util.StringUtils; + +import java.util.Map; + +/** + * @author LJF + * @version 1.0 + * @project springboot-mqtt-demo + * @description mqtt连接配置 + * @date 2024-10-29 14:44:51 + */ +@Configuration +@Data +@ConfigurationProperties +public class MqttConfig { + + private static Map mqttSpring; + + public void setMqttSpring(Map mqtt) { + MqttConfig.mqttSpring = mqtt; + } + + /** + * 获取mqtt基本配置 + * @return + */ + static MqttClientOptions getBasicMqttClientOptions() { + if (!mqttSpring.containsKey(MqttUseEnum.BASIC)) { + throw new Error("请先配置MQTT的基本连接参数,否则无法启动项目"); + } + return mqttSpring.get(MqttUseEnum.BASIC); + } + + /** + * 拼接获取对应mqtt的连接地址 + * @param options + * @return + */ + public static String getMqttAddress(MqttClientOptions options) { + StringBuilder addr = new StringBuilder(); + addr.append(options.getProtocol().getProtocolAddr()) + .append(options.getHost().trim()) + .append(":") + .append(options.getPort()); + if ((options.getProtocol() == MqttProtocolEnum.WS || options.getProtocol() == MqttProtocolEnum.WSS) + && StringUtils.hasText(options.getPath())) { + addr.append(options.getPath()); + } + return addr.toString(); + } + + public static String getBasicMqttAddress() { + return getMqttAddress(getBasicMqttClientOptions()); + } + + /** + * 获取连接参数,注入到spring中 + * @return + */ + @Bean + public MqttConnectOptions mqttConnectionOptions() { + + MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); + + MqttClientOptions customizeOptions = getBasicMqttClientOptions(); + String basicMqttAddress = getBasicMqttAddress(); + mqttConnectOptions.setServerURIs(new String[]{basicMqttAddress}); + mqttConnectOptions.setUserName(StringUtils.hasText(customizeOptions.getUsername()) ? + customizeOptions.getUsername() : ""); + mqttConnectOptions.setPassword(StringUtils.hasText(customizeOptions.getPassword()) ? + customizeOptions.getPassword().toCharArray() : new char[0]); + // 直接进行自动连接 + mqttConnectOptions.setAutomaticReconnect(true); + // 时间间隔时间10s + mqttConnectOptions.setKeepAliveInterval(10); + + return mqttConnectOptions; + } + + @Bean + public MqttPahoClientFactory mqttClientFactory() { + DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); + factory.setConnectionOptions(mqttConnectionOptions()); + return factory; + } + +} diff --git a/mh-common/src/main/java/com/mh/common/config/mqtt/MqttInboundConfig.java b/mh-common/src/main/java/com/mh/common/config/mqtt/MqttInboundConfig.java new file mode 100644 index 0000000..4ef5a26 --- /dev/null +++ b/mh-common/src/main/java/com/mh/common/config/mqtt/MqttInboundConfig.java @@ -0,0 +1,84 @@ +package com.mh.common.config.mqtt; + +import com.mh.common.constant.ChannelName; +import com.mh.common.enums.MqttClientOptions; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.annotation.IntegrationComponentScan; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.endpoint.MessageProducerSupport; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; +import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; +import org.springframework.integration.mqtt.support.MqttHeaders; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; + +/** + * @author LJF + * @version 1.0 + * @project springboot-mqtt-demo + * @description 入站配置 + * @date 2024-10-29 16:22:10 + */ +@Slf4j +@Configuration +@IntegrationComponentScan +public class MqttInboundConfig { + + @Autowired + private MqttPahoClientFactory mqttClientFactory; + + @Resource(name = ChannelName.INBOUND) + private MessageChannel inboundChannel; + + /** + * 入站适配器 + * @return + */ + @Bean(name = "adapter") + public MessageProducerSupport mqttInbound() { + MqttClientOptions options = MqttConfig.getBasicMqttClientOptions(); + // 此处初始化的时候,默认订阅了配置文件中已经写好的topic + // 如果需要订阅多个,可以自己手动订阅,会写一个addTopic()进行添加订阅 + MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter( + options.getClientId() + "_consumer_" + System.currentTimeMillis(), + mqttClientFactory, + options.getInboundTopic().split(",")); + + DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter(); + // 统一是字节处理 + converter.setPayloadAsBytes(true); + // 设置消息转换器 + adapter.setConverter(converter); + // 设置qos(quality of service) + // 0:最多一次传输(消息会丢失), + // 1:至少一次传输(消息会重复), + // 2:只有当消息发送成功时才确认(消息不回丢,但延迟高)。 + adapter.setQos(0); + // 设置在接收已经订阅的主题信息后,发送给哪个通道,具体的发送方法需要翻上层的抽象类 + adapter.setOutputChannel(inboundChannel); + return adapter; + } + + /** + * 默认声明一个消息处理器,用于处理无效的消息 + * @return + */ + @Bean + @ServiceActivator(inputChannel = ChannelName.DEFAULT) + public MessageHandler handler() { + return message -> { + log.info("The default channel does not handle messages." + + "\nTopic: {}" + + "\nPayload: {}", + message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC), + message.getPayload()); + }; + } + + +} diff --git a/mh-common/src/main/java/com/mh/common/config/mqtt/MqttMessageChannel.java b/mh-common/src/main/java/com/mh/common/config/mqtt/MqttMessageChannel.java new file mode 100644 index 0000000..c3e26bb --- /dev/null +++ b/mh-common/src/main/java/com/mh/common/config/mqtt/MqttMessageChannel.java @@ -0,0 +1,53 @@ +package com.mh.common.config.mqtt; + +import com.mh.common.constant.ChannelName; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.channel.ExecutorChannel; +import org.springframework.messaging.MessageChannel; + +import java.util.concurrent.Executor; + +/** + * @author LJF + * @version 1.0 + * @project springboot-mqtt-demo + * @description 声明所有通道的定义类 + * @date 2024-10-29 16:23:32 + */ +@Slf4j +@Configuration +public class MqttMessageChannel { + + @Autowired + private Executor threadPool; + + @Bean(name = ChannelName.INBOUND) + public MessageChannel inboundChannel() { + return new ExecutorChannel(threadPool); + } + + @Bean(name = ChannelName.EVENTS_UPLOAD_INBOUND) + public MessageChannel eventsUploadInbound() { + return new DirectChannel(); + } + + @Bean(name = ChannelName.EVENTS_COLLECTION_INBOUND) + public MessageChannel eventsCollectionInbound() { + return new DirectChannel(); + } + + @Bean(name = ChannelName.EVENTS_CONTROL_INBOUND) + public MessageChannel eventsControlInbound() { + return new DirectChannel(); + } + + @Bean(name = ChannelName.EVENTS_SEND_INBOUND) + public MessageChannel eventsSendInbound() { + return new DirectChannel(); + } + +} diff --git a/mh-common/src/main/java/com/mh/common/config/mqtt/MqttOutboundConfig.java b/mh-common/src/main/java/com/mh/common/config/mqtt/MqttOutboundConfig.java new file mode 100644 index 0000000..287e0b9 --- /dev/null +++ b/mh-common/src/main/java/com/mh/common/config/mqtt/MqttOutboundConfig.java @@ -0,0 +1,51 @@ +package com.mh.common.config.mqtt; + +import com.mh.common.constant.ChannelName; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.annotation.IntegrationComponentScan; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; +import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; +import org.springframework.messaging.MessageHandler; + +/** + * @author LJF + * @version 1.0 + * @project springboot-mqtt-demo + * @description 入站配置 + * @date 2024-10-29 16:22:10 + */ +@Slf4j +@Configuration +@IntegrationComponentScan +public class MqttOutboundConfig { + + @Autowired + private MqttPahoClientFactory mqttClientFactory; + + /** + * 默认声明一个出站处理器,用于处理无效的消息 + * @return + */ + @Bean + @ServiceActivator(inputChannel = ChannelName.OUTBOUND) + public MessageHandler mqttOutbound() { + MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler( + MqttConfig.getBasicMqttClientOptions().getClientId() + "_producer_" + System.currentTimeMillis(), + mqttClientFactory); + DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter(); + // use byte types uniformly + converter.setPayloadAsBytes(true); + + messageHandler.setAsync(true); + messageHandler.setDefaultQos(0); + messageHandler.setConverter(converter); + return messageHandler; + } + + +} diff --git a/mh-common/src/main/java/com/mh/common/constant/ChannelName.java b/mh-common/src/main/java/com/mh/common/constant/ChannelName.java new file mode 100644 index 0000000..8c94500 --- /dev/null +++ b/mh-common/src/main/java/com/mh/common/constant/ChannelName.java @@ -0,0 +1,59 @@ +package com.mh.common.constant; + +/** + * @author LJF + * @version 1.0 + * @project springboot-mqtt-demo + * @description 声明所有通道 + * @date 2024-10-29 16:04:19 + */ +public class ChannelName { + + /** + * 默认通道名称(防止出错) + */ + public static final String DEFAULT = "default"; + + /** + * 主动上报入站 + */ + public static final String INBOUND = "inbound"; + + /** + * 出站 + */ + public static final String OUTBOUND = "outbound"; + + + /** + * 入站主动上报 + */ + public static final String EVENTS_UPLOAD_INBOUND = "events_upload_inbound"; + + /** + * 入站主动采集 + */ + public static final String EVENTS_COLLECTION_INBOUND = "events_collection_inbound"; + + /** + * 入站主动控制 + */ + public static final String EVENTS_CONTROL_INBOUND = "events_control_inbound"; + + /** + * 默认进站处理 + */ + public static final String EVENTS_DEFAULT_INBOUND = "events_default_inbound"; + + public static final String REPLY_EVENTS_OUTBOUND = "reply_events_outbound"; + + /** + * 新珠江收到的信息 + */ + public static final String EVENTS_RECEIVE_INBOUND = "events_receive_inbound"; + + /** + * 接收服务端的数据报文 + */ + public static final String EVENTS_SEND_INBOUND = "events_send_inbound"; +} diff --git a/mh-common/src/main/java/com/mh/common/constant/TopicConst.java b/mh-common/src/main/java/com/mh/common/constant/TopicConst.java new file mode 100644 index 0000000..daec78d --- /dev/null +++ b/mh-common/src/main/java/com/mh/common/constant/TopicConst.java @@ -0,0 +1,31 @@ +package com.mh.common.constant; + +/** + * All the topics that need to be used in the project. + * + * @author sean.zhou + * @version 0.1 + * @date 2021/11/10 + */ +public class TopicConst { + + public static final String MH_UPLOAD = "mh_upload/"; + + public static final String EVENTS_UPLOAD = "events_upload/"; + + public static final String MH_COLLECTION = "mh_collection/"; + + public static final String EVENTS_COLLECTION = "events_collection/"; + + public static final String MH_CONTROL = "mh_control/"; + + public static final String EVENTS_CONTROL = "events_control/"; + + public static final String REGEX_SN = "[A-Za-z0-9]+"; + + public static final String THING_MODEL_PRE = "thing/"; + + public static final String PRODUCT = "product/"; + + public static final String SERVICES_SUF = "/services"; +} diff --git a/mh-common/src/main/java/com/mh/common/enums/MqttClientOptions.java b/mh-common/src/main/java/com/mh/common/enums/MqttClientOptions.java new file mode 100644 index 0000000..88cfaff --- /dev/null +++ b/mh-common/src/main/java/com/mh/common/enums/MqttClientOptions.java @@ -0,0 +1,108 @@ +package com.mh.common.enums; + +/** + * @author LJF + * @version 1.0 + * @project springboot-mqtt-demo + * @description mqtt连接的参数 + * @date 2024-10-29 14:46:24 + */ +public class MqttClientOptions { + + private MqttProtocolEnum protocol; + + private String host; + + private Integer port; + + private String username; + + private String password; + + private String clientId; + + private String path; + + /** + * 客户端连接的时候,订阅的主题 + */ + private String inboundTopic; + + public MqttProtocolEnum getProtocol() { + return protocol; + } + + public void setProtocol(MqttProtocolEnum protocol) { + this.protocol = protocol; + } + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public Integer getPort() { + return port; + } + + public void setPort(Integer port) { + this.port = port; + } + + public String getUsername() { + return username; + } + + public void setUsername(String username) { + this.username = username; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public String getClientId() { + return clientId; + } + + public void setClientId(String clientId) { + this.clientId = clientId; + } + + public String getPath() { + return path; + } + + public void setPath(String path) { + this.path = path; + } + + public String getInboundTopic() { + return inboundTopic; + } + + public void setInboundTopic(String inboundTopic) { + this.inboundTopic = inboundTopic; + } + + @Override + public String toString() { + return "MqttClientOptions{" + + "protocol=" + protocol + + ", host='" + host + '\'' + + ", port=" + port + + ", username='" + username + '\'' + + ", password='" + password + '\'' + + ", clientId='" + clientId + '\'' + + ", path='" + path + '\'' + + ", inboundTopic='" + inboundTopic + '\'' + + '}'; + } +} diff --git a/mh-common/src/main/java/com/mh/common/enums/MqttProtocolEnum.java b/mh-common/src/main/java/com/mh/common/enums/MqttProtocolEnum.java new file mode 100644 index 0000000..f980f7e --- /dev/null +++ b/mh-common/src/main/java/com/mh/common/enums/MqttProtocolEnum.java @@ -0,0 +1,33 @@ +package com.mh.common.enums; + +/** + * @author LJF + * @version 1.0 + * @project springboot-mqtt-demo + * @description 采用哪种协议进行数据交互 + * @date 2024-10-29 15:21:07 + */ +public enum MqttProtocolEnum { + + MQTT("tcp"), + + MQTTS("tcp"), + + WS("ws"), + + WSS("wss"); + + final String protocol; + + MqttProtocolEnum(String protocol) { + this.protocol = protocol; + } + + public String getProtocolAddr() { + return protocol + "://"; + } + + public String getProtocol() { + return protocol; + } +} diff --git a/mh-common/src/main/java/com/mh/common/enums/MqttUseEnum.java b/mh-common/src/main/java/com/mh/common/enums/MqttUseEnum.java new file mode 100644 index 0000000..4262a64 --- /dev/null +++ b/mh-common/src/main/java/com/mh/common/enums/MqttUseEnum.java @@ -0,0 +1,16 @@ +package com.mh.common.enums; + +/** + * @author LJF + * @version 1.0 + * @project springboot-mqtt-demo + * @description mqtt选择模式 + * @date 2024-10-29 15:19:21 + */ +public enum MqttUseEnum { + + BASIC, + + DRC + +} diff --git a/mh-common/src/main/java/com/mh/common/model/request/CommonTopicReceiver.java b/mh-common/src/main/java/com/mh/common/model/request/CommonTopicReceiver.java new file mode 100644 index 0000000..a1c47ab --- /dev/null +++ b/mh-common/src/main/java/com/mh/common/model/request/CommonTopicReceiver.java @@ -0,0 +1,35 @@ +package com.mh.common.model.request; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.Data; + +/** + * Unified topic receiving format. + * + * @author sean.zhou + * @version 0.1 + * @date 2021/11/10 + */ +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +public class CommonTopicReceiver { + + /** + * The command is sent and the response is matched by the tid and bid fields in the message, + * and the reply should keep the tid and bid the same. + */ + private String tid; + + private String bid; + + private String method; + + private Long timestamp; + + private T data; + + private String gateway; + + private Integer needReply; + +} diff --git a/mh-common/src/main/java/com/mh/common/model/response/CommonTopicResponse.java b/mh-common/src/main/java/com/mh/common/model/response/CommonTopicResponse.java new file mode 100644 index 0000000..9ab05ac --- /dev/null +++ b/mh-common/src/main/java/com/mh/common/model/response/CommonTopicResponse.java @@ -0,0 +1,36 @@ +package com.mh.common.model.response; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * Unified Topic response format + * + * @author sean.zhou + * @version 0.1 + * @date 2021/11/15 + */ +@Data +@AllArgsConstructor +@NoArgsConstructor +@Builder +@JsonIgnoreProperties(ignoreUnknown = true) +public class CommonTopicResponse { + + /** + * The command is sent and the response is matched by the tid and bid fields in the message, + * and the reply should keep the tid and bid the same. + */ + private String tid; + + private String bid; + + private String method; + + private T data; + + private Long timestamp; +} diff --git a/mh-common/src/main/java/com/mh/common/model/response/ServiceReply.java b/mh-common/src/main/java/com/mh/common/model/response/ServiceReply.java new file mode 100644 index 0000000..7072e48 --- /dev/null +++ b/mh-common/src/main/java/com/mh/common/model/response/ServiceReply.java @@ -0,0 +1,20 @@ +package com.mh.common.model.response; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.Data; + +/** + * @author sean.zhou + * @version 0.1 + * @date 2021/11/22 + */ +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +public class ServiceReply { + + private Integer result; + + private T info; + + private T output; +} diff --git a/mh-system/src/main/java/com/mh/system/service/mqtt/IEventsService.java b/mh-system/src/main/java/com/mh/system/service/mqtt/IEventsService.java new file mode 100644 index 0000000..18ffdb5 --- /dev/null +++ b/mh-system/src/main/java/com/mh/system/service/mqtt/IEventsService.java @@ -0,0 +1,49 @@ +package com.mh.system.service.mqtt; + +import org.springframework.messaging.MessageHeaders; + +import java.io.IOException; + +/** + * @author LJF + * @version 1.0 + * @project springboot-mqtt-demo + * @description 通道处理类 + * @date 2024-11-05 11:30:26 + */ +public interface IEventsService { + + /** + * 处理主动上报 + * @param receiver + * @param headers + * @return + */ + void handleInboundUpload(byte[] receiver, MessageHeaders headers) throws IOException; + + + /** + * 处理服务器主动采集上报 + * @param receiver + * @param headers + * @return + */ + void handleInboundCollection(byte[] receiver, MessageHeaders headers) throws IOException; + + /** + * 处理控制上报 + * @param receiver + * @param headers + * @return + */ + void handleInboundControl(byte[] receiver, MessageHeaders headers) throws IOException; + + /** + * 处理接收上报 + * @param receiver + * @param headers + * @throws IOException + */ + void handleInboundReceive(byte[] receiver, MessageHeaders headers) throws IOException; + +} diff --git a/mh-system/src/main/java/com/mh/system/service/mqtt/IMqttMessageGateway.java b/mh-system/src/main/java/com/mh/system/service/mqtt/IMqttMessageGateway.java new file mode 100644 index 0000000..58a1a67 --- /dev/null +++ b/mh-system/src/main/java/com/mh/system/service/mqtt/IMqttMessageGateway.java @@ -0,0 +1,41 @@ +package com.mh.system.service.mqtt; + +import com.mh.common.constant.ChannelName; +import org.springframework.integration.annotation.MessagingGateway; +import org.springframework.integration.mqtt.support.MqttHeaders; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.stereotype.Component; + +/** + * @author LJF + * @version 1.0 + * @project springboot-mqtt-demo + * @description 消息网关 + * @date 2024-10-30 14:43:55 + */ +@Component +@MessagingGateway(defaultRequestChannel = ChannelName.OUTBOUND) +public interface IMqttMessageGateway { + + /** + * 发送消息 + * @param topic + * @param payload + */ + void publish(@Header(MqttHeaders.TOPIC) String topic, String payload, @Header(MqttHeaders.QOS) int qos); + + /** + * 发送消息 + * @param topic + * @param payload + */ + void publish(@Header(MqttHeaders.TOPIC) String topic, byte[] payload); + + /** + * 发送消息并带上qos + * @param topic + * @param payload + * @param qos + */ + void publish(@Header(MqttHeaders.TOPIC) String topic, byte[] payload, @Header(MqttHeaders.QOS) int qos); +} diff --git a/mh-system/src/main/java/com/mh/system/service/mqtt/IMqttMsgSenderService.java b/mh-system/src/main/java/com/mh/system/service/mqtt/IMqttMsgSenderService.java new file mode 100644 index 0000000..050fd9b --- /dev/null +++ b/mh-system/src/main/java/com/mh/system/service/mqtt/IMqttMsgSenderService.java @@ -0,0 +1,96 @@ +package com.mh.system.service.mqtt; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.mh.user.model.response.CommonTopicResponse; +import com.mh.user.model.response.ServiceReply; + +/** + * @author LJF + * @version 1.0 + * @project springboot-mqtt-demo + * @description Mqtt发送消息 + * @date 2024-10-29 17:31:40 + */ +public interface IMqttMsgSenderService { + + void publish(String topic, String pushMessage); + + /** + * 发布消息 + * @param topic target + * @param response message + */ + void publish(String topic, CommonTopicResponse response); + + /** + * 使用qos发布消息 + * + * @param topic target + * @param qos qos + * @param response message + */ + void publish(String topic, int qos, CommonTopicResponse response); + + /** + * 发送消息并同时接收响应 + * @param clazz + * @param topic + * @param response 通知启动是否成功 + * @return + */ + T publishWithReply(Class clazz, String topic, CommonTopicResponse response); + + /** + * 发送消息并同时接收响应 + * @param clazz + * @param topic + * @param response + * @param retryTime + * @param + * @return + */ + T publishWithReply(Class clazz, String topic, CommonTopicResponse response, int retryTime); + + /** + * 专门用于发送服务消息 + * @param clazz The generic class for ServiceReply. + * @param sn + * @param method + * @param data + * @param bid + * @param + * @return + */ + ServiceReply publishServicesTopic(TypeReference clazz, String sn, String method, Object data, String bid); + + /** + * 仅用于为服务发送消息,不设置接收到的子类型 + * @param sn + * @param method + * @param data + * @param bid + * @return + */ + ServiceReply publishServicesTopic(String sn, String method, Object data, String bid); + + /** + * 专门用于发送服务消息 + * @param clazz The generic class for ServiceReply. + * @param sn + * @param method + * @param data + * @param + * @return + */ + ServiceReply publishServicesTopic(TypeReference clazz, String sn, String method, Object data); + + /** + * 仅用于为服务发送消息,不设置接收到的子类型 + * @param sn + * @param method + * @param data + * @return + */ + ServiceReply publishServicesTopic(String sn, String method, Object data); + +} diff --git a/mh-system/src/main/java/com/mh/system/service/mqtt/IMqttTopicService.java b/mh-system/src/main/java/com/mh/system/service/mqtt/IMqttTopicService.java new file mode 100644 index 0000000..7e74089 --- /dev/null +++ b/mh-system/src/main/java/com/mh/system/service/mqtt/IMqttTopicService.java @@ -0,0 +1,40 @@ +package com.mh.system.service.mqtt; + +import org.springframework.integration.mqtt.support.MqttHeaders; +import org.springframework.messaging.handler.annotation.Header; + +/** + * @author LJF + * @version 1.0 + * @project springboot-mqtt-demo + * @description mqtt订阅主题 + * @date 2024-10-29 17:29:52 + */ +public interface IMqttTopicService { + + /** + * 订阅主题 + * @param topic + */ + void subscribe(@Header(MqttHeaders.TOPIC) String topic); + + /** + * 订阅主题并设置qos + * @param topic + * @param qos + */ + void subscribe(@Header(MqttHeaders.TOPIC) String topic, int qos); + + /** + * 解绑主题 + * @param topic + */ + void unsubscribe(@Header(MqttHeaders.TOPIC) String topic); + + /** + * 获取已订阅的主题 + * @return + */ + String[] getSubscribedTopics(); + +} diff --git a/mh-system/src/main/java/com/mh/system/service/mqtt/impl/EventsServiceImpl.java b/mh-system/src/main/java/com/mh/system/service/mqtt/impl/EventsServiceImpl.java new file mode 100644 index 0000000..a003006 --- /dev/null +++ b/mh-system/src/main/java/com/mh/system/service/mqtt/impl/EventsServiceImpl.java @@ -0,0 +1,71 @@ +package com.mh.system.service.mqtt.impl; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.mh.common.constant.ChannelName; +import com.mh.common.core.redis.RedisCache; +import com.mh.common.model.request.CommonTopicReceiver; +import com.mh.system.service.mqtt.IEventsService; +import io.netty.util.CharsetUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.messaging.MessageHeaders; +import org.springframework.stereotype.Service; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +/** + * @author LJF + * @version 1.0 + * @project springboot-mqtt-demo + * @description 通道消息处理 + * @date 2024-11-05 11:42:30 + */ +@Slf4j +@Service +public class EventsServiceImpl implements IEventsService { + + @Autowired + private ObjectMapper mapper; + + @Autowired + private RedisCache redisCache; + + @ServiceActivator(inputChannel = ChannelName.EVENTS_UPLOAD_INBOUND) + @Override + public void handleInboundUpload(byte[] receiver, MessageHeaders headers) { + handleInboundData(receiver, "主动上报数据"); + } + + @ServiceActivator(inputChannel = ChannelName.EVENTS_COLLECTION_INBOUND) + @Override + public void handleInboundCollection(byte[] receiver, MessageHeaders headers) { + handleInboundData(receiver, "主动下发采集数据"); + } + + @ServiceActivator(inputChannel = ChannelName.EVENTS_CONTROL_INBOUND) + @Override + public void handleInboundControl(byte[] receiver, MessageHeaders headers) { + handleInboundData(receiver, "控制指令下发"); + } + + @ServiceActivator(inputChannel = ChannelName.EVENTS_RECEIVE_INBOUND) + @Override + public void handleInboundReceive(byte[] receiver, MessageHeaders headers) { + String receiveStr = new String(receiver, CharsetUtil.UTF_8); + log.info("接收到控制指令返回=>{}", receiveStr); + // 设置成redis,15秒时间响应 + redisCache.setCacheObject("receiveCmd", receiveStr, 15, TimeUnit.SECONDS); + } + + private void handleInboundData(byte[] receiver, String logMessage) { + try { + CommonTopicReceiver commonTopicReceiver = mapper.readValue(receiver, CommonTopicReceiver.class); + log.info("{}: {}", logMessage, commonTopicReceiver); + } catch (IOException e) { + log.error("处理数据时发生错误: ", e); + } + } + +} diff --git a/mh-system/src/main/java/com/mh/system/service/mqtt/impl/MqttMsgSenderServiceImpl.java b/mh-system/src/main/java/com/mh/system/service/mqtt/impl/MqttMsgSenderServiceImpl.java new file mode 100644 index 0000000..b4c7bac --- /dev/null +++ b/mh-system/src/main/java/com/mh/system/service/mqtt/impl/MqttMsgSenderServiceImpl.java @@ -0,0 +1,140 @@ +package com.mh.system.service.mqtt.impl; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.mh.common.constant.TopicConst; +import com.mh.user.model.response.CommonTopicResponse; +import com.mh.user.model.response.ServiceReply; +import com.mh.user.service.mqtt.IMqttMessageGateway; +import com.mh.user.service.mqtt.IMqttMsgSenderService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.springframework.util.StringUtils; + +import java.util.Objects; +import java.util.UUID; + +/** + * @author LJF + * @version 1.0 + * @project springboot-mqtt-demo + * @description mqtt不同类型发送消息 + * @date 2024-10-30 14:30:03 + */ +@Slf4j +@Service +public class MqttMsgSenderServiceImpl implements IMqttMsgSenderService { + + @Autowired + private IMqttMessageGateway mqttMessageGateway; + + @Autowired + private ObjectMapper mapper; + + /** + * 发布,默认qos为0,非持久化 + * + * @param pushMessage + * @param topic + */ + @Override + public void publish(String topic, String pushMessage) { + synchronized (this) {//注意:这里一定要同步,否则,在多线程publish的情况下,线程会发生死锁,分析见文章最后补充 + try { + mqttMessageGateway.publish(topic, pushMessage, 0); + log.info("发送主题:{},消息:{}", topic, pushMessage); + } catch (Exception e) { + log.error("发送主题异常:{},消息:{}", topic, pushMessage, e); + } + } + } + + @Override + public void publish(String topic, CommonTopicResponse response) { + this.publish(topic, 1, response); + } + + @Override + public void publish(String topic, int qos, CommonTopicResponse response) { + try { + log.info("发送主题:{},消息:{}", topic, response.toString()); + mqttMessageGateway.publish(topic, mapper.writeValueAsBytes(response), qos); + } catch (JsonProcessingException e) { + log.error("发送主题:{},消息:{}", topic, response.toString(), e); + throw new RuntimeException(e); + } + } + + @Override + public T publishWithReply(Class clazz, String topic, CommonTopicResponse response) { + return this.publishWithReply(clazz, topic, response, 2); + } + + @Override + public T publishWithReply(Class clazz, String topic, CommonTopicResponse response, int retryTime) { +// AtomicInteger time = new AtomicInteger(0); +// // Retry three times +// while (time.getAndIncrement() <= retryTime) { +// this.publish(topic, response); +// +// Chan> chan = Chan.getInstance(); +// // If the message is not received in 0.5 seconds then resend it again. +// CommonTopicReceiver receiver = chan.get(response.getTid()); +// +// // Need to match tid and bid. +// if (Objects.nonNull(receiver) && receiver.getTid().equals(response.getTid()) && +// receiver.getBid().equals(response.getBid())) { +// if (clazz.isAssignableFrom(receiver.getData().getClass())) { +// return receiver.getData(); +// } +// throw new TypeMismatchException(receiver.getData(), clazz); +// } +// // It must be guaranteed that the tid and bid of each message are different. +// response.setBid(UUID.randomUUID().toString()); +// response.setTid(UUID.randomUUID().toString()); +// } +// throw new RuntimeException("没有消息接收到"); + return null; + } + + @Override + public ServiceReply publishServicesTopic(TypeReference clazz, String sn, String method, Object data, String bid) { + String topic = TopicConst.THING_MODEL_PRE + TopicConst.PRODUCT + sn + TopicConst.SERVICES_SUF; + ServiceReply reply = this.publishWithReply(ServiceReply.class, topic, + CommonTopicResponse.builder() + .tid(UUID.randomUUID().toString()) + .bid(StringUtils.hasText(bid) ? bid : UUID.randomUUID().toString()) + .timestamp(System.currentTimeMillis()) + .method(method) + .data(Objects.requireNonNull(data, "")) + .build()); + if (Objects.isNull(clazz)) { + return reply; + } + // put together in "output" + if (Objects.nonNull(reply.getInfo())) { + reply.setOutput(mapper.convertValue(reply.getInfo(), clazz)); + } + if (Objects.nonNull(reply.getOutput())) { + reply.setOutput(mapper.convertValue(reply.getOutput(), clazz)); + } + return reply; + } + + @Override + public ServiceReply publishServicesTopic(String sn, String method, Object data, String bid) { + return this.publishServicesTopic(null, sn, method, data, bid); + } + + @Override + public ServiceReply publishServicesTopic(TypeReference clazz, String sn, String method, Object data) { + return this.publishServicesTopic(clazz, sn, method, data, null); + } + + @Override + public ServiceReply publishServicesTopic(String sn, String method, Object data) { + return this.publishServicesTopic(null, sn, method, data, null); + } +} diff --git a/mh-system/src/main/java/com/mh/system/service/mqtt/impl/MqttTopicServiceImpl.java b/mh-system/src/main/java/com/mh/system/service/mqtt/impl/MqttTopicServiceImpl.java new file mode 100644 index 0000000..11efee3 --- /dev/null +++ b/mh-system/src/main/java/com/mh/system/service/mqtt/impl/MqttTopicServiceImpl.java @@ -0,0 +1,41 @@ +package com.mh.system.service.mqtt.impl; + +import com.mh.user.service.mqtt.IMqttTopicService; +import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; + +/** + * @author LJF + * @version 1.0 + * @project springboot-mqtt-demo + * @description 订阅主题实现类 + * @date 2024-10-29 17:36:31 + */ +@Service +public class MqttTopicServiceImpl implements IMqttTopicService { + + @Resource + private MqttPahoMessageDrivenChannelAdapter adapter; + + @Override + public void subscribe(String topic) { + adapter.addTopic(topic); + } + + @Override + public void subscribe(String topic, int qos) { + adapter.addTopic(topic, qos); + } + + @Override + public void unsubscribe(String topic) { + adapter.removeTopic(topic); + } + + @Override + public String[] getSubscribedTopics() { + return adapter.getTopic(); + } +} diff --git a/pom.xml b/pom.xml index 3ecfb7c..b74ff1a 100644 --- a/pom.xml +++ b/pom.xml @@ -36,6 +36,9 @@ 6.0.0 2.6.0 3.5.3 + 6.4.1 + 1.18.36 + 6.2.1 @@ -236,6 +239,28 @@ + + + org.springframework.integration + spring-integration-mqtt + ${mqtt.version} + + + + + org.projectlombok + lombok + ${lombok.version} + + + + + org.springframework + spring-messaging + ${message.version} + + +