diff --git a/mh-admin/src/main/java/com/mh/MHApplication.java b/mh-admin/src/main/java/com/mh/MHApplication.java
index a733d29..864d8eb 100644
--- a/mh-admin/src/main/java/com/mh/MHApplication.java
+++ b/mh-admin/src/main/java/com/mh/MHApplication.java
@@ -18,3 +18,4 @@ public class MHApplication
SpringApplication.run(MHApplication.class, args);
}
}
+
diff --git a/mh-admin/src/main/java/com/mh/web/controller/device/OperationController.java b/mh-admin/src/main/java/com/mh/web/controller/device/OperationController.java
index 54eab16..8d32dbe 100644
--- a/mh-admin/src/main/java/com/mh/web/controller/device/OperationController.java
+++ b/mh-admin/src/main/java/com/mh/web/controller/device/OperationController.java
@@ -15,5 +15,4 @@ import org.springframework.web.bind.annotation.RestController;
@RequestMapping("/device/operation")
public class OperationController extends BaseController {
-
}
diff --git a/mh-admin/src/main/java/com/mh/web/controller/mqtt/MqttTopicController.java b/mh-admin/src/main/java/com/mh/web/controller/mqtt/MqttTopicController.java
index 07a755f..564f011 100644
--- a/mh-admin/src/main/java/com/mh/web/controller/mqtt/MqttTopicController.java
+++ b/mh-admin/src/main/java/com/mh/web/controller/mqtt/MqttTopicController.java
@@ -1,7 +1,7 @@
package com.mh.web.controller.mqtt;
-import com.mh.system.service.mqtt.IMqttMsgSenderService;
-import com.mh.system.service.mqtt.IMqttTopicService;
+import com.mh.framework.mqtt.service.IMqttMsgSenderService;
+import com.mh.framework.mqtt.service.IMqttTopicService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
@@ -50,7 +50,11 @@ public class MqttTopicController {
if (!exists) {
return "主题不存在";
}
- mqttMsgSenderService.publish(topic, pushMessage);
+ try {
+ mqttMsgSenderService.publish(topic, pushMessage);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
return "发布主题:" + topic + "成功";
}
diff --git a/mh-admin/src/main/resources/application-dev.yml b/mh-admin/src/main/resources/application-dev.yml
index 358af37..924115d 100644
--- a/mh-admin/src/main/resources/application-dev.yml
+++ b/mh-admin/src/main/resources/application-dev.yml
@@ -43,8 +43,6 @@ spring:
messages:
# 国际化资源文件路径
basename: i18n/messages
- profiles:
- active: druid
# 文件上传
servlet:
multipart:
@@ -80,6 +78,67 @@ spring:
max-active: 8
# #连接池最大阻塞等待时间(使用负值表示没有限制)
max-wait: -1ms
+ # 数据源配置
+ datasource:
+ type: com.alibaba.druid.pool.DruidDataSource
+ driverClassName: org.postgresql.Driver
+ druid:
+ # 主库数据源
+ master:
+ #添加allowMultiQueries=true 在批量更新时才不会出错
+ url: jdbc:postgresql://127.0.0.1:5432/eemcs
+ username: postgres
+ password: mh@803
+ # 从库数据源
+ slave:
+ # 从数据源开关/默认关闭
+ enabled: false
+ url:
+ username:
+ password:
+ # 初始连接数
+ initialSize: 5
+ # 最小连接池数量
+ minIdle: 10
+ # 最大连接池数量
+ maxActive: 20
+ # 配置获取连接等待超时的时间
+ maxWait: 60000
+ # 配置连接超时时间
+ connectTimeout: 30000
+ # 配置网络超时时间
+ socketTimeout: 60000
+ # 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
+ timeBetweenEvictionRunsMillis: 60000
+ # 配置一个连接在池中最小生存的时间,单位是毫秒
+ minEvictableIdleTimeMillis: 300000
+ # 配置一个连接在池中最大生存的时间,单位是毫秒
+ maxEvictableIdleTimeMillis: 900000
+ # 配置检测连接是否有效
+ validationQuery: SELECT 1
+ testWhileIdle: true
+ testOnBorrow: false
+ testOnReturn: false
+ webStatFilter:
+ enabled: true
+ statViewServlet:
+ enabled: true
+ # 设置白名单,不填则允许所有访问
+ allow:
+ url-pattern: /druid/*
+ # 控制台管理用户名和密码
+ login-username: mh
+ login-password: 123456
+ filter:
+ stat:
+ enabled: true
+ # 慢SQL记录
+ log-slow-sql: true
+ slow-sql-millis: 1000
+ merge-sql: true
+ wall:
+ config:
+ multi-statement-allow: true
# MyBatis配置
mybatis-plus:
@@ -134,4 +193,4 @@ mqttSpring:
protocol: WS
host: 127.0.0.1
port: 8083
- path: /mqtt
\ No newline at end of file
+ path: /mqtt
diff --git a/mh-common/pom.xml b/mh-common/pom.xml
index 6769ec2..de4032c 100644
--- a/mh-common/pom.xml
+++ b/mh-common/pom.xml
@@ -145,10 +145,10 @@
spring-integration-mqtt
-
- org.springframework
- spring-messaging
-
+
+
+
+
org.projectlombok
diff --git a/mh-common/src/main/java/com/mh/common/constant/ChannelName.java b/mh-common/src/main/java/com/mh/common/constant/ChannelName.java
index 8c94500..530d08a 100644
--- a/mh-common/src/main/java/com/mh/common/constant/ChannelName.java
+++ b/mh-common/src/main/java/com/mh/common/constant/ChannelName.java
@@ -12,7 +12,7 @@ public class ChannelName {
/**
* 默认通道名称(防止出错)
*/
- public static final String DEFAULT = "default";
+ public static final String DEFAULT_BOUND = "default_bound";
/**
* 主动上报入站
diff --git a/mh-common/src/main/java/com/mh/common/constant/TopicConst.java b/mh-common/src/main/java/com/mh/common/constant/TopicConst.java
index daec78d..45224c1 100644
--- a/mh-common/src/main/java/com/mh/common/constant/TopicConst.java
+++ b/mh-common/src/main/java/com/mh/common/constant/TopicConst.java
@@ -3,9 +3,9 @@ package com.mh.common.constant;
/**
* All the topics that need to be used in the project.
*
- * @author sean.zhou
+ * @author ljf
* @version 0.1
- * @date 2021/11/10
+ * @date 2025-01-22
*/
public class TopicConst {
diff --git a/mh-common/src/main/java/com/mh/common/constant/TopicEnum.java b/mh-common/src/main/java/com/mh/common/constant/TopicEnum.java
new file mode 100644
index 0000000..8569098
--- /dev/null
+++ b/mh-common/src/main/java/com/mh/common/constant/TopicEnum.java
@@ -0,0 +1,59 @@
+package com.mh.common.constant;
+
+import java.util.Arrays;
+import java.util.regex.Pattern;
+
+import static com.mh.common.constant.TopicConst.*;
+
+
+/**
+ * @author ljf
+ * @version 1.0
+ * @description: TODO
+ * @date 2024/11/06 14:28
+ */
+public enum TopicEnum {
+
+ /**
+ * 客户端主动上报数据
+ */
+ CLIENT_UPLOAD_DATA(Pattern.compile("^" + MH_UPLOAD + EVENTS_UPLOAD + REGEX_SN + "$"), ChannelName.EVENTS_UPLOAD_INBOUND),
+
+ /**
+ * 服务端采集数据
+ */
+ SERVER_COLLECTION_DATA(Pattern.compile("^" + MH_COLLECTION + EVENTS_COLLECTION + REGEX_SN + "$"), ChannelName.EVENTS_COLLECTION_INBOUND),
+
+ /**
+ * 服务端控制指令
+ */
+ SERVER_CONTROL_DATA(Pattern.compile("^" + MH_CONTROL + EVENTS_CONTROL + REGEX_SN + "$"), ChannelName.EVENTS_CONTROL_INBOUND),
+
+ /**
+ * 订阅服务端发送的主题命令
+ */
+ SERVER_SEND_DATA(Pattern.compile("^A/cmd/ctl/send" + "$"), ChannelName.EVENTS_SEND_INBOUND),
+
+ UNKNOWN(Pattern.compile("^.*$"), ChannelName.DEFAULT_BOUND);
+
+ final Pattern pattern;
+
+ final String beanName;
+
+ TopicEnum(Pattern pattern, String beanName) {
+ this.pattern = pattern;
+ this.beanName = beanName;
+ }
+
+ public Pattern getPattern() {
+ return pattern;
+ }
+
+ public String getBeanName() {
+ return beanName;
+ }
+
+ public static TopicEnum find(String topic) {
+ return Arrays.stream(TopicEnum.values()).filter(topicEnum -> topicEnum.pattern.matcher(topic).matches()).findAny().orElse(UNKNOWN);
+ }
+}
diff --git a/mh-common/src/main/java/com/mh/common/model/request/AdvantechDatas.java b/mh-common/src/main/java/com/mh/common/model/request/AdvantechDatas.java
new file mode 100644
index 0000000..8f27aab
--- /dev/null
+++ b/mh-common/src/main/java/com/mh/common/model/request/AdvantechDatas.java
@@ -0,0 +1,25 @@
+package com.mh.common.model.request;
+
+import lombok.Data;
+
+/**
+ * @author LJF
+ * @version 1.0
+ * @project EEMCS
+ * @description 研华数据体
+ * @date 2025-01-22 14:47:25
+ */
+@Data
+public class AdvantechDatas {
+
+ /**
+ * 对应研华的标签值
+ */
+ private String tag;
+
+ /**
+ * 上报值
+ */
+ private String value;
+
+}
diff --git a/mh-common/src/main/java/com/mh/common/model/request/AdvantechReceiver.java b/mh-common/src/main/java/com/mh/common/model/request/AdvantechReceiver.java
new file mode 100644
index 0000000..5fae9ec
--- /dev/null
+++ b/mh-common/src/main/java/com/mh/common/model/request/AdvantechReceiver.java
@@ -0,0 +1,29 @@
+package com.mh.common.model.request;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.Data;
+
+import java.util.List;
+
+/**
+ * @author LJF
+ * @version 1.0
+ * @project EEMCS
+ * @description 研华网关发送接收数据
+ * @date 2025-01-22 14:43:15
+ */
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class AdvantechReceiver {
+
+ /**
+ * 数据集合
+ */
+ private List d;
+
+ /**
+ * 主动上报数据时间(带T类型)
+ */
+ private String ts;
+
+}
diff --git a/mh-common/src/main/java/com/mh/common/model/response/AdvantechResponse.java b/mh-common/src/main/java/com/mh/common/model/response/AdvantechResponse.java
new file mode 100644
index 0000000..bcae0ba
--- /dev/null
+++ b/mh-common/src/main/java/com/mh/common/model/response/AdvantechResponse.java
@@ -0,0 +1,29 @@
+package com.mh.common.model.response;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.Data;
+
+import java.util.List;
+
+/**
+ * @author LJF
+ * @version 1.0
+ * @project EEMCS
+ * @description 研华网关发送接收数据
+ * @date 2025-01-22 14:43:15
+ */
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class AdvantechResponse {
+
+ /**
+ * 数据集合
+ */
+ private List w;
+
+ /**
+ * 主动上报数据时间(带T类型)
+ */
+ private String ts;
+
+}
diff --git a/mh-framework/src/main/java/com/mh/framework/manager/factory/AsyncFactory.java b/mh-framework/src/main/java/com/mh/framework/manager/factory/AsyncFactory.java
index a3c16df..af7b983 100644
--- a/mh-framework/src/main/java/com/mh/framework/manager/factory/AsyncFactory.java
+++ b/mh-framework/src/main/java/com/mh/framework/manager/factory/AsyncFactory.java
@@ -1,6 +1,8 @@
package com.mh.framework.manager.factory;
import java.util.TimerTask;
+
+import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.mh.common.constant.Constants;
@@ -21,6 +23,7 @@ import eu.bitwalker.useragentutils.UserAgent;
*
* @author mh
*/
+@Slf4j
public class AsyncFactory
{
private static final Logger sys_user_logger = LoggerFactory.getLogger("sys-user");
@@ -99,4 +102,22 @@ public class AsyncFactory
}
};
}
+
+ /**
+ * mqtt线程开启日志记录
+ *
+ * @return 任务task
+ */
+ public static TimerTask mqttRecord()
+ {
+ return new TimerTask()
+ {
+ @Override
+ public void run()
+ {
+ // 远程查询操作地点
+ log.info("mqtt开启日志记录");
+ }
+ };
+ }
}
diff --git a/mh-common/src/main/java/com/mh/common/config/mqtt/MqttConfig.java b/mh-framework/src/main/java/com/mh/framework/mqtt/config/MqttConfig.java
similarity index 98%
rename from mh-common/src/main/java/com/mh/common/config/mqtt/MqttConfig.java
rename to mh-framework/src/main/java/com/mh/framework/mqtt/config/MqttConfig.java
index d06a3cc..3488b5c 100644
--- a/mh-common/src/main/java/com/mh/common/config/mqtt/MqttConfig.java
+++ b/mh-framework/src/main/java/com/mh/framework/mqtt/config/MqttConfig.java
@@ -1,4 +1,4 @@
-package com.mh.common.config.mqtt;
+package com.mh.framework.mqtt.config;
import com.mh.common.enums.MqttClientOptions;
import com.mh.common.enums.MqttProtocolEnum;
diff --git a/mh-common/src/main/java/com/mh/common/config/mqtt/MqttInboundConfig.java b/mh-framework/src/main/java/com/mh/framework/mqtt/config/MqttInboundConfig.java
similarity index 96%
rename from mh-common/src/main/java/com/mh/common/config/mqtt/MqttInboundConfig.java
rename to mh-framework/src/main/java/com/mh/framework/mqtt/config/MqttInboundConfig.java
index 4ef5a26..c72e245 100644
--- a/mh-common/src/main/java/com/mh/common/config/mqtt/MqttInboundConfig.java
+++ b/mh-framework/src/main/java/com/mh/framework/mqtt/config/MqttInboundConfig.java
@@ -1,4 +1,4 @@
-package com.mh.common.config.mqtt;
+package com.mh.framework.mqtt.config;
import com.mh.common.constant.ChannelName;
import com.mh.common.enums.MqttClientOptions;
@@ -69,7 +69,7 @@ public class MqttInboundConfig {
* @return
*/
@Bean
- @ServiceActivator(inputChannel = ChannelName.DEFAULT)
+ @ServiceActivator(inputChannel = ChannelName.DEFAULT_BOUND)
public MessageHandler handler() {
return message -> {
log.info("The default channel does not handle messages." +
diff --git a/mh-common/src/main/java/com/mh/common/config/mqtt/MqttMessageChannel.java b/mh-framework/src/main/java/com/mh/framework/mqtt/config/MqttMessageChannel.java
similarity index 84%
rename from mh-common/src/main/java/com/mh/common/config/mqtt/MqttMessageChannel.java
rename to mh-framework/src/main/java/com/mh/framework/mqtt/config/MqttMessageChannel.java
index c3e26bb..7ceda6c 100644
--- a/mh-common/src/main/java/com/mh/common/config/mqtt/MqttMessageChannel.java
+++ b/mh-framework/src/main/java/com/mh/framework/mqtt/config/MqttMessageChannel.java
@@ -1,16 +1,13 @@
-package com.mh.common.config.mqtt;
+package com.mh.framework.mqtt.config;
import com.mh.common.constant.ChannelName;
import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.ExecutorChannel;
import org.springframework.messaging.MessageChannel;
-import java.util.concurrent.Executor;
-
/**
* @author LJF
* @version 1.0
@@ -22,12 +19,9 @@ import java.util.concurrent.Executor;
@Configuration
public class MqttMessageChannel {
- @Autowired
- private Executor threadPool;
-
@Bean(name = ChannelName.INBOUND)
public MessageChannel inboundChannel() {
- return new ExecutorChannel(threadPool);
+ return new DirectChannel();
}
@Bean(name = ChannelName.EVENTS_UPLOAD_INBOUND)
diff --git a/mh-common/src/main/java/com/mh/common/config/mqtt/MqttOutboundConfig.java b/mh-framework/src/main/java/com/mh/framework/mqtt/config/MqttOutboundConfig.java
similarity index 97%
rename from mh-common/src/main/java/com/mh/common/config/mqtt/MqttOutboundConfig.java
rename to mh-framework/src/main/java/com/mh/framework/mqtt/config/MqttOutboundConfig.java
index 287e0b9..2c73e2f 100644
--- a/mh-common/src/main/java/com/mh/common/config/mqtt/MqttOutboundConfig.java
+++ b/mh-framework/src/main/java/com/mh/framework/mqtt/config/MqttOutboundConfig.java
@@ -1,4 +1,4 @@
-package com.mh.common.config.mqtt;
+package com.mh.framework.mqtt.config;
import com.mh.common.constant.ChannelName;
import lombok.extern.slf4j.Slf4j;
diff --git a/mh-framework/src/main/java/com/mh/framework/mqtt/handler/InboundMessageRouter.java b/mh-framework/src/main/java/com/mh/framework/mqtt/handler/InboundMessageRouter.java
new file mode 100644
index 0000000..99fb838
--- /dev/null
+++ b/mh-framework/src/main/java/com/mh/framework/mqtt/handler/InboundMessageRouter.java
@@ -0,0 +1,47 @@
+package com.mh.framework.mqtt.handler;
+
+import com.mh.common.constant.ChannelName;
+import com.mh.common.constant.TopicEnum;
+import com.mh.common.utils.spring.SpringUtils;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.integration.annotation.Router;
+import org.springframework.integration.mqtt.support.MqttHeaders;
+import org.springframework.integration.router.AbstractMessageRouter;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.MessageHeaders;
+import org.springframework.stereotype.Component;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Objects;
+
+/**
+ * @author LJF
+ * @version 1.0
+ * @project springboot-mqtt-demo
+ * @description 入站消息路由分发中心
+ * @date 2024-10-29 17:04:17
+ */
+@Slf4j
+@Component
+public class InboundMessageRouter extends AbstractMessageRouter {
+
+ /**
+ * 目前只需要这个方式,后期在拓展使用IntegrationFlow方式
+ * @param message
+ * @return
+ */
+ @Router(inputChannel = ChannelName.INBOUND)
+ @Override
+ protected Collection determineTargetChannels(Message> message) {
+ MessageHeaders headers = message.getHeaders();
+ String topic = Objects.requireNonNull(headers.get(MqttHeaders.RECEIVED_TOPIC)).toString();
+ byte[] payload = (byte[]) message.getPayload();
+ log.info("从当前主题 topic: {}, 接收到的消息:{}", topic, new String(payload));
+ // 找到对应的主题消息通道
+ TopicEnum topicEnum = TopicEnum.find(topic);
+ MessageChannel bean = (MessageChannel) SpringUtils.getBean(topicEnum.getBeanName());
+ return Collections.singleton(bean);
+ }
+}
diff --git a/mh-system/src/main/java/com/mh/system/service/mqtt/IEventsService.java b/mh-framework/src/main/java/com/mh/framework/mqtt/service/IEventsService.java
similarity index 86%
rename from mh-system/src/main/java/com/mh/system/service/mqtt/IEventsService.java
rename to mh-framework/src/main/java/com/mh/framework/mqtt/service/IEventsService.java
index 18ffdb5..240c674 100644
--- a/mh-system/src/main/java/com/mh/system/service/mqtt/IEventsService.java
+++ b/mh-framework/src/main/java/com/mh/framework/mqtt/service/IEventsService.java
@@ -1,4 +1,4 @@
-package com.mh.system.service.mqtt;
+package com.mh.framework.mqtt.service;
import org.springframework.messaging.MessageHeaders;
@@ -39,11 +39,11 @@ public interface IEventsService {
void handleInboundControl(byte[] receiver, MessageHeaders headers) throws IOException;
/**
- * 处理接收上报
+ * 处理发送
* @param receiver
* @param headers
* @throws IOException
*/
- void handleInboundReceive(byte[] receiver, MessageHeaders headers) throws IOException;
+ void handleInboundSend(byte[] receiver, MessageHeaders headers) throws IOException;
}
diff --git a/mh-system/src/main/java/com/mh/system/service/mqtt/IMqttMessageGateway.java b/mh-framework/src/main/java/com/mh/framework/mqtt/service/IMqttGatewayService.java
similarity index 84%
rename from mh-system/src/main/java/com/mh/system/service/mqtt/IMqttMessageGateway.java
rename to mh-framework/src/main/java/com/mh/framework/mqtt/service/IMqttGatewayService.java
index 58a1a67..e2cdf50 100644
--- a/mh-system/src/main/java/com/mh/system/service/mqtt/IMqttMessageGateway.java
+++ b/mh-framework/src/main/java/com/mh/framework/mqtt/service/IMqttGatewayService.java
@@ -1,6 +1,7 @@
-package com.mh.system.service.mqtt;
+package com.mh.framework.mqtt.service;
import com.mh.common.constant.ChannelName;
+import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
@@ -9,13 +10,14 @@ import org.springframework.stereotype.Component;
/**
* @author LJF
* @version 1.0
- * @project springboot-mqtt-demo
+ * @project springboot-mqtt
* @description 消息网关
* @date 2024-10-30 14:43:55
*/
@Component
+@Configuration
@MessagingGateway(defaultRequestChannel = ChannelName.OUTBOUND)
-public interface IMqttMessageGateway {
+public interface IMqttGatewayService {
/**
* 发送消息
diff --git a/mh-system/src/main/java/com/mh/system/service/mqtt/IMqttMsgSenderService.java b/mh-framework/src/main/java/com/mh/framework/mqtt/service/IMqttMsgSenderService.java
similarity index 94%
rename from mh-system/src/main/java/com/mh/system/service/mqtt/IMqttMsgSenderService.java
rename to mh-framework/src/main/java/com/mh/framework/mqtt/service/IMqttMsgSenderService.java
index 050fd9b..11a06ac 100644
--- a/mh-system/src/main/java/com/mh/system/service/mqtt/IMqttMsgSenderService.java
+++ b/mh-framework/src/main/java/com/mh/framework/mqtt/service/IMqttMsgSenderService.java
@@ -1,8 +1,8 @@
-package com.mh.system.service.mqtt;
+package com.mh.framework.mqtt.service;
import com.fasterxml.jackson.core.type.TypeReference;
-import com.mh.user.model.response.CommonTopicResponse;
-import com.mh.user.model.response.ServiceReply;
+import com.mh.common.model.response.CommonTopicResponse;
+import com.mh.common.model.response.ServiceReply;
/**
* @author LJF
diff --git a/mh-system/src/main/java/com/mh/system/service/mqtt/IMqttTopicService.java b/mh-framework/src/main/java/com/mh/framework/mqtt/service/IMqttTopicService.java
similarity index 95%
rename from mh-system/src/main/java/com/mh/system/service/mqtt/IMqttTopicService.java
rename to mh-framework/src/main/java/com/mh/framework/mqtt/service/IMqttTopicService.java
index 7e74089..04edcd9 100644
--- a/mh-system/src/main/java/com/mh/system/service/mqtt/IMqttTopicService.java
+++ b/mh-framework/src/main/java/com/mh/framework/mqtt/service/IMqttTopicService.java
@@ -1,4 +1,4 @@
-package com.mh.system.service.mqtt;
+package com.mh.framework.mqtt.service;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
diff --git a/mh-system/src/main/java/com/mh/system/service/mqtt/impl/EventsServiceImpl.java b/mh-framework/src/main/java/com/mh/framework/mqtt/service/impl/EventsServiceImpl.java
similarity index 67%
rename from mh-system/src/main/java/com/mh/system/service/mqtt/impl/EventsServiceImpl.java
rename to mh-framework/src/main/java/com/mh/framework/mqtt/service/impl/EventsServiceImpl.java
index a003006..adc35fc 100644
--- a/mh-system/src/main/java/com/mh/system/service/mqtt/impl/EventsServiceImpl.java
+++ b/mh-framework/src/main/java/com/mh/framework/mqtt/service/impl/EventsServiceImpl.java
@@ -1,10 +1,9 @@
-package com.mh.system.service.mqtt.impl;
+package com.mh.framework.mqtt.service.impl;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.mh.common.constant.ChannelName;
-import com.mh.common.core.redis.RedisCache;
-import com.mh.common.model.request.CommonTopicReceiver;
-import com.mh.system.service.mqtt.IEventsService;
+import com.mh.common.model.request.AdvantechDatas;
+import com.mh.framework.mqtt.service.IEventsService;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@@ -13,7 +12,6 @@ import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Service;
import java.io.IOException;
-import java.util.concurrent.TimeUnit;
/**
* @author LJF
@@ -29,9 +27,6 @@ public class EventsServiceImpl implements IEventsService {
@Autowired
private ObjectMapper mapper;
- @Autowired
- private RedisCache redisCache;
-
@ServiceActivator(inputChannel = ChannelName.EVENTS_UPLOAD_INBOUND)
@Override
public void handleInboundUpload(byte[] receiver, MessageHeaders headers) {
@@ -50,22 +45,21 @@ public class EventsServiceImpl implements IEventsService {
handleInboundData(receiver, "控制指令下发");
}
- @ServiceActivator(inputChannel = ChannelName.EVENTS_RECEIVE_INBOUND)
+ @ServiceActivator(inputChannel = ChannelName.EVENTS_SEND_INBOUND)
@Override
- public void handleInboundReceive(byte[] receiver, MessageHeaders headers) {
- String receiveStr = new String(receiver, CharsetUtil.UTF_8);
- log.info("接收到控制指令返回=>{}", receiveStr);
- // 设置成redis,15秒时间响应
- redisCache.setCacheObject("receiveCmd", receiveStr, 15, TimeUnit.SECONDS);
+ public void handleInboundSend(byte[] receiver, MessageHeaders headers) throws IOException {
+ String sendStr = new String(receiver, CharsetUtil.UTF_8);
+ log.info("接收到控制指令下发=>{}", sendStr);
}
private void handleInboundData(byte[] receiver, String logMessage) {
try {
- CommonTopicReceiver commonTopicReceiver = mapper.readValue(receiver, CommonTopicReceiver.class);
+ AdvantechDatas commonTopicReceiver = mapper.readValue(receiver, AdvantechDatas.class);
log.info("{}: {}", logMessage, commonTopicReceiver);
} catch (IOException e) {
log.error("处理数据时发生错误: ", e);
}
}
+
}
diff --git a/mh-system/src/main/java/com/mh/system/service/mqtt/impl/MqttMsgSenderServiceImpl.java b/mh-framework/src/main/java/com/mh/framework/mqtt/service/impl/MqttMsgSenderServiceImpl.java
similarity index 87%
rename from mh-system/src/main/java/com/mh/system/service/mqtt/impl/MqttMsgSenderServiceImpl.java
rename to mh-framework/src/main/java/com/mh/framework/mqtt/service/impl/MqttMsgSenderServiceImpl.java
index b4c7bac..f4bddd5 100644
--- a/mh-system/src/main/java/com/mh/system/service/mqtt/impl/MqttMsgSenderServiceImpl.java
+++ b/mh-framework/src/main/java/com/mh/framework/mqtt/service/impl/MqttMsgSenderServiceImpl.java
@@ -1,13 +1,14 @@
-package com.mh.system.service.mqtt.impl;
+package com.mh.framework.mqtt.service.impl;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.mh.common.constant.TopicConst;
-import com.mh.user.model.response.CommonTopicResponse;
-import com.mh.user.model.response.ServiceReply;
-import com.mh.user.service.mqtt.IMqttMessageGateway;
-import com.mh.user.service.mqtt.IMqttMsgSenderService;
+import com.mh.common.model.response.CommonTopicResponse;
+import com.mh.common.model.response.ServiceReply;
+import com.mh.common.utils.spring.SpringUtils;
+import com.mh.framework.mqtt.service.IMqttGatewayService;
+import com.mh.framework.mqtt.service.IMqttMsgSenderService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@@ -27,8 +28,12 @@ import java.util.UUID;
@Service
public class MqttMsgSenderServiceImpl implements IMqttMsgSenderService {
+ private final IMqttGatewayService iMqttGatewayService;
+
@Autowired
- private IMqttMessageGateway mqttMessageGateway;
+ public MqttMsgSenderServiceImpl(IMqttGatewayService myGateway) {
+ this.iMqttGatewayService = myGateway;
+ }
@Autowired
private ObjectMapper mapper;
@@ -43,10 +48,11 @@ public class MqttMsgSenderServiceImpl implements IMqttMsgSenderService {
public void publish(String topic, String pushMessage) {
synchronized (this) {//注意:这里一定要同步,否则,在多线程publish的情况下,线程会发生死锁,分析见文章最后补充
try {
- mqttMessageGateway.publish(topic, pushMessage, 0);
+ iMqttGatewayService.publish(topic, pushMessage, 0);
log.info("发送主题:{},消息:{}", topic, pushMessage);
} catch (Exception e) {
log.error("发送主题异常:{},消息:{}", topic, pushMessage, e);
+ throw new RuntimeException(e);
}
}
}
@@ -60,7 +66,7 @@ public class MqttMsgSenderServiceImpl implements IMqttMsgSenderService {
public void publish(String topic, int qos, CommonTopicResponse response) {
try {
log.info("发送主题:{},消息:{}", topic, response.toString());
- mqttMessageGateway.publish(topic, mapper.writeValueAsBytes(response), qos);
+ iMqttGatewayService.publish(topic, mapper.writeValueAsBytes(response), qos);
} catch (JsonProcessingException e) {
log.error("发送主题:{},消息:{}", topic, response.toString(), e);
throw new RuntimeException(e);
diff --git a/mh-system/src/main/java/com/mh/system/service/mqtt/impl/MqttTopicServiceImpl.java b/mh-framework/src/main/java/com/mh/framework/mqtt/service/impl/MqttTopicServiceImpl.java
similarity index 85%
rename from mh-system/src/main/java/com/mh/system/service/mqtt/impl/MqttTopicServiceImpl.java
rename to mh-framework/src/main/java/com/mh/framework/mqtt/service/impl/MqttTopicServiceImpl.java
index 11efee3..1a6eeb7 100644
--- a/mh-system/src/main/java/com/mh/system/service/mqtt/impl/MqttTopicServiceImpl.java
+++ b/mh-framework/src/main/java/com/mh/framework/mqtt/service/impl/MqttTopicServiceImpl.java
@@ -1,11 +1,10 @@
-package com.mh.system.service.mqtt.impl;
+package com.mh.framework.mqtt.service.impl;
-import com.mh.user.service.mqtt.IMqttTopicService;
+import com.mh.framework.mqtt.service.IMqttTopicService;
+import jakarta.annotation.Resource;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.stereotype.Service;
-import javax.annotation.Resource;
-
/**
* @author LJF
* @version 1.0