From 84e0119c63e689d442b676edf929dfeec60f4d20 Mon Sep 17 00:00:00 2001 From: mh Date: Thu, 23 Jan 2025 17:35:50 +0800 Subject: [PATCH] =?UTF-8?q?1=E3=80=81=E7=9B=91=E6=8E=A7=E7=AE=A1=E7=90=86?= =?UTF-8?q?=EF=BC=9A=E5=85=BC=E5=AE=B9mqtt=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/main/java/com/mh/MHApplication.java | 1 + .../device/OperationController.java | 1 - .../controller/mqtt/MqttTopicController.java | 10 ++- .../src/main/resources/application-dev.yml | 65 ++++++++++++++++++- mh-common/pom.xml | 8 +-- .../com/mh/common/constant/ChannelName.java | 2 +- .../com/mh/common/constant/TopicConst.java | 4 +- .../com/mh/common/constant/TopicEnum.java | 59 +++++++++++++++++ .../common/model/request/AdvantechDatas.java | 25 +++++++ .../model/request/AdvantechReceiver.java | 29 +++++++++ .../model/response/AdvantechResponse.java | 29 +++++++++ .../manager/factory/AsyncFactory.java | 21 ++++++ .../mh/framework/mqtt/config}/MqttConfig.java | 2 +- .../mqtt/config}/MqttInboundConfig.java | 4 +- .../mqtt/config}/MqttMessageChannel.java | 10 +-- .../mqtt/config}/MqttOutboundConfig.java | 2 +- .../mqtt/handler/InboundMessageRouter.java | 47 ++++++++++++++ .../mqtt/service}/IEventsService.java | 6 +- .../mqtt/service/IMqttGatewayService.java | 8 ++- .../mqtt/service}/IMqttMsgSenderService.java | 6 +- .../mqtt/service}/IMqttTopicService.java | 2 +- .../mqtt/service}/impl/EventsServiceImpl.java | 24 +++---- .../impl/MqttMsgSenderServiceImpl.java | 22 ++++--- .../service}/impl/MqttTopicServiceImpl.java | 7 +- 24 files changed, 331 insertions(+), 63 deletions(-) create mode 100644 mh-common/src/main/java/com/mh/common/constant/TopicEnum.java create mode 100644 mh-common/src/main/java/com/mh/common/model/request/AdvantechDatas.java create mode 100644 mh-common/src/main/java/com/mh/common/model/request/AdvantechReceiver.java create mode 100644 mh-common/src/main/java/com/mh/common/model/response/AdvantechResponse.java rename {mh-common/src/main/java/com/mh/common/config/mqtt => mh-framework/src/main/java/com/mh/framework/mqtt/config}/MqttConfig.java (98%) rename {mh-common/src/main/java/com/mh/common/config/mqtt => mh-framework/src/main/java/com/mh/framework/mqtt/config}/MqttInboundConfig.java (96%) rename {mh-common/src/main/java/com/mh/common/config/mqtt => mh-framework/src/main/java/com/mh/framework/mqtt/config}/MqttMessageChannel.java (84%) rename {mh-common/src/main/java/com/mh/common/config/mqtt => mh-framework/src/main/java/com/mh/framework/mqtt/config}/MqttOutboundConfig.java (97%) create mode 100644 mh-framework/src/main/java/com/mh/framework/mqtt/handler/InboundMessageRouter.java rename {mh-system/src/main/java/com/mh/system/service/mqtt => mh-framework/src/main/java/com/mh/framework/mqtt/service}/IEventsService.java (86%) rename mh-system/src/main/java/com/mh/system/service/mqtt/IMqttMessageGateway.java => mh-framework/src/main/java/com/mh/framework/mqtt/service/IMqttGatewayService.java (84%) rename {mh-system/src/main/java/com/mh/system/service/mqtt => mh-framework/src/main/java/com/mh/framework/mqtt/service}/IMqttMsgSenderService.java (94%) rename {mh-system/src/main/java/com/mh/system/service/mqtt => mh-framework/src/main/java/com/mh/framework/mqtt/service}/IMqttTopicService.java (95%) rename {mh-system/src/main/java/com/mh/system/service/mqtt => mh-framework/src/main/java/com/mh/framework/mqtt/service}/impl/EventsServiceImpl.java (67%) rename {mh-system/src/main/java/com/mh/system/service/mqtt => mh-framework/src/main/java/com/mh/framework/mqtt/service}/impl/MqttMsgSenderServiceImpl.java (87%) rename {mh-system/src/main/java/com/mh/system/service/mqtt => mh-framework/src/main/java/com/mh/framework/mqtt/service}/impl/MqttTopicServiceImpl.java (85%) diff --git a/mh-admin/src/main/java/com/mh/MHApplication.java b/mh-admin/src/main/java/com/mh/MHApplication.java index a733d29..864d8eb 100644 --- a/mh-admin/src/main/java/com/mh/MHApplication.java +++ b/mh-admin/src/main/java/com/mh/MHApplication.java @@ -18,3 +18,4 @@ public class MHApplication SpringApplication.run(MHApplication.class, args); } } + diff --git a/mh-admin/src/main/java/com/mh/web/controller/device/OperationController.java b/mh-admin/src/main/java/com/mh/web/controller/device/OperationController.java index 54eab16..8d32dbe 100644 --- a/mh-admin/src/main/java/com/mh/web/controller/device/OperationController.java +++ b/mh-admin/src/main/java/com/mh/web/controller/device/OperationController.java @@ -15,5 +15,4 @@ import org.springframework.web.bind.annotation.RestController; @RequestMapping("/device/operation") public class OperationController extends BaseController { - } diff --git a/mh-admin/src/main/java/com/mh/web/controller/mqtt/MqttTopicController.java b/mh-admin/src/main/java/com/mh/web/controller/mqtt/MqttTopicController.java index 07a755f..564f011 100644 --- a/mh-admin/src/main/java/com/mh/web/controller/mqtt/MqttTopicController.java +++ b/mh-admin/src/main/java/com/mh/web/controller/mqtt/MqttTopicController.java @@ -1,7 +1,7 @@ package com.mh.web.controller.mqtt; -import com.mh.system.service.mqtt.IMqttMsgSenderService; -import com.mh.system.service.mqtt.IMqttTopicService; +import com.mh.framework.mqtt.service.IMqttMsgSenderService; +import com.mh.framework.mqtt.service.IMqttTopicService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; @@ -50,7 +50,11 @@ public class MqttTopicController { if (!exists) { return "主题不存在"; } - mqttMsgSenderService.publish(topic, pushMessage); + try { + mqttMsgSenderService.publish(topic, pushMessage); + } catch (Exception e) { + throw new RuntimeException(e); + } return "发布主题:" + topic + "成功"; } diff --git a/mh-admin/src/main/resources/application-dev.yml b/mh-admin/src/main/resources/application-dev.yml index 358af37..924115d 100644 --- a/mh-admin/src/main/resources/application-dev.yml +++ b/mh-admin/src/main/resources/application-dev.yml @@ -43,8 +43,6 @@ spring: messages: # 国际化资源文件路径 basename: i18n/messages - profiles: - active: druid # 文件上传 servlet: multipart: @@ -80,6 +78,67 @@ spring: max-active: 8 # #连接池最大阻塞等待时间(使用负值表示没有限制) max-wait: -1ms + # 数据源配置 + datasource: + type: com.alibaba.druid.pool.DruidDataSource + driverClassName: org.postgresql.Driver + druid: + # 主库数据源 + master: + #添加allowMultiQueries=true 在批量更新时才不会出错 + url: jdbc:postgresql://127.0.0.1:5432/eemcs + username: postgres + password: mh@803 + # 从库数据源 + slave: + # 从数据源开关/默认关闭 + enabled: false + url: + username: + password: + # 初始连接数 + initialSize: 5 + # 最小连接池数量 + minIdle: 10 + # 最大连接池数量 + maxActive: 20 + # 配置获取连接等待超时的时间 + maxWait: 60000 + # 配置连接超时时间 + connectTimeout: 30000 + # 配置网络超时时间 + socketTimeout: 60000 + # 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 + timeBetweenEvictionRunsMillis: 60000 + # 配置一个连接在池中最小生存的时间,单位是毫秒 + minEvictableIdleTimeMillis: 300000 + # 配置一个连接在池中最大生存的时间,单位是毫秒 + maxEvictableIdleTimeMillis: 900000 + # 配置检测连接是否有效 + validationQuery: SELECT 1 + testWhileIdle: true + testOnBorrow: false + testOnReturn: false + webStatFilter: + enabled: true + statViewServlet: + enabled: true + # 设置白名单,不填则允许所有访问 + allow: + url-pattern: /druid/* + # 控制台管理用户名和密码 + login-username: mh + login-password: 123456 + filter: + stat: + enabled: true + # 慢SQL记录 + log-slow-sql: true + slow-sql-millis: 1000 + merge-sql: true + wall: + config: + multi-statement-allow: true # MyBatis配置 mybatis-plus: @@ -134,4 +193,4 @@ mqttSpring: protocol: WS host: 127.0.0.1 port: 8083 - path: /mqtt \ No newline at end of file + path: /mqtt diff --git a/mh-common/pom.xml b/mh-common/pom.xml index 6769ec2..de4032c 100644 --- a/mh-common/pom.xml +++ b/mh-common/pom.xml @@ -145,10 +145,10 @@ spring-integration-mqtt - - org.springframework - spring-messaging - + + + + org.projectlombok diff --git a/mh-common/src/main/java/com/mh/common/constant/ChannelName.java b/mh-common/src/main/java/com/mh/common/constant/ChannelName.java index 8c94500..530d08a 100644 --- a/mh-common/src/main/java/com/mh/common/constant/ChannelName.java +++ b/mh-common/src/main/java/com/mh/common/constant/ChannelName.java @@ -12,7 +12,7 @@ public class ChannelName { /** * 默认通道名称(防止出错) */ - public static final String DEFAULT = "default"; + public static final String DEFAULT_BOUND = "default_bound"; /** * 主动上报入站 diff --git a/mh-common/src/main/java/com/mh/common/constant/TopicConst.java b/mh-common/src/main/java/com/mh/common/constant/TopicConst.java index daec78d..45224c1 100644 --- a/mh-common/src/main/java/com/mh/common/constant/TopicConst.java +++ b/mh-common/src/main/java/com/mh/common/constant/TopicConst.java @@ -3,9 +3,9 @@ package com.mh.common.constant; /** * All the topics that need to be used in the project. * - * @author sean.zhou + * @author ljf * @version 0.1 - * @date 2021/11/10 + * @date 2025-01-22 */ public class TopicConst { diff --git a/mh-common/src/main/java/com/mh/common/constant/TopicEnum.java b/mh-common/src/main/java/com/mh/common/constant/TopicEnum.java new file mode 100644 index 0000000..8569098 --- /dev/null +++ b/mh-common/src/main/java/com/mh/common/constant/TopicEnum.java @@ -0,0 +1,59 @@ +package com.mh.common.constant; + +import java.util.Arrays; +import java.util.regex.Pattern; + +import static com.mh.common.constant.TopicConst.*; + + +/** + * @author ljf + * @version 1.0 + * @description: TODO + * @date 2024/11/06 14:28 + */ +public enum TopicEnum { + + /** + * 客户端主动上报数据 + */ + CLIENT_UPLOAD_DATA(Pattern.compile("^" + MH_UPLOAD + EVENTS_UPLOAD + REGEX_SN + "$"), ChannelName.EVENTS_UPLOAD_INBOUND), + + /** + * 服务端采集数据 + */ + SERVER_COLLECTION_DATA(Pattern.compile("^" + MH_COLLECTION + EVENTS_COLLECTION + REGEX_SN + "$"), ChannelName.EVENTS_COLLECTION_INBOUND), + + /** + * 服务端控制指令 + */ + SERVER_CONTROL_DATA(Pattern.compile("^" + MH_CONTROL + EVENTS_CONTROL + REGEX_SN + "$"), ChannelName.EVENTS_CONTROL_INBOUND), + + /** + * 订阅服务端发送的主题命令 + */ + SERVER_SEND_DATA(Pattern.compile("^A/cmd/ctl/send" + "$"), ChannelName.EVENTS_SEND_INBOUND), + + UNKNOWN(Pattern.compile("^.*$"), ChannelName.DEFAULT_BOUND); + + final Pattern pattern; + + final String beanName; + + TopicEnum(Pattern pattern, String beanName) { + this.pattern = pattern; + this.beanName = beanName; + } + + public Pattern getPattern() { + return pattern; + } + + public String getBeanName() { + return beanName; + } + + public static TopicEnum find(String topic) { + return Arrays.stream(TopicEnum.values()).filter(topicEnum -> topicEnum.pattern.matcher(topic).matches()).findAny().orElse(UNKNOWN); + } +} diff --git a/mh-common/src/main/java/com/mh/common/model/request/AdvantechDatas.java b/mh-common/src/main/java/com/mh/common/model/request/AdvantechDatas.java new file mode 100644 index 0000000..8f27aab --- /dev/null +++ b/mh-common/src/main/java/com/mh/common/model/request/AdvantechDatas.java @@ -0,0 +1,25 @@ +package com.mh.common.model.request; + +import lombok.Data; + +/** + * @author LJF + * @version 1.0 + * @project EEMCS + * @description 研华数据体 + * @date 2025-01-22 14:47:25 + */ +@Data +public class AdvantechDatas { + + /** + * 对应研华的标签值 + */ + private String tag; + + /** + * 上报值 + */ + private String value; + +} diff --git a/mh-common/src/main/java/com/mh/common/model/request/AdvantechReceiver.java b/mh-common/src/main/java/com/mh/common/model/request/AdvantechReceiver.java new file mode 100644 index 0000000..5fae9ec --- /dev/null +++ b/mh-common/src/main/java/com/mh/common/model/request/AdvantechReceiver.java @@ -0,0 +1,29 @@ +package com.mh.common.model.request; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.Data; + +import java.util.List; + +/** + * @author LJF + * @version 1.0 + * @project EEMCS + * @description 研华网关发送接收数据 + * @date 2025-01-22 14:43:15 + */ +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +public class AdvantechReceiver { + + /** + * 数据集合 + */ + private List d; + + /** + * 主动上报数据时间(带T类型) + */ + private String ts; + +} diff --git a/mh-common/src/main/java/com/mh/common/model/response/AdvantechResponse.java b/mh-common/src/main/java/com/mh/common/model/response/AdvantechResponse.java new file mode 100644 index 0000000..bcae0ba --- /dev/null +++ b/mh-common/src/main/java/com/mh/common/model/response/AdvantechResponse.java @@ -0,0 +1,29 @@ +package com.mh.common.model.response; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.Data; + +import java.util.List; + +/** + * @author LJF + * @version 1.0 + * @project EEMCS + * @description 研华网关发送接收数据 + * @date 2025-01-22 14:43:15 + */ +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +public class AdvantechResponse { + + /** + * 数据集合 + */ + private List w; + + /** + * 主动上报数据时间(带T类型) + */ + private String ts; + +} diff --git a/mh-framework/src/main/java/com/mh/framework/manager/factory/AsyncFactory.java b/mh-framework/src/main/java/com/mh/framework/manager/factory/AsyncFactory.java index a3c16df..af7b983 100644 --- a/mh-framework/src/main/java/com/mh/framework/manager/factory/AsyncFactory.java +++ b/mh-framework/src/main/java/com/mh/framework/manager/factory/AsyncFactory.java @@ -1,6 +1,8 @@ package com.mh.framework.manager.factory; import java.util.TimerTask; + +import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.mh.common.constant.Constants; @@ -21,6 +23,7 @@ import eu.bitwalker.useragentutils.UserAgent; * * @author mh */ +@Slf4j public class AsyncFactory { private static final Logger sys_user_logger = LoggerFactory.getLogger("sys-user"); @@ -99,4 +102,22 @@ public class AsyncFactory } }; } + + /** + * mqtt线程开启日志记录 + * + * @return 任务task + */ + public static TimerTask mqttRecord() + { + return new TimerTask() + { + @Override + public void run() + { + // 远程查询操作地点 + log.info("mqtt开启日志记录"); + } + }; + } } diff --git a/mh-common/src/main/java/com/mh/common/config/mqtt/MqttConfig.java b/mh-framework/src/main/java/com/mh/framework/mqtt/config/MqttConfig.java similarity index 98% rename from mh-common/src/main/java/com/mh/common/config/mqtt/MqttConfig.java rename to mh-framework/src/main/java/com/mh/framework/mqtt/config/MqttConfig.java index d06a3cc..3488b5c 100644 --- a/mh-common/src/main/java/com/mh/common/config/mqtt/MqttConfig.java +++ b/mh-framework/src/main/java/com/mh/framework/mqtt/config/MqttConfig.java @@ -1,4 +1,4 @@ -package com.mh.common.config.mqtt; +package com.mh.framework.mqtt.config; import com.mh.common.enums.MqttClientOptions; import com.mh.common.enums.MqttProtocolEnum; diff --git a/mh-common/src/main/java/com/mh/common/config/mqtt/MqttInboundConfig.java b/mh-framework/src/main/java/com/mh/framework/mqtt/config/MqttInboundConfig.java similarity index 96% rename from mh-common/src/main/java/com/mh/common/config/mqtt/MqttInboundConfig.java rename to mh-framework/src/main/java/com/mh/framework/mqtt/config/MqttInboundConfig.java index 4ef5a26..c72e245 100644 --- a/mh-common/src/main/java/com/mh/common/config/mqtt/MqttInboundConfig.java +++ b/mh-framework/src/main/java/com/mh/framework/mqtt/config/MqttInboundConfig.java @@ -1,4 +1,4 @@ -package com.mh.common.config.mqtt; +package com.mh.framework.mqtt.config; import com.mh.common.constant.ChannelName; import com.mh.common.enums.MqttClientOptions; @@ -69,7 +69,7 @@ public class MqttInboundConfig { * @return */ @Bean - @ServiceActivator(inputChannel = ChannelName.DEFAULT) + @ServiceActivator(inputChannel = ChannelName.DEFAULT_BOUND) public MessageHandler handler() { return message -> { log.info("The default channel does not handle messages." + diff --git a/mh-common/src/main/java/com/mh/common/config/mqtt/MqttMessageChannel.java b/mh-framework/src/main/java/com/mh/framework/mqtt/config/MqttMessageChannel.java similarity index 84% rename from mh-common/src/main/java/com/mh/common/config/mqtt/MqttMessageChannel.java rename to mh-framework/src/main/java/com/mh/framework/mqtt/config/MqttMessageChannel.java index c3e26bb..7ceda6c 100644 --- a/mh-common/src/main/java/com/mh/common/config/mqtt/MqttMessageChannel.java +++ b/mh-framework/src/main/java/com/mh/framework/mqtt/config/MqttMessageChannel.java @@ -1,16 +1,13 @@ -package com.mh.common.config.mqtt; +package com.mh.framework.mqtt.config; import com.mh.common.constant.ChannelName; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.channel.ExecutorChannel; import org.springframework.messaging.MessageChannel; -import java.util.concurrent.Executor; - /** * @author LJF * @version 1.0 @@ -22,12 +19,9 @@ import java.util.concurrent.Executor; @Configuration public class MqttMessageChannel { - @Autowired - private Executor threadPool; - @Bean(name = ChannelName.INBOUND) public MessageChannel inboundChannel() { - return new ExecutorChannel(threadPool); + return new DirectChannel(); } @Bean(name = ChannelName.EVENTS_UPLOAD_INBOUND) diff --git a/mh-common/src/main/java/com/mh/common/config/mqtt/MqttOutboundConfig.java b/mh-framework/src/main/java/com/mh/framework/mqtt/config/MqttOutboundConfig.java similarity index 97% rename from mh-common/src/main/java/com/mh/common/config/mqtt/MqttOutboundConfig.java rename to mh-framework/src/main/java/com/mh/framework/mqtt/config/MqttOutboundConfig.java index 287e0b9..2c73e2f 100644 --- a/mh-common/src/main/java/com/mh/common/config/mqtt/MqttOutboundConfig.java +++ b/mh-framework/src/main/java/com/mh/framework/mqtt/config/MqttOutboundConfig.java @@ -1,4 +1,4 @@ -package com.mh.common.config.mqtt; +package com.mh.framework.mqtt.config; import com.mh.common.constant.ChannelName; import lombok.extern.slf4j.Slf4j; diff --git a/mh-framework/src/main/java/com/mh/framework/mqtt/handler/InboundMessageRouter.java b/mh-framework/src/main/java/com/mh/framework/mqtt/handler/InboundMessageRouter.java new file mode 100644 index 0000000..99fb838 --- /dev/null +++ b/mh-framework/src/main/java/com/mh/framework/mqtt/handler/InboundMessageRouter.java @@ -0,0 +1,47 @@ +package com.mh.framework.mqtt.handler; + +import com.mh.common.constant.ChannelName; +import com.mh.common.constant.TopicEnum; +import com.mh.common.utils.spring.SpringUtils; +import lombok.extern.slf4j.Slf4j; +import org.springframework.integration.annotation.Router; +import org.springframework.integration.mqtt.support.MqttHeaders; +import org.springframework.integration.router.AbstractMessageRouter; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHeaders; +import org.springframework.stereotype.Component; + +import java.util.Collection; +import java.util.Collections; +import java.util.Objects; + +/** + * @author LJF + * @version 1.0 + * @project springboot-mqtt-demo + * @description 入站消息路由分发中心 + * @date 2024-10-29 17:04:17 + */ +@Slf4j +@Component +public class InboundMessageRouter extends AbstractMessageRouter { + + /** + * 目前只需要这个方式,后期在拓展使用IntegrationFlow方式 + * @param message + * @return + */ + @Router(inputChannel = ChannelName.INBOUND) + @Override + protected Collection determineTargetChannels(Message message) { + MessageHeaders headers = message.getHeaders(); + String topic = Objects.requireNonNull(headers.get(MqttHeaders.RECEIVED_TOPIC)).toString(); + byte[] payload = (byte[]) message.getPayload(); + log.info("从当前主题 topic: {}, 接收到的消息:{}", topic, new String(payload)); + // 找到对应的主题消息通道 + TopicEnum topicEnum = TopicEnum.find(topic); + MessageChannel bean = (MessageChannel) SpringUtils.getBean(topicEnum.getBeanName()); + return Collections.singleton(bean); + } +} diff --git a/mh-system/src/main/java/com/mh/system/service/mqtt/IEventsService.java b/mh-framework/src/main/java/com/mh/framework/mqtt/service/IEventsService.java similarity index 86% rename from mh-system/src/main/java/com/mh/system/service/mqtt/IEventsService.java rename to mh-framework/src/main/java/com/mh/framework/mqtt/service/IEventsService.java index 18ffdb5..240c674 100644 --- a/mh-system/src/main/java/com/mh/system/service/mqtt/IEventsService.java +++ b/mh-framework/src/main/java/com/mh/framework/mqtt/service/IEventsService.java @@ -1,4 +1,4 @@ -package com.mh.system.service.mqtt; +package com.mh.framework.mqtt.service; import org.springframework.messaging.MessageHeaders; @@ -39,11 +39,11 @@ public interface IEventsService { void handleInboundControl(byte[] receiver, MessageHeaders headers) throws IOException; /** - * 处理接收上报 + * 处理发送 * @param receiver * @param headers * @throws IOException */ - void handleInboundReceive(byte[] receiver, MessageHeaders headers) throws IOException; + void handleInboundSend(byte[] receiver, MessageHeaders headers) throws IOException; } diff --git a/mh-system/src/main/java/com/mh/system/service/mqtt/IMqttMessageGateway.java b/mh-framework/src/main/java/com/mh/framework/mqtt/service/IMqttGatewayService.java similarity index 84% rename from mh-system/src/main/java/com/mh/system/service/mqtt/IMqttMessageGateway.java rename to mh-framework/src/main/java/com/mh/framework/mqtt/service/IMqttGatewayService.java index 58a1a67..e2cdf50 100644 --- a/mh-system/src/main/java/com/mh/system/service/mqtt/IMqttMessageGateway.java +++ b/mh-framework/src/main/java/com/mh/framework/mqtt/service/IMqttGatewayService.java @@ -1,6 +1,7 @@ -package com.mh.system.service.mqtt; +package com.mh.framework.mqtt.service; import com.mh.common.constant.ChannelName; +import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.handler.annotation.Header; @@ -9,13 +10,14 @@ import org.springframework.stereotype.Component; /** * @author LJF * @version 1.0 - * @project springboot-mqtt-demo + * @project springboot-mqtt * @description 消息网关 * @date 2024-10-30 14:43:55 */ @Component +@Configuration @MessagingGateway(defaultRequestChannel = ChannelName.OUTBOUND) -public interface IMqttMessageGateway { +public interface IMqttGatewayService { /** * 发送消息 diff --git a/mh-system/src/main/java/com/mh/system/service/mqtt/IMqttMsgSenderService.java b/mh-framework/src/main/java/com/mh/framework/mqtt/service/IMqttMsgSenderService.java similarity index 94% rename from mh-system/src/main/java/com/mh/system/service/mqtt/IMqttMsgSenderService.java rename to mh-framework/src/main/java/com/mh/framework/mqtt/service/IMqttMsgSenderService.java index 050fd9b..11a06ac 100644 --- a/mh-system/src/main/java/com/mh/system/service/mqtt/IMqttMsgSenderService.java +++ b/mh-framework/src/main/java/com/mh/framework/mqtt/service/IMqttMsgSenderService.java @@ -1,8 +1,8 @@ -package com.mh.system.service.mqtt; +package com.mh.framework.mqtt.service; import com.fasterxml.jackson.core.type.TypeReference; -import com.mh.user.model.response.CommonTopicResponse; -import com.mh.user.model.response.ServiceReply; +import com.mh.common.model.response.CommonTopicResponse; +import com.mh.common.model.response.ServiceReply; /** * @author LJF diff --git a/mh-system/src/main/java/com/mh/system/service/mqtt/IMqttTopicService.java b/mh-framework/src/main/java/com/mh/framework/mqtt/service/IMqttTopicService.java similarity index 95% rename from mh-system/src/main/java/com/mh/system/service/mqtt/IMqttTopicService.java rename to mh-framework/src/main/java/com/mh/framework/mqtt/service/IMqttTopicService.java index 7e74089..04edcd9 100644 --- a/mh-system/src/main/java/com/mh/system/service/mqtt/IMqttTopicService.java +++ b/mh-framework/src/main/java/com/mh/framework/mqtt/service/IMqttTopicService.java @@ -1,4 +1,4 @@ -package com.mh.system.service.mqtt; +package com.mh.framework.mqtt.service; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.handler.annotation.Header; diff --git a/mh-system/src/main/java/com/mh/system/service/mqtt/impl/EventsServiceImpl.java b/mh-framework/src/main/java/com/mh/framework/mqtt/service/impl/EventsServiceImpl.java similarity index 67% rename from mh-system/src/main/java/com/mh/system/service/mqtt/impl/EventsServiceImpl.java rename to mh-framework/src/main/java/com/mh/framework/mqtt/service/impl/EventsServiceImpl.java index a003006..adc35fc 100644 --- a/mh-system/src/main/java/com/mh/system/service/mqtt/impl/EventsServiceImpl.java +++ b/mh-framework/src/main/java/com/mh/framework/mqtt/service/impl/EventsServiceImpl.java @@ -1,10 +1,9 @@ -package com.mh.system.service.mqtt.impl; +package com.mh.framework.mqtt.service.impl; import com.fasterxml.jackson.databind.ObjectMapper; import com.mh.common.constant.ChannelName; -import com.mh.common.core.redis.RedisCache; -import com.mh.common.model.request.CommonTopicReceiver; -import com.mh.system.service.mqtt.IEventsService; +import com.mh.common.model.request.AdvantechDatas; +import com.mh.framework.mqtt.service.IEventsService; import io.netty.util.CharsetUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -13,7 +12,6 @@ import org.springframework.messaging.MessageHeaders; import org.springframework.stereotype.Service; import java.io.IOException; -import java.util.concurrent.TimeUnit; /** * @author LJF @@ -29,9 +27,6 @@ public class EventsServiceImpl implements IEventsService { @Autowired private ObjectMapper mapper; - @Autowired - private RedisCache redisCache; - @ServiceActivator(inputChannel = ChannelName.EVENTS_UPLOAD_INBOUND) @Override public void handleInboundUpload(byte[] receiver, MessageHeaders headers) { @@ -50,22 +45,21 @@ public class EventsServiceImpl implements IEventsService { handleInboundData(receiver, "控制指令下发"); } - @ServiceActivator(inputChannel = ChannelName.EVENTS_RECEIVE_INBOUND) + @ServiceActivator(inputChannel = ChannelName.EVENTS_SEND_INBOUND) @Override - public void handleInboundReceive(byte[] receiver, MessageHeaders headers) { - String receiveStr = new String(receiver, CharsetUtil.UTF_8); - log.info("接收到控制指令返回=>{}", receiveStr); - // 设置成redis,15秒时间响应 - redisCache.setCacheObject("receiveCmd", receiveStr, 15, TimeUnit.SECONDS); + public void handleInboundSend(byte[] receiver, MessageHeaders headers) throws IOException { + String sendStr = new String(receiver, CharsetUtil.UTF_8); + log.info("接收到控制指令下发=>{}", sendStr); } private void handleInboundData(byte[] receiver, String logMessage) { try { - CommonTopicReceiver commonTopicReceiver = mapper.readValue(receiver, CommonTopicReceiver.class); + AdvantechDatas commonTopicReceiver = mapper.readValue(receiver, AdvantechDatas.class); log.info("{}: {}", logMessage, commonTopicReceiver); } catch (IOException e) { log.error("处理数据时发生错误: ", e); } } + } diff --git a/mh-system/src/main/java/com/mh/system/service/mqtt/impl/MqttMsgSenderServiceImpl.java b/mh-framework/src/main/java/com/mh/framework/mqtt/service/impl/MqttMsgSenderServiceImpl.java similarity index 87% rename from mh-system/src/main/java/com/mh/system/service/mqtt/impl/MqttMsgSenderServiceImpl.java rename to mh-framework/src/main/java/com/mh/framework/mqtt/service/impl/MqttMsgSenderServiceImpl.java index b4c7bac..f4bddd5 100644 --- a/mh-system/src/main/java/com/mh/system/service/mqtt/impl/MqttMsgSenderServiceImpl.java +++ b/mh-framework/src/main/java/com/mh/framework/mqtt/service/impl/MqttMsgSenderServiceImpl.java @@ -1,13 +1,14 @@ -package com.mh.system.service.mqtt.impl; +package com.mh.framework.mqtt.service.impl; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.mh.common.constant.TopicConst; -import com.mh.user.model.response.CommonTopicResponse; -import com.mh.user.model.response.ServiceReply; -import com.mh.user.service.mqtt.IMqttMessageGateway; -import com.mh.user.service.mqtt.IMqttMsgSenderService; +import com.mh.common.model.response.CommonTopicResponse; +import com.mh.common.model.response.ServiceReply; +import com.mh.common.utils.spring.SpringUtils; +import com.mh.framework.mqtt.service.IMqttGatewayService; +import com.mh.framework.mqtt.service.IMqttMsgSenderService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -27,8 +28,12 @@ import java.util.UUID; @Service public class MqttMsgSenderServiceImpl implements IMqttMsgSenderService { + private final IMqttGatewayService iMqttGatewayService; + @Autowired - private IMqttMessageGateway mqttMessageGateway; + public MqttMsgSenderServiceImpl(IMqttGatewayService myGateway) { + this.iMqttGatewayService = myGateway; + } @Autowired private ObjectMapper mapper; @@ -43,10 +48,11 @@ public class MqttMsgSenderServiceImpl implements IMqttMsgSenderService { public void publish(String topic, String pushMessage) { synchronized (this) {//注意:这里一定要同步,否则,在多线程publish的情况下,线程会发生死锁,分析见文章最后补充 try { - mqttMessageGateway.publish(topic, pushMessage, 0); + iMqttGatewayService.publish(topic, pushMessage, 0); log.info("发送主题:{},消息:{}", topic, pushMessage); } catch (Exception e) { log.error("发送主题异常:{},消息:{}", topic, pushMessage, e); + throw new RuntimeException(e); } } } @@ -60,7 +66,7 @@ public class MqttMsgSenderServiceImpl implements IMqttMsgSenderService { public void publish(String topic, int qos, CommonTopicResponse response) { try { log.info("发送主题:{},消息:{}", topic, response.toString()); - mqttMessageGateway.publish(topic, mapper.writeValueAsBytes(response), qos); + iMqttGatewayService.publish(topic, mapper.writeValueAsBytes(response), qos); } catch (JsonProcessingException e) { log.error("发送主题:{},消息:{}", topic, response.toString(), e); throw new RuntimeException(e); diff --git a/mh-system/src/main/java/com/mh/system/service/mqtt/impl/MqttTopicServiceImpl.java b/mh-framework/src/main/java/com/mh/framework/mqtt/service/impl/MqttTopicServiceImpl.java similarity index 85% rename from mh-system/src/main/java/com/mh/system/service/mqtt/impl/MqttTopicServiceImpl.java rename to mh-framework/src/main/java/com/mh/framework/mqtt/service/impl/MqttTopicServiceImpl.java index 11efee3..1a6eeb7 100644 --- a/mh-system/src/main/java/com/mh/system/service/mqtt/impl/MqttTopicServiceImpl.java +++ b/mh-framework/src/main/java/com/mh/framework/mqtt/service/impl/MqttTopicServiceImpl.java @@ -1,11 +1,10 @@ -package com.mh.system.service.mqtt.impl; +package com.mh.framework.mqtt.service.impl; -import com.mh.user.service.mqtt.IMqttTopicService; +import com.mh.framework.mqtt.service.IMqttTopicService; +import jakarta.annotation.Resource; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.stereotype.Service; -import javax.annotation.Resource; - /** * @author LJF * @version 1.0