diff --git a/pom.xml b/pom.xml
index b864d0e..8706cff 100644
--- a/pom.xml
+++ b/pom.xml
@@ -46,6 +46,25 @@
1.0.1.RELEASE
+
+
+ org.springframework
+ spring-messaging
+ 6.2.1
+
+
+
+
+ org.springframework.integration
+ spring-integration-mqtt
+ 6.3.4
+
+
+ org.springframework.boot
+ spring-boot-starter-integration
+ 3.4.2
+
+
diff --git a/user-service/src/main/java/com/mh/user/config/MHConfig.java b/user-service/src/main/java/com/mh/user/config/MHConfig.java
new file mode 100644
index 0000000..7031388
--- /dev/null
+++ b/user-service/src/main/java/com/mh/user/config/MHConfig.java
@@ -0,0 +1,122 @@
+package com.mh.user.config;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.stereotype.Component;
+
+/**
+ * 读取项目相关配置
+ *
+ * @author mh
+ */
+@Component
+@ConfigurationProperties(prefix = "mh")
+public class MHConfig
+{
+ /** 项目名称 */
+ private String name;
+
+ /** 版本 */
+ private String version;
+
+ /** 版权年份 */
+ private String copyrightYear;
+
+ /** 上传路径 */
+ private static String profile;
+
+ /** 获取地址开关 */
+ private static boolean addressEnabled;
+
+ /** 验证码类型 */
+ private static String captchaType;
+
+ public String getName()
+ {
+ return name;
+ }
+
+ public void setName(String name)
+ {
+ this.name = name;
+ }
+
+ public String getVersion()
+ {
+ return version;
+ }
+
+ public void setVersion(String version)
+ {
+ this.version = version;
+ }
+
+ public String getCopyrightYear()
+ {
+ return copyrightYear;
+ }
+
+ public void setCopyrightYear(String copyrightYear)
+ {
+ this.copyrightYear = copyrightYear;
+ }
+
+ public static String getProfile()
+ {
+ return profile;
+ }
+
+ public void setProfile(String profile)
+ {
+ MHConfig.profile = profile;
+ }
+
+ public static boolean isAddressEnabled()
+ {
+ return addressEnabled;
+ }
+
+ public void setAddressEnabled(boolean addressEnabled)
+ {
+ MHConfig.addressEnabled = addressEnabled;
+ }
+
+ public static String getCaptchaType() {
+ return captchaType;
+ }
+
+ public void setCaptchaType(String captchaType) {
+ MHConfig.captchaType = captchaType;
+ }
+
+ /**
+ * 获取导入上传路径
+ */
+ public static String getImportPath()
+ {
+ return getProfile() + "/import";
+ }
+
+ /**
+ * 获取头像上传路径
+ */
+ public static String getAvatarPath()
+ {
+ return getProfile() + "/avatar";
+ }
+
+ /**
+ * 获取下载路径
+ */
+ public static String getDownloadPath()
+ {
+ return getProfile() + "/download/";
+ }
+
+ /**
+ * 获取上传路径
+ */
+ public static String getUploadPath()
+ {
+ return getProfile() + "/upload";
+ }
+}
diff --git a/user-service/src/main/java/com/mh/user/constants/ChannelName.java b/user-service/src/main/java/com/mh/user/constants/ChannelName.java
new file mode 100644
index 0000000..8a1f901
--- /dev/null
+++ b/user-service/src/main/java/com/mh/user/constants/ChannelName.java
@@ -0,0 +1,59 @@
+package com.mh.user.constants;
+
+/**
+ * @author LJF
+ * @version 1.0
+ * @project springboot-mqtt-demo
+ * @description 声明所有通道
+ * @date 2024-10-29 16:04:19
+ */
+public class ChannelName {
+
+ /**
+ * 默认通道名称(防止出错)
+ */
+ public static final String DEFAULT_BOUND = "default_bound";
+
+ /**
+ * 主动上报入站
+ */
+ public static final String INBOUND = "inbound";
+
+ /**
+ * 出站
+ */
+ public static final String OUTBOUND = "outbound";
+
+
+ /**
+ * 入站主动上报
+ */
+ public static final String EVENTS_UPLOAD_INBOUND = "events_upload_inbound";
+
+ /**
+ * 入站主动采集
+ */
+ public static final String EVENTS_COLLECTION_INBOUND = "events_collection_inbound";
+
+ /**
+ * 入站主动控制
+ */
+ public static final String EVENTS_CONTROL_INBOUND = "events_control_inbound";
+
+ /**
+ * 默认进站处理
+ */
+ public static final String EVENTS_DEFAULT_INBOUND = "events_default_inbound";
+
+ public static final String REPLY_EVENTS_OUTBOUND = "reply_events_outbound";
+
+ /**
+ * 新珠江收到的信息
+ */
+ public static final String EVENTS_RECEIVE_INBOUND = "events_receive_inbound";
+
+ /**
+ * 接收服务端的数据报文
+ */
+ public static final String EVENTS_SEND_INBOUND = "events_send_inbound";
+}
diff --git a/user-service/src/main/java/com/mh/user/constants/CommonTopicResponse.java b/user-service/src/main/java/com/mh/user/constants/CommonTopicResponse.java
new file mode 100644
index 0000000..d123179
--- /dev/null
+++ b/user-service/src/main/java/com/mh/user/constants/CommonTopicResponse.java
@@ -0,0 +1,36 @@
+package com.mh.user.constants;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Unified Topic response format
+ *
+ * @author sean.zhou
+ * @version 0.1
+ * @date 2021/11/15
+ */
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+@Builder
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class CommonTopicResponse {
+
+ /**
+ * The command is sent and the response is matched by the tid and bid fields in the message,
+ * and the reply should keep the tid and bid the same.
+ */
+ private String tid;
+
+ private String bid;
+
+ private String method;
+
+ private T data;
+
+ private Long timestamp;
+}
diff --git a/user-service/src/main/java/com/mh/user/constants/MqttClientOptions.java b/user-service/src/main/java/com/mh/user/constants/MqttClientOptions.java
new file mode 100644
index 0000000..a2ec1a2
--- /dev/null
+++ b/user-service/src/main/java/com/mh/user/constants/MqttClientOptions.java
@@ -0,0 +1,108 @@
+package com.mh.user.constants;
+
+/**
+ * @author LJF
+ * @version 1.0
+ * @project springboot-mqtt-demo
+ * @description mqtt连接的参数
+ * @date 2024-10-29 14:46:24
+ */
+public class MqttClientOptions {
+
+ private MqttProtocolEnum protocol;
+
+ private String host;
+
+ private Integer port;
+
+ private String username;
+
+ private String password;
+
+ private String clientId;
+
+ private String path;
+
+ /**
+ * 客户端连接的时候,订阅的主题
+ */
+ private String inboundTopic;
+
+ public MqttProtocolEnum getProtocol() {
+ return protocol;
+ }
+
+ public void setProtocol(MqttProtocolEnum protocol) {
+ this.protocol = protocol;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public void setHost(String host) {
+ this.host = host;
+ }
+
+ public Integer getPort() {
+ return port;
+ }
+
+ public void setPort(Integer port) {
+ this.port = port;
+ }
+
+ public String getUsername() {
+ return username;
+ }
+
+ public void setUsername(String username) {
+ this.username = username;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ public String getClientId() {
+ return clientId;
+ }
+
+ public void setClientId(String clientId) {
+ this.clientId = clientId;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public void setPath(String path) {
+ this.path = path;
+ }
+
+ public String getInboundTopic() {
+ return inboundTopic;
+ }
+
+ public void setInboundTopic(String inboundTopic) {
+ this.inboundTopic = inboundTopic;
+ }
+
+ @Override
+ public String toString() {
+ return "MqttClientOptions{" +
+ "protocol=" + protocol +
+ ", host='" + host + '\'' +
+ ", port=" + port +
+ ", username='" + username + '\'' +
+ ", password='" + password + '\'' +
+ ", clientId='" + clientId + '\'' +
+ ", path='" + path + '\'' +
+ ", inboundTopic='" + inboundTopic + '\'' +
+ '}';
+ }
+}
diff --git a/user-service/src/main/java/com/mh/user/constants/MqttProtocolEnum.java b/user-service/src/main/java/com/mh/user/constants/MqttProtocolEnum.java
new file mode 100644
index 0000000..ca80fac
--- /dev/null
+++ b/user-service/src/main/java/com/mh/user/constants/MqttProtocolEnum.java
@@ -0,0 +1,33 @@
+package com.mh.user.constants;
+
+/**
+ * @author LJF
+ * @version 1.0
+ * @project springboot-mqtt-demo
+ * @description 采用哪种协议进行数据交互
+ * @date 2024-10-29 15:21:07
+ */
+public enum MqttProtocolEnum {
+
+ MQTT("tcp"),
+
+ MQTTS("tcp"),
+
+ WS("ws"),
+
+ WSS("wss");
+
+ final String protocol;
+
+ MqttProtocolEnum(String protocol) {
+ this.protocol = protocol;
+ }
+
+ public String getProtocolAddr() {
+ return protocol + "://";
+ }
+
+ public String getProtocol() {
+ return protocol;
+ }
+}
diff --git a/user-service/src/main/java/com/mh/user/constants/MqttUseEnum.java b/user-service/src/main/java/com/mh/user/constants/MqttUseEnum.java
new file mode 100644
index 0000000..5a9384d
--- /dev/null
+++ b/user-service/src/main/java/com/mh/user/constants/MqttUseEnum.java
@@ -0,0 +1,16 @@
+package com.mh.user.constants;
+
+/**
+ * @author LJF
+ * @version 1.0
+ * @project springboot-mqtt-demo
+ * @description mqtt选择模式
+ * @date 2024-10-29 15:19:21
+ */
+public enum MqttUseEnum {
+
+ BASIC,
+
+ DRC
+
+}
diff --git a/user-service/src/main/java/com/mh/user/constants/ServiceReply.java b/user-service/src/main/java/com/mh/user/constants/ServiceReply.java
new file mode 100644
index 0000000..40fdf4e
--- /dev/null
+++ b/user-service/src/main/java/com/mh/user/constants/ServiceReply.java
@@ -0,0 +1,20 @@
+package com.mh.user.constants;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.Data;
+
+/**
+ * @author sean.zhou
+ * @version 0.1
+ * @date 2021/11/22
+ */
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class ServiceReply {
+
+ private Integer result;
+
+ private T info;
+
+ private T output;
+}
diff --git a/user-service/src/main/java/com/mh/user/constants/TopicConst.java b/user-service/src/main/java/com/mh/user/constants/TopicConst.java
new file mode 100644
index 0000000..2725a06
--- /dev/null
+++ b/user-service/src/main/java/com/mh/user/constants/TopicConst.java
@@ -0,0 +1,31 @@
+package com.mh.user.constants;
+
+/**
+ * All the topics that need to be used in the project.
+ *
+ * @author ljf
+ * @version 0.1
+ * @date 2025-01-22
+ */
+public class TopicConst {
+
+ public static final String MH_UPLOAD = "mh_upload/";
+
+ public static final String EVENTS_UPLOAD = "events_upload/";
+
+ public static final String MH_COLLECTION = "mh_collection/";
+
+ public static final String EVENTS_COLLECTION = "events_collection/";
+
+ public static final String MH_CONTROL = "mh_control/";
+
+ public static final String EVENTS_CONTROL = "events_control/";
+
+ public static final String REGEX_SN = "[A-Za-z0-9]+";
+
+ public static final String THING_MODEL_PRE = "thing/";
+
+ public static final String PRODUCT = "product/";
+
+ public static final String SERVICES_SUF = "/services";
+}
diff --git a/user-service/src/main/java/com/mh/user/constants/TopicEnum.java b/user-service/src/main/java/com/mh/user/constants/TopicEnum.java
new file mode 100644
index 0000000..8f41357
--- /dev/null
+++ b/user-service/src/main/java/com/mh/user/constants/TopicEnum.java
@@ -0,0 +1,60 @@
+package com.mh.user.constants;
+
+import java.util.Arrays;
+import java.util.regex.Pattern;
+
+import static com.mh.user.constants.TopicConst.*;
+
+/**
+ * @author ljf
+ * @version 1.0
+ * @description: TODO
+ * @date 2024/11/06 14:28
+ */
+public enum TopicEnum {
+
+ /**
+ * 冷水机组客户端主动上报数据
+ */
+ CLIENT_UPLOAD_DATA(Pattern.compile("^" + MH_UPLOAD + EVENTS_UPLOAD + REGEX_SN + "$"), ChannelName.EVENTS_UPLOAD_INBOUND),
+
+ /**
+ * 服务端采集数据
+ */
+ SERVER_COLLECTION_DATA(Pattern.compile("^" + MH_COLLECTION + EVENTS_COLLECTION + REGEX_SN + "$"), ChannelName.EVENTS_COLLECTION_INBOUND),
+
+ /**
+ * 服务端控制指令
+ */
+ SERVER_CONTROL_DATA(Pattern.compile("^" + MH_CONTROL + EVENTS_CONTROL + REGEX_SN + "$"), ChannelName.EVENTS_CONTROL_INBOUND),
+
+ /**
+ * 订阅服务端发送的主题命令
+ */
+ SERVER_SEND_DATA(Pattern.compile("^A/cmd/ctl/send" + "$"), ChannelName.EVENTS_SEND_INBOUND),
+
+ UNKNOWN(Pattern.compile("^.*$"), ChannelName.DEFAULT_BOUND);
+
+ final Pattern pattern;
+
+ final String beanName;
+
+ TopicEnum(Pattern pattern, String beanName) {
+ this.pattern = pattern;
+ this.beanName = beanName;
+ }
+
+ public Pattern getPattern() {
+ return pattern;
+ }
+
+ public String getBeanName() {
+ return beanName;
+ }
+
+ public static TopicEnum find(String proName, String topic) {
+ // 去掉第一个"/"以及之前数据
+ String finalTopic = topic.replaceFirst("^"+proName, "");;
+ return Arrays.stream(TopicEnum.values()).filter(topicEnum -> topicEnum.pattern.matcher(finalTopic).matches()).findAny().orElse(UNKNOWN);
+ }
+}
diff --git a/user-service/src/main/java/com/mh/user/entity/MqttSubscriptionEntity.java b/user-service/src/main/java/com/mh/user/entity/MqttSubscriptionEntity.java
new file mode 100644
index 0000000..0c526f6
--- /dev/null
+++ b/user-service/src/main/java/com/mh/user/entity/MqttSubscriptionEntity.java
@@ -0,0 +1,48 @@
+package com.mh.user.entity;
+
+import com.fasterxml.jackson.annotation.JsonFormat;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import lombok.Data;
+
+import java.util.Date;
+import java.util.Map;
+
+/**
+ * @author LJF
+ * @version 1.0
+ * @project EEMCS
+ * @description mqtt订阅管理
+ * @date 2025-02-14 13:47:07
+ */
+@Data
+public class MqttSubscriptionEntity {
+
+ private Long id;
+
+ private String topic;
+
+ private Short qos;
+
+ private String clientId;
+
+ private String status;
+
+ /** 创建者 */
+ private String createBy;
+
+ /** 创建时间 */
+ @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
+ private Date createTime;
+
+ /** 更新者 */
+ private String updateBy;
+
+ /** 更新时间 */
+ @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
+ private Date updateTime;
+
+ /** 备注 */
+ private String remark;
+
+}
diff --git a/user-service/src/main/java/com/mh/user/job/CollectionLoopRunner.java b/user-service/src/main/java/com/mh/user/job/CollectionLoopRunner.java
index 66534b8..144f81f 100644
--- a/user-service/src/main/java/com/mh/user/job/CollectionLoopRunner.java
+++ b/user-service/src/main/java/com/mh/user/job/CollectionLoopRunner.java
@@ -1,13 +1,17 @@
package com.mh.user.job;
+import com.mh.common.utils.StringUtils;
import com.mh.user.constants.Constant;
import com.mh.user.entity.AddCronJobReq;
+import com.mh.user.entity.MqttSubscriptionEntity;
import com.mh.user.manage.QuartzManager;
import com.mh.user.netty.NettyEchoServer;
import com.mh.user.serialport.SerialPortListener;
import com.mh.user.serialport.SerialPortUtil;
import com.mh.user.serialport.SerialTool;
import com.mh.user.service.DeviceCodeParamService;
+import com.mh.user.service.MqttSubscriptionService;
+import com.mh.user.service.mqtt.service.IMqttTopicService;
import com.mh.user.utils.CacheUtil;
import com.mh.user.utils.ExchangeStringUtil;
import com.mh.user.utils.GetReadOrder485;
@@ -19,6 +23,7 @@ import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
/**
@@ -42,6 +47,12 @@ public class CollectionLoopRunner implements ApplicationRunner {
@Resource
private GetWeatherInfoJob getWeatherInfoJob;
+ @Resource
+ private MqttSubscriptionService iMqttSubscriptionService;
+
+ @Resource
+ private IMqttTopicService iMqttTopicService;
+
@Override
public void run(ApplicationArguments args) throws Exception {
// collectionMeterAndCloud();//采集
@@ -53,8 +64,28 @@ public class CollectionLoopRunner implements ApplicationRunner {
// 获取天气数据
getWeatherInfoJob.getWeatherInfo();
// 启动netty端口
- NettyEchoServer nettyEchoServer = new NettyEchoServer();
- nettyEchoServer.bind(8098);
+// NettyEchoServer nettyEchoServer = new NettyEchoServer();
+// nettyEchoServer.bind(8098);
+ // 初始化mqtt订阅记录
+ initializeMqttSubscription();
+ }
+
+ /**
+ * 初始化mqtt订阅记录
+ */
+ private void initializeMqttSubscription() {
+ MqttSubscriptionEntity mqttSubscription = new MqttSubscriptionEntity();
+ mqttSubscription.setStatus("0");
+ List mqttSubscriptions = iMqttSubscriptionService.selectMqttSubList(mqttSubscription);
+ for (MqttSubscriptionEntity subscription : mqttSubscriptions) {
+ try {
+ if (!StringUtils.isBlank(subscription.getTopic())) {
+ iMqttTopicService.subscribe(subscription.getTopic(), subscription.getQos());
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
}
private void simulationCollection() throws Exception {
diff --git a/user-service/src/main/java/com/mh/user/mapper/MqttSubscriptionMapper.java b/user-service/src/main/java/com/mh/user/mapper/MqttSubscriptionMapper.java
new file mode 100644
index 0000000..cccb641
--- /dev/null
+++ b/user-service/src/main/java/com/mh/user/mapper/MqttSubscriptionMapper.java
@@ -0,0 +1,32 @@
+package com.mh.user.mapper;
+
+import com.mh.user.entity.MqttSubscriptionEntity;
+import org.apache.ibatis.annotations.Mapper;
+import org.apache.ibatis.annotations.Param;
+import org.apache.ibatis.annotations.Select;
+import tk.mybatis.mapper.common.BaseMapper;
+
+import java.util.List;
+
+/**
+ * @author LJF
+ * @version 1.0
+ * @project EEMCS
+ * @description mqtt订阅mapper类
+ * @date 2025-02-14 14:00:58
+ */
+@Mapper
+public interface MqttSubscriptionMapper extends BaseMapper {
+
+ @Select("")
+ List selectListByTopic(@Param("topic") String topic,
+ @Param("status") String status);
+
+ @Select("SELECT top 1 * FROM mqtt_subscription WHERE id = #{id}")
+ MqttSubscriptionEntity selectById(@Param("id") String id);
+}
diff --git a/user-service/src/main/java/com/mh/user/model/SanShiFengDatas.java b/user-service/src/main/java/com/mh/user/model/SanShiFengDatas.java
new file mode 100644
index 0000000..cc9c63e
--- /dev/null
+++ b/user-service/src/main/java/com/mh/user/model/SanShiFengDatas.java
@@ -0,0 +1,30 @@
+package com.mh.user.model;
+
+import lombok.Data;
+
+/**
+ * @author LJF
+ * @version 1.0
+ * @project EEMCS
+ * @description 研华数据体
+ * @date 2025-01-22 14:47:25
+ */
+@Data
+public class SanShiFengDatas {
+
+ /**
+ * 对应研华的标签值
+ */
+ private String tag;
+
+ /**
+ * 上报值
+ */
+ private T value;
+
+ /**
+ * 质量值
+ */
+ private T quality;
+
+}
diff --git a/user-service/src/main/java/com/mh/user/model/SanShiFengReceiver.java b/user-service/src/main/java/com/mh/user/model/SanShiFengReceiver.java
new file mode 100644
index 0000000..b76c17b
--- /dev/null
+++ b/user-service/src/main/java/com/mh/user/model/SanShiFengReceiver.java
@@ -0,0 +1,29 @@
+package com.mh.user.model;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.Data;
+
+import java.util.List;
+
+/**
+ * @author LJF
+ * @version 1.0
+ * @project EEMCS
+ * @description 研华网关发送接收数据
+ * @date 2025-01-22 14:43:15
+ */
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class SanShiFengReceiver {
+
+ /**
+ * 数据集合
+ */
+ private List d;
+
+ /**
+ * 主动上报数据时间(带T类型)
+ */
+ private String ts;
+
+}
diff --git a/user-service/src/main/java/com/mh/user/service/MqttSubscriptionService.java b/user-service/src/main/java/com/mh/user/service/MqttSubscriptionService.java
new file mode 100644
index 0000000..9ff23a8
--- /dev/null
+++ b/user-service/src/main/java/com/mh/user/service/MqttSubscriptionService.java
@@ -0,0 +1,26 @@
+package com.mh.user.service;
+
+import com.mh.user.entity.MqttSubscriptionEntity;
+
+import java.util.List;
+
+/**
+ * @author LJF
+ * @version 1.0
+ * @project EEMCS
+ * @description mqtt订阅管理
+ * @date 2025-02-14 13:58:37
+ */
+public interface MqttSubscriptionService {
+
+ List selectMqttSubList(MqttSubscriptionEntity mqttSubscription);
+
+ MqttSubscriptionEntity selectMqttSubById(String msId);
+
+ int insertMqttSub(MqttSubscriptionEntity mqttSubscription);
+
+ int updateMqttSub(MqttSubscriptionEntity mqttSubscription);
+
+ int deleteMqttSubByIds(String[] msIds);
+
+}
diff --git a/user-service/src/main/java/com/mh/user/service/impl/DeviceControlServiceImpl.java b/user-service/src/main/java/com/mh/user/service/impl/DeviceControlServiceImpl.java
index 7e04c57..f3d9812 100644
--- a/user-service/src/main/java/com/mh/user/service/impl/DeviceControlServiceImpl.java
+++ b/user-service/src/main/java/com/mh/user/service/impl/DeviceControlServiceImpl.java
@@ -401,7 +401,7 @@ public class DeviceControlServiceImpl implements DeviceControlService {
if (Constant.WRITE.equals(type)) {
deviceCodeParam.setFunCode("06");
String time1 = split[1].substring(0, 2);
- String time2 = split[1].substring(2, 4);
+ String time2 = split[1].substring(3, 5);
deviceCodeParam.setDataValue(String.valueOf(Integer.parseInt(time1)*60 + Integer.parseInt(time2)));
serialPortModel.setDataValue(String.valueOf(Integer.parseInt(time1)*60 + Integer.parseInt(time2)));
}
@@ -409,11 +409,11 @@ public class DeviceControlServiceImpl implements DeviceControlService {
case "timeSetClose1":
// 关时间设置\读取1
registerStr = ExchangeStringUtil.addZeroForNum(ExchangeStringUtil.decToHex(String.valueOf(31 + (scene - 1) * 18)), 4);
- deviceCodeParam.setRegisterSize(2);
+ deviceCodeParam.setRegisterSize(1);
if (Constant.WRITE.equals(type)) {
deviceCodeParam.setFunCode("06");
String time1 = split[1].substring(0, 2);
- String time2 = split[1].substring(2, 4);
+ String time2 = split[1].substring(3, 5);
deviceCodeParam.setDataValue(String.valueOf(Integer.parseInt(time1)*60 + Integer.parseInt(time2)));
serialPortModel.setDataValue(String.valueOf(Integer.parseInt(time1)*60 + Integer.parseInt(time2)));
}
@@ -425,7 +425,7 @@ public class DeviceControlServiceImpl implements DeviceControlService {
if (Constant.WRITE.equals(type)) {
deviceCodeParam.setFunCode("06");
String time1 = split[1].substring(0, 2);
- String time2 = split[1].substring(2, 4);
+ String time2 = split[1].substring(3, 5);
deviceCodeParam.setDataValue(String.valueOf(Integer.parseInt(time1)*60 + Integer.parseInt(time2)));
serialPortModel.setDataValue(String.valueOf(Integer.parseInt(time1)*60 + Integer.parseInt(time2)));
}
@@ -433,11 +433,11 @@ public class DeviceControlServiceImpl implements DeviceControlService {
case "timeSetClose2":
// 关时间设置\读取2
registerStr = ExchangeStringUtil.addZeroForNum(ExchangeStringUtil.decToHex(String.valueOf(33 + (scene - 1) * 18)), 4);
- deviceCodeParam.setRegisterSize(2);
+ deviceCodeParam.setRegisterSize(1);
if (Constant.WRITE.equals(type)) {
deviceCodeParam.setFunCode("06");
String time1 = split[1].substring(0, 2);
- String time2 = split[1].substring(2, 4);
+ String time2 = split[1].substring(3, 5);
deviceCodeParam.setDataValue(String.valueOf(Integer.parseInt(time1)*60 + Integer.parseInt(time2)));
serialPortModel.setDataValue(String.valueOf(Integer.parseInt(time1)*60 + Integer.parseInt(time2)));
}
@@ -449,7 +449,7 @@ public class DeviceControlServiceImpl implements DeviceControlService {
if (Constant.WRITE.equals(type)) {
deviceCodeParam.setFunCode("06");
String time1 = split[1].substring(0, 2);
- String time2 = split[1].substring(2, 4);
+ String time2 = split[1].substring(3, 5);
deviceCodeParam.setDataValue(String.valueOf(Integer.parseInt(time1)*60 + Integer.parseInt(time2)));
serialPortModel.setDataValue(String.valueOf(Integer.parseInt(time1)*60 + Integer.parseInt(time2)));
}
@@ -457,11 +457,11 @@ public class DeviceControlServiceImpl implements DeviceControlService {
case "timeSetClose3":
// 关时间设置\读取3
registerStr = ExchangeStringUtil.addZeroForNum(ExchangeStringUtil.decToHex(String.valueOf(35 + (scene - 1) * 18)), 4);
- deviceCodeParam.setRegisterSize(2);
+ deviceCodeParam.setRegisterSize(1);
if (Constant.WRITE.equals(type)) {
deviceCodeParam.setFunCode("06");
String time1 = split[1].substring(0, 2);
- String time2 = split[1].substring(2, 4);
+ String time2 = split[1].substring(3, 5);
deviceCodeParam.setDataValue(String.valueOf(Integer.parseInt(time1)*60 + Integer.parseInt(time2)));
serialPortModel.setDataValue(String.valueOf(Integer.parseInt(time1)*60 + Integer.parseInt(time2)));
}
@@ -473,7 +473,7 @@ public class DeviceControlServiceImpl implements DeviceControlService {
if (Constant.WRITE.equals(type)) {
deviceCodeParam.setFunCode("06");
String time1 = split[1].substring(0, 2);
- String time2 = split[1].substring(2, 4);
+ String time2 = split[1].substring(3, 5);
deviceCodeParam.setDataValue(String.valueOf(Integer.parseInt(time1)*60 + Integer.parseInt(time2)));
serialPortModel.setDataValue(String.valueOf(Integer.parseInt(time1)*60 + Integer.parseInt(time2)));
}
@@ -481,11 +481,11 @@ public class DeviceControlServiceImpl implements DeviceControlService {
case "timeSetClose4":
// 关时间设置\读取4
registerStr = ExchangeStringUtil.addZeroForNum(ExchangeStringUtil.decToHex(String.valueOf(37 + (scene - 1) * 18)), 4);
- deviceCodeParam.setRegisterSize(2);
+ deviceCodeParam.setRegisterSize(1);
if (Constant.WRITE.equals(type)) {
deviceCodeParam.setFunCode("06");
String time1 = split[1].substring(0, 2);
- String time2 = split[1].substring(2, 4);
+ String time2 = split[1].substring(3, 5);
deviceCodeParam.setDataValue(String.valueOf(Integer.parseInt(time1)*60 + Integer.parseInt(time2)));
serialPortModel.setDataValue(String.valueOf(Integer.parseInt(time1)*60 + Integer.parseInt(time2)));
}
diff --git a/user-service/src/main/java/com/mh/user/service/impl/MqttSubscriptionServiceImpl.java b/user-service/src/main/java/com/mh/user/service/impl/MqttSubscriptionServiceImpl.java
new file mode 100644
index 0000000..3722dcd
--- /dev/null
+++ b/user-service/src/main/java/com/mh/user/service/impl/MqttSubscriptionServiceImpl.java
@@ -0,0 +1,61 @@
+package com.mh.user.service.impl;
+
+import com.mh.user.entity.MqttSubscriptionEntity;
+import com.mh.user.mapper.MqttSubscriptionMapper;
+import com.mh.user.service.MqttSubscriptionService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+
+/**
+ * @author LJF
+ * @version 1.0
+ * @project EEMCS
+ * @description mqtt订阅实现类
+ * @date 2025-02-14 13:59:27
+ */
+@Service
+public class MqttSubscriptionServiceImpl implements MqttSubscriptionService {
+
+ private final MqttSubscriptionMapper mqttSubscriptionMapper;
+
+ @Autowired
+ public MqttSubscriptionServiceImpl(MqttSubscriptionMapper mqttSubscriptionMapper) {
+ this.mqttSubscriptionMapper = mqttSubscriptionMapper;
+ }
+
+ @Override
+ public List selectMqttSubList(MqttSubscriptionEntity mqttSubscription) {
+ if (mqttSubscription == null) {
+ return null;
+ }
+ return mqttSubscriptionMapper.selectListByTopic(mqttSubscription.getTopic(), mqttSubscription.getStatus());
+ }
+
+ @Override
+ public MqttSubscriptionEntity selectMqttSubById(String msId) {
+ return mqttSubscriptionMapper.selectById(msId);
+ }
+
+ @Override
+ public int insertMqttSub(MqttSubscriptionEntity mqttSubscription) {
+ return mqttSubscriptionMapper.insert(mqttSubscription);
+ }
+
+ @Override
+ public int updateMqttSub(MqttSubscriptionEntity mqttSubscription) {
+ return mqttSubscriptionMapper.updateByPrimaryKey(mqttSubscription);
+ }
+
+ @Override
+ public int deleteMqttSubByIds(String[] msIds) {
+ if (msIds != null && msIds.length > 0) {
+ for (String msId : msIds) {
+ mqttSubscriptionMapper.deleteByPrimaryKey(msId);
+ }
+ return msIds.length;
+ }
+ return 0;
+ }
+}
diff --git a/user-service/src/main/java/com/mh/user/service/mqtt/config/MqttConfig.java b/user-service/src/main/java/com/mh/user/service/mqtt/config/MqttConfig.java
new file mode 100644
index 0000000..a307097
--- /dev/null
+++ b/user-service/src/main/java/com/mh/user/service/mqtt/config/MqttConfig.java
@@ -0,0 +1,99 @@
+package com.mh.user.service.mqtt.config;
+
+import com.mh.user.constants.MqttClientOptions;
+import com.mh.user.constants.MqttProtocolEnum;
+import com.mh.user.constants.MqttUseEnum;
+import lombok.Data;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
+import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
+import org.springframework.util.StringUtils;
+
+import java.util.Map;
+
+/**
+ * @author LJF
+ * @version 1.0
+ * @project springboot-mqtt-demo
+ * @description mqtt连接配置
+ * @date 2024-10-29 14:44:51
+ */
+@Configuration
+@Data
+@ConfigurationProperties
+public class MqttConfig {
+
+ private static Map mqttSpring;
+
+ public void setMqttSpring(Map mqtt) {
+ MqttConfig.mqttSpring = mqtt;
+ }
+
+ /**
+ * 获取mqtt基本配置
+ * @return
+ */
+ static MqttClientOptions getBasicMqttClientOptions() {
+ if (!mqttSpring.containsKey(MqttUseEnum.BASIC)) {
+ throw new Error("请先配置MQTT的基本连接参数,否则无法启动项目");
+ }
+ return mqttSpring.get(MqttUseEnum.BASIC);
+ }
+
+ /**
+ * 拼接获取对应mqtt的连接地址
+ * @param options
+ * @return
+ */
+ public static String getMqttAddress(MqttClientOptions options) {
+ StringBuilder addr = new StringBuilder();
+ addr.append(options.getProtocol().getProtocolAddr())
+ .append(options.getHost().trim())
+ .append(":")
+ .append(options.getPort());
+ if ((options.getProtocol() == MqttProtocolEnum.WS || options.getProtocol() == MqttProtocolEnum.WSS)
+ && StringUtils.hasText(options.getPath())) {
+ addr.append(options.getPath());
+ }
+ return addr.toString();
+ }
+
+ public static String getBasicMqttAddress() {
+ return getMqttAddress(getBasicMqttClientOptions());
+ }
+
+ /**
+ * 获取连接参数,注入到spring中
+ * @return
+ */
+ @Bean
+ public MqttConnectOptions mqttConnectionOptions() {
+
+ MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
+
+ MqttClientOptions customizeOptions = getBasicMqttClientOptions();
+ String basicMqttAddress = getBasicMqttAddress();
+ mqttConnectOptions.setServerURIs(new String[]{basicMqttAddress});
+ mqttConnectOptions.setUserName(StringUtils.hasText(customizeOptions.getUsername()) ?
+ customizeOptions.getUsername() : "");
+ mqttConnectOptions.setPassword(StringUtils.hasText(customizeOptions.getPassword()) ?
+ customizeOptions.getPassword().toCharArray() : new char[0]);
+ // 直接进行自动连接
+ mqttConnectOptions.setAutomaticReconnect(true);
+ // 时间间隔时间10s
+ mqttConnectOptions.setKeepAliveInterval(10);
+
+ return mqttConnectOptions;
+ }
+
+ @Bean
+ public MqttPahoClientFactory mqttClientFactory() {
+ DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
+ factory.setConnectionOptions(mqttConnectionOptions());
+ return factory;
+ }
+
+}
diff --git a/user-service/src/main/java/com/mh/user/service/mqtt/config/MqttInboundConfig.java b/user-service/src/main/java/com/mh/user/service/mqtt/config/MqttInboundConfig.java
new file mode 100644
index 0000000..509f764
--- /dev/null
+++ b/user-service/src/main/java/com/mh/user/service/mqtt/config/MqttInboundConfig.java
@@ -0,0 +1,91 @@
+package com.mh.user.service.mqtt.config;
+
+import com.mh.user.constants.ChannelName;
+import com.mh.user.constants.MqttClientOptions;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.integration.annotation.IntegrationComponentScan;
+import org.springframework.integration.annotation.ServiceActivator;
+import org.springframework.integration.endpoint.MessageProducerSupport;
+import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
+import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
+import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
+import org.springframework.integration.mqtt.support.MqttHeaders;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.MessageHandler;
+
+import javax.annotation.Resource;
+
+/**
+ * @author LJF
+ * @version 1.0
+ * @project springboot-mqtt-demo
+ * @description 入站配置
+ * @date 2024-10-29 16:22:10
+ */
+@Slf4j
+@Configuration
+@IntegrationComponentScan
+public class MqttInboundConfig {
+
+ @Autowired
+ private MqttPahoClientFactory mqttClientFactory;
+
+ @Resource(name = ChannelName.INBOUND)
+ private MessageChannel inboundChannel;
+
+ private String clientId;
+
+ /**
+ * 入站适配器
+ * @return
+ */
+ @Bean(name = "adapter")
+ public MessageProducerSupport mqttInbound() {
+ MqttClientOptions options = MqttConfig.getBasicMqttClientOptions();
+ // 此处初始化的时候,默认订阅了配置文件中已经写好的topic
+ // 如果需要订阅多个,可以自己手动订阅,会写一个addTopic()进行添加订阅
+ clientId = options.getClientId() + "_consumer_" + System.currentTimeMillis();
+ MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
+ clientId,
+ mqttClientFactory,
+ options.getInboundTopic().split(","));
+ // System.out.println("每一次都会入站适配器吗?"+clientId);
+ DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
+ // 统一是字节处理
+ converter.setPayloadAsBytes(true);
+ // 设置消息转换器
+ adapter.setConverter(converter);
+ // 设置qos(quality of service)
+ // 0:最多一次传输(消息会丢失),
+ // 1:至少一次传输(消息会重复),
+ // 2:只有当消息发送成功时才确认(消息不回丢,但延迟高)。
+ adapter.setQos(0);
+ // 设置在接收已经订阅的主题信息后,发送给哪个通道,具体的发送方法需要翻上层的抽象类
+ adapter.setOutputChannel(inboundChannel);
+ return adapter;
+ }
+
+ /**
+ * 默认声明一个消息处理器,用于处理无效的消息
+ * @return
+ */
+ @Bean
+ @ServiceActivator(inputChannel = ChannelName.DEFAULT_BOUND)
+ public MessageHandler handler() {
+ return message -> {
+ log.info("The default channel does not handle messages." +
+ "\nTopic: {}" +
+ "\nPayload: {}",
+ message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC),
+ message.getPayload());
+ };
+ }
+
+ public String getClientId() {
+ return clientId;
+ }
+
+}
diff --git a/user-service/src/main/java/com/mh/user/service/mqtt/config/MqttMessageChannel.java b/user-service/src/main/java/com/mh/user/service/mqtt/config/MqttMessageChannel.java
new file mode 100644
index 0000000..a3322e3
--- /dev/null
+++ b/user-service/src/main/java/com/mh/user/service/mqtt/config/MqttMessageChannel.java
@@ -0,0 +1,55 @@
+package com.mh.user.service.mqtt.config;
+
+import com.mh.user.constants.ChannelName;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.integration.channel.DirectChannel;
+import org.springframework.messaging.MessageChannel;
+
+/**
+ * @author LJF
+ * @version 1.0
+ * @project springboot-mqtt-demo
+ * @description 声明所有通道的定义类
+ * @date 2024-10-29 16:23:32
+ */
+@Slf4j
+@Configuration
+public class MqttMessageChannel {
+
+ @Bean(name = ChannelName.OUTBOUND)
+ public MessageChannel outboundChannel() {
+ return new DirectChannel();
+ }
+
+ @Bean(name = ChannelName.INBOUND)
+ public MessageChannel inboundChannel() {
+ return new DirectChannel();
+ }
+
+ /**
+ * 事件主动上报通道
+ * @return
+ */
+ @Bean(name = ChannelName.EVENTS_UPLOAD_INBOUND)
+ public MessageChannel eventsUploadInbound() {
+ return new DirectChannel();
+ }
+
+ @Bean(name = ChannelName.EVENTS_COLLECTION_INBOUND)
+ public MessageChannel eventsCollectionInbound() {
+ return new DirectChannel();
+ }
+
+ @Bean(name = ChannelName.EVENTS_CONTROL_INBOUND)
+ public MessageChannel eventsControlInbound() {
+ return new DirectChannel();
+ }
+
+ @Bean(name = ChannelName.EVENTS_SEND_INBOUND)
+ public MessageChannel eventsSendInbound() {
+ return new DirectChannel();
+ }
+
+}
diff --git a/user-service/src/main/java/com/mh/user/service/mqtt/config/MqttOutboundConfig.java b/user-service/src/main/java/com/mh/user/service/mqtt/config/MqttOutboundConfig.java
new file mode 100644
index 0000000..dc4f34d
--- /dev/null
+++ b/user-service/src/main/java/com/mh/user/service/mqtt/config/MqttOutboundConfig.java
@@ -0,0 +1,51 @@
+package com.mh.user.service.mqtt.config;
+
+import com.mh.user.constants.ChannelName;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.integration.annotation.IntegrationComponentScan;
+import org.springframework.integration.annotation.ServiceActivator;
+import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
+import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
+import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
+import org.springframework.messaging.MessageHandler;
+
+/**
+ * @author LJF
+ * @version 1.0
+ * @project springboot-mqtt-demo
+ * @description 入站配置
+ * @date 2024-10-29 16:22:10
+ */
+@Slf4j
+@Configuration
+@IntegrationComponentScan
+public class MqttOutboundConfig {
+
+ @Autowired
+ private MqttPahoClientFactory mqttClientFactory;
+
+ /**
+ * 默认声明一个出站处理器,用于处理无效的消息
+ * @return
+ */
+ @Bean
+ @ServiceActivator(inputChannel = ChannelName.OUTBOUND)
+ public MessageHandler mqttOutbound() {
+ MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
+ MqttConfig.getBasicMqttClientOptions().getClientId() + "_producer_" + System.currentTimeMillis(),
+ mqttClientFactory);
+ DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
+ // use byte types uniformly
+ converter.setPayloadAsBytes(true);
+
+ messageHandler.setAsync(true);
+ messageHandler.setDefaultQos(0);
+ messageHandler.setConverter(converter);
+ return messageHandler;
+ }
+
+
+}
diff --git a/user-service/src/main/java/com/mh/user/service/mqtt/handler/InboundMessageRouter.java b/user-service/src/main/java/com/mh/user/service/mqtt/handler/InboundMessageRouter.java
new file mode 100644
index 0000000..d17bacb
--- /dev/null
+++ b/user-service/src/main/java/com/mh/user/service/mqtt/handler/InboundMessageRouter.java
@@ -0,0 +1,62 @@
+package com.mh.user.service.mqtt.handler;
+
+import com.mh.user.config.MHConfig;
+import com.mh.user.constants.ChannelName;
+import com.mh.user.constants.TopicEnum;
+import com.mh.user.utils.SpringContextUtils;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.integration.annotation.Router;
+import org.springframework.integration.mqtt.support.MqttHeaders;
+import org.springframework.integration.router.AbstractMessageRouter;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.MessageHeaders;
+import org.springframework.stereotype.Component;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Objects;
+
+/**
+ * @author LJF
+ * @version 1.0
+ * @project springboot-mqtt-demo
+ * @description 入站消息路由分发中心
+ * @date 2024-10-29 17:04:17
+ */
+@Slf4j
+@Component
+public class InboundMessageRouter extends AbstractMessageRouter {
+
+ /** 系统基础配置 */
+ @Autowired
+ private MHConfig mHConfig;
+
+ /**
+ * 目前只需要这个方式,后期在拓展使用IntegrationFlow方式
+ * @param message
+ * @return
+ */
+ @Router(inputChannel = ChannelName.INBOUND)
+ @Override
+ protected Collection determineTargetChannels(Message> message) {
+ MessageHeaders headers = message.getHeaders();
+ String topic = Objects.requireNonNull(headers.get(MqttHeaders.RECEIVED_TOPIC)).toString();
+// byte[] payload = (byte[]) message.getPayload();
+// log.info("从当前主题 topic: {}, 接收到的消息:{}", topic, new String(payload));
+ // 判断当前主题是否是当前项目的,温湿度目前写死的
+// if (!topic.startsWith(mHConfig.getName()) && !topic.contains("/temp")) {
+// log.info("当前主题 topic: {} 不是当前项目的,直接丢弃", topic);
+// return Collections.singleton((MessageChannel) SpringContextUtils.getBean(ChannelName.DEFAULT_BOUND));
+// }
+ // 找到对应的主题消息通道
+ if (topic.contains("/temp")) {
+ return Collections.singleton((MessageChannel) SpringContextUtils.getBean(ChannelName.EVENTS_UPLOAD_INBOUND));
+ } else {
+ TopicEnum topicEnum = TopicEnum.find(mHConfig.getName() + "/", topic);
+ MessageChannel bean = (MessageChannel) SpringContextUtils.getBean(topicEnum.getBeanName());
+ return Collections.singleton(bean);
+ }
+ }
+}
diff --git a/user-service/src/main/java/com/mh/user/service/mqtt/service/IEventsService.java b/user-service/src/main/java/com/mh/user/service/mqtt/service/IEventsService.java
new file mode 100644
index 0000000..5b89468
--- /dev/null
+++ b/user-service/src/main/java/com/mh/user/service/mqtt/service/IEventsService.java
@@ -0,0 +1,49 @@
+package com.mh.user.service.mqtt.service;
+
+import org.springframework.messaging.MessageHeaders;
+
+import java.io.IOException;
+
+/**
+ * @author LJF
+ * @version 1.0
+ * @project springboot-mqtt-demo
+ * @description 通道处理类
+ * @date 2024-11-05 11:30:26
+ */
+public interface IEventsService {
+
+ /**
+ * 处理主动上报
+ * @param receiver
+ * @param headers
+ * @return
+ */
+ void handleInboundUpload(byte[] receiver, MessageHeaders headers) throws IOException;
+
+
+ /**
+ * 处理服务器主动采集上报
+ * @param receiver
+ * @param headers
+ * @return
+ */
+ void handleInboundCollection(byte[] receiver, MessageHeaders headers) throws IOException;
+
+ /**
+ * 处理控制上报
+ * @param receiver
+ * @param headers
+ * @return
+ */
+ void handleInboundControl(byte[] receiver, MessageHeaders headers) throws IOException;
+
+ /**
+ * 处理发送
+ * @param receiver
+ * @param headers
+ * @throws IOException
+ */
+ void handleInboundSend(byte[] receiver, MessageHeaders headers) throws IOException;
+
+}
diff --git a/user-service/src/main/java/com/mh/user/service/mqtt/service/IMqttGatewayService.java b/user-service/src/main/java/com/mh/user/service/mqtt/service/IMqttGatewayService.java
new file mode 100644
index 0000000..f13f5f4
--- /dev/null
+++ b/user-service/src/main/java/com/mh/user/service/mqtt/service/IMqttGatewayService.java
@@ -0,0 +1,36 @@
+package com.mh.user.service.mqtt.service;
+
+/**
+ * @author LJF
+ * @version 1.0
+ * @project springboot-mqtt
+ * @description 消息网关
+ * @date 2024-10-30 14:43:55
+ */
+//@Component
+//@Configuration
+//@MessagingGateway(defaultRequestChannel = ChannelName.OUTBOUND)
+public interface IMqttGatewayService {
+
+ /**
+ * 发送消息
+ * @param topic
+ * @param payload
+ */
+ void publish(String topic, String payload, int qos);
+
+ /**
+ * 发送消息
+ * @param topic
+ * @param payload
+ */
+ void publish(String topic, byte[] payload);
+
+ /**
+ * 发送消息并带上qos
+ * @param topic
+ * @param payload
+ * @param qos
+ */
+ void publish(String topic, byte[] payload, int qos);
+}
diff --git a/user-service/src/main/java/com/mh/user/service/mqtt/service/IMqttMsgSenderService.java b/user-service/src/main/java/com/mh/user/service/mqtt/service/IMqttMsgSenderService.java
new file mode 100644
index 0000000..38d0998
--- /dev/null
+++ b/user-service/src/main/java/com/mh/user/service/mqtt/service/IMqttMsgSenderService.java
@@ -0,0 +1,96 @@
+package com.mh.user.service.mqtt.service;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.mh.user.constants.CommonTopicResponse;
+import com.mh.user.constants.ServiceReply;
+
+/**
+ * @author LJF
+ * @version 1.0
+ * @project springboot-mqtt-demo
+ * @description Mqtt发送消息
+ * @date 2024-10-29 17:31:40
+ */
+public interface IMqttMsgSenderService {
+
+ void publish(String topic, String pushMessage);
+
+ /**
+ * 发布消息
+ * @param topic target
+ * @param response message
+ */
+ void publish(String topic, CommonTopicResponse response);
+
+ /**
+ * 使用qos发布消息
+ *
+ * @param topic target
+ * @param qos qos
+ * @param response message
+ */
+ void publish(String topic, int qos, CommonTopicResponse response);
+
+ /**
+ * 发送消息并同时接收响应
+ * @param clazz
+ * @param topic
+ * @param response 通知启动是否成功
+ * @return
+ */
+ T publishWithReply(Class clazz, String topic, CommonTopicResponse response);
+
+ /**
+ * 发送消息并同时接收响应
+ * @param clazz
+ * @param topic
+ * @param response
+ * @param retryTime
+ * @param
+ * @return
+ */
+ T publishWithReply(Class clazz, String topic, CommonTopicResponse response, int retryTime);
+
+ /**
+ * 专门用于发送服务消息
+ * @param clazz The generic class for ServiceReply.
+ * @param sn
+ * @param method
+ * @param data
+ * @param bid
+ * @param
+ * @return
+ */
+ ServiceReply publishServicesTopic(TypeReference clazz, String sn, String method, Object data, String bid);
+
+ /**
+ * 仅用于为服务发送消息,不设置接收到的子类型
+ * @param sn
+ * @param method
+ * @param data
+ * @param bid
+ * @return
+ */
+ ServiceReply publishServicesTopic(String sn, String method, Object data, String bid);
+
+ /**
+ * 专门用于发送服务消息
+ * @param clazz The generic class for ServiceReply.
+ * @param sn
+ * @param method
+ * @param data
+ * @param
+ * @return
+ */
+ ServiceReply publishServicesTopic(TypeReference clazz, String sn, String method, Object data);
+
+ /**
+ * 仅用于为服务发送消息,不设置接收到的子类型
+ * @param sn
+ * @param method
+ * @param data
+ * @return
+ */
+ ServiceReply publishServicesTopic(String sn, String method, Object data);
+
+}
diff --git a/user-service/src/main/java/com/mh/user/service/mqtt/service/IMqttTopicService.java b/user-service/src/main/java/com/mh/user/service/mqtt/service/IMqttTopicService.java
new file mode 100644
index 0000000..b97b182
--- /dev/null
+++ b/user-service/src/main/java/com/mh/user/service/mqtt/service/IMqttTopicService.java
@@ -0,0 +1,40 @@
+package com.mh.user.service.mqtt.service;
+
+import org.springframework.integration.mqtt.support.MqttHeaders;
+import org.springframework.messaging.handler.annotation.Header;
+
+/**
+ * @author LJF
+ * @version 1.0
+ * @project springboot-mqtt-demo
+ * @description mqtt订阅主题
+ * @date 2024-10-29 17:29:52
+ */
+public interface IMqttTopicService {
+
+ /**
+ * 订阅主题
+ * @param topic
+ */
+ void subscribe(@Header(MqttHeaders.TOPIC) String topic);
+
+ /**
+ * 订阅主题并设置qos
+ * @param topic
+ * @param qos
+ */
+ void subscribe(@Header(MqttHeaders.TOPIC) String topic, int qos);
+
+ /**
+ * 解绑主题
+ * @param topic
+ */
+ void unsubscribe(@Header(MqttHeaders.TOPIC) String topic);
+
+ /**
+ * 获取已订阅的主题
+ * @return
+ */
+ String[] getSubscribedTopics();
+
+}
diff --git a/user-service/src/main/java/com/mh/user/service/mqtt/service/impl/EventsServiceImpl.java b/user-service/src/main/java/com/mh/user/service/mqtt/service/impl/EventsServiceImpl.java
new file mode 100644
index 0000000..29f04ca
--- /dev/null
+++ b/user-service/src/main/java/com/mh/user/service/mqtt/service/impl/EventsServiceImpl.java
@@ -0,0 +1,75 @@
+package com.mh.user.service.mqtt.service.impl;
+
+import com.alibaba.fastjson2.JSONObject;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.mh.user.constants.ChannelName;
+import com.mh.user.model.SanShiFengReceiver;
+import com.mh.user.service.mqtt.service.IEventsService;
+import io.netty.util.CharsetUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.integration.annotation.ServiceActivator;
+import org.springframework.messaging.MessageHeaders;
+import org.springframework.stereotype.Service;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * @author LJF
+ * @version 1.0
+ * @project springboot-mqtt-demo
+ * @description 通道消息处理
+ * @date 2024-11-05 11:42:30
+ */
+@Slf4j
+@Service
+public class EventsServiceImpl implements IEventsService {
+
+ @Autowired
+ private ObjectMapper mapper;
+
+ @ServiceActivator(inputChannel = ChannelName.EVENTS_UPLOAD_INBOUND)
+ @Override
+ public void handleInboundUpload(byte[] receiver, MessageHeaders headers) {
+ // 获取当前的主题
+ String topic = Objects.requireNonNull(headers.get("mqtt_receivedTopic")).toString();
+ handleInboundData(receiver, topic, "主动上报数据");
+ }
+
+ @ServiceActivator(inputChannel = ChannelName.EVENTS_COLLECTION_INBOUND)
+ @Override
+ public void handleInboundCollection(byte[] receiver, MessageHeaders headers) {
+ // 获取当前的主题
+ String topic = Objects.requireNonNull(headers.get("mqtt_receivedTopic")).toString();
+ handleInboundData(receiver, topic, "主动下发采集数据");
+ }
+
+ @ServiceActivator(inputChannel = ChannelName.EVENTS_CONTROL_INBOUND)
+ @Override
+ public void handleInboundControl(byte[] receiver, MessageHeaders headers) {
+ // 获取当前的主题
+ String topic = Objects.requireNonNull(headers.get("mqtt_receivedTopic")).toString();
+ handleInboundData(receiver, topic, "控制指令下发");
+ }
+
+ @ServiceActivator(inputChannel = ChannelName.EVENTS_SEND_INBOUND)
+ @Override
+ public void handleInboundSend(byte[] receiver, MessageHeaders headers) throws IOException {
+ String sendStr = new String(receiver, CharsetUtil.UTF_8);
+ log.info("接收到控制指令下发=>{}", sendStr);
+ }
+
+ private void handleInboundData(byte[] receiver,String topic, String logMessage) {
+ try {
+ SanShiFengReceiver commonTopicReceiver = new SanShiFengReceiver();
+ commonTopicReceiver = mapper.readValue(receiver, SanShiFengReceiver.class);
+ log.info("主题:{},类型:{}: ,数据:{}", topic, logMessage, commonTopicReceiver.toString());
+ // 不使用消息队列,查询属于哪种设备类型,然后通过策略进行数据解析
+ } catch (IOException e) {
+ log.error("处理数据时发生错误: ", e);
+ }
+ }
+
+
+}
diff --git a/user-service/src/main/java/com/mh/user/service/mqtt/service/impl/MqttGatewayServiceImpl.java b/user-service/src/main/java/com/mh/user/service/mqtt/service/impl/MqttGatewayServiceImpl.java
new file mode 100644
index 0000000..73e935c
--- /dev/null
+++ b/user-service/src/main/java/com/mh/user/service/mqtt/service/impl/MqttGatewayServiceImpl.java
@@ -0,0 +1,54 @@
+package com.mh.user.service.mqtt.service.impl;
+
+import com.mh.user.service.mqtt.service.IMqttGatewayService;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.integration.mqtt.support.MqttHeaders;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.support.MessageBuilder;
+import org.springframework.stereotype.Service;
+
+/**
+ * @author LJF
+ * @version 1.0
+ * @project EEMCS
+ * @description 网关实现类
+ * @date 2025-02-07 08:44:55
+ */
+@Service
+public class MqttGatewayServiceImpl implements IMqttGatewayService {
+
+ private final MessageChannel outboundChannel;
+
+ public MqttGatewayServiceImpl(@Qualifier("outbound") MessageChannel outboundChannel) {
+ this.outboundChannel = outboundChannel;
+ }
+
+ @Override
+ public synchronized void publish(String topic, String payload, int qos) {
+ outboundChannel.send(
+ MessageBuilder
+ .withPayload(payload)
+ .setHeader(MqttHeaders.TOPIC, topic)
+ .setHeader(MqttHeaders.QOS, qos)
+ .build());
+ }
+
+ @Override
+ public void publish(String topic, byte[] payload) {
+ outboundChannel.send(
+ MessageBuilder
+ .withPayload(payload)
+ .setHeader(MqttHeaders.TOPIC, topic)
+ .build());
+ }
+
+ @Override
+ public void publish(String topic, byte[] payload, int qos) {
+ outboundChannel.send(
+ MessageBuilder
+ .withPayload(payload)
+ .setHeader(MqttHeaders.TOPIC, topic)
+ .setHeader(MqttHeaders.QOS, qos)
+ .build());
+ }
+}
diff --git a/user-service/src/main/java/com/mh/user/service/mqtt/service/impl/MqttMsgSenderServiceImpl.java b/user-service/src/main/java/com/mh/user/service/mqtt/service/impl/MqttMsgSenderServiceImpl.java
new file mode 100644
index 0000000..01e1a49
--- /dev/null
+++ b/user-service/src/main/java/com/mh/user/service/mqtt/service/impl/MqttMsgSenderServiceImpl.java
@@ -0,0 +1,142 @@
+package com.mh.user.service.mqtt.service.impl;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.mh.user.constants.CommonTopicResponse;
+import com.mh.user.constants.ServiceReply;
+import com.mh.user.constants.TopicConst;
+import com.mh.user.service.mqtt.service.IMqttGatewayService;
+import com.mh.user.service.mqtt.service.IMqttMsgSenderService;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import org.springframework.util.StringUtils;
+
+import java.util.Objects;
+import java.util.UUID;
+
+/**
+ * @author LJF
+ * @version 1.0
+ * @project springboot-mqtt-demo
+ * @description mqtt不同类型发送消息
+ * @date 2024-10-30 14:30:03
+ */
+@Slf4j
+@Service
+public class MqttMsgSenderServiceImpl implements IMqttMsgSenderService {
+
+ @Autowired
+ private IMqttGatewayService mqttGatewayService;
+
+ @Autowired
+ private ObjectMapper mapper;
+
+
+ /**
+ * 发布,默认qos为0,非持久化
+ *
+ * @param pushMessage
+ * @param topic
+ */
+ @Override
+ public void publish(String topic, String pushMessage) {
+ synchronized (this) {//注意:这里一定要同步,否则,在多线程publish的情况下,线程会发生死锁,分析见文章最后补充
+ try {
+ mqttGatewayService.publish(topic, pushMessage, 0);
+ log.info("发送主题:{},消息:{}", topic, pushMessage);
+ } catch (Exception e) {
+ log.error("发送主题异常:{},消息:{}", topic, pushMessage, e);
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ @Override
+ public void publish(String topic, CommonTopicResponse response) {
+ this.publish(topic, 1, response);
+ }
+
+ @Override
+ public void publish(String topic, int qos, CommonTopicResponse response) {
+ try {
+ log.info("发送主题:{},消息:{}", topic, response.toString());
+ mqttGatewayService.publish(topic, mapper.writeValueAsBytes(response), qos);
+ } catch (JsonProcessingException e) {
+ log.error("发送主题:{},消息:{}", topic, response.toString(), e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public T publishWithReply(Class clazz, String topic, CommonTopicResponse response) {
+ return this.publishWithReply(clazz, topic, response, 2);
+ }
+
+ @Override
+ public T publishWithReply(Class clazz, String topic, CommonTopicResponse response, int retryTime) {
+// AtomicInteger time = new AtomicInteger(0);
+// // Retry three times
+// while (time.getAndIncrement() <= retryTime) {
+// this.publish(topic, response);
+//
+// Chan> chan = Chan.getInstance();
+// // If the message is not received in 0.5 seconds then resend it again.
+// CommonTopicReceiver receiver = chan.get(response.getTid());
+//
+// // Need to match tid and bid.
+// if (Objects.nonNull(receiver) && receiver.getTid().equals(response.getTid()) &&
+// receiver.getBid().equals(response.getBid())) {
+// if (clazz.isAssignableFrom(receiver.getData().getClass())) {
+// return receiver.getData();
+// }
+// throw new TypeMismatchException(receiver.getData(), clazz);
+// }
+// // It must be guaranteed that the tid and bid of each message are different.
+// response.setBid(UUID.randomUUID().toString());
+// response.setTid(UUID.randomUUID().toString());
+// }
+// throw new RuntimeException("没有消息接收到");
+ return null;
+ }
+
+ @Override
+ public ServiceReply publishServicesTopic(TypeReference clazz, String sn, String method, Object data, String bid) {
+ String topic = TopicConst.THING_MODEL_PRE + TopicConst.PRODUCT + sn + TopicConst.SERVICES_SUF;
+ ServiceReply reply = this.publishWithReply(ServiceReply.class, topic,
+ CommonTopicResponse.builder()
+ .tid(UUID.randomUUID().toString())
+ .bid(StringUtils.hasText(bid) ? bid : UUID.randomUUID().toString())
+ .timestamp(System.currentTimeMillis())
+ .method(method)
+ .data(Objects.requireNonNull(data, ""))
+ .build());
+ if (Objects.isNull(clazz)) {
+ return reply;
+ }
+ // put together in "output"
+ if (Objects.nonNull(reply.getInfo())) {
+ reply.setOutput(mapper.convertValue(reply.getInfo(), clazz));
+ }
+ if (Objects.nonNull(reply.getOutput())) {
+ reply.setOutput(mapper.convertValue(reply.getOutput(), clazz));
+ }
+ return reply;
+ }
+
+ @Override
+ public ServiceReply publishServicesTopic(String sn, String method, Object data, String bid) {
+ return this.publishServicesTopic(null, sn, method, data, bid);
+ }
+
+ @Override
+ public ServiceReply publishServicesTopic(TypeReference clazz, String sn, String method, Object data) {
+ return this.publishServicesTopic(clazz, sn, method, data, null);
+ }
+
+ @Override
+ public ServiceReply publishServicesTopic(String sn, String method, Object data) {
+ return this.publishServicesTopic(null, sn, method, data, null);
+ }
+}
diff --git a/user-service/src/main/java/com/mh/user/service/mqtt/service/impl/MqttTopicServiceImpl.java b/user-service/src/main/java/com/mh/user/service/mqtt/service/impl/MqttTopicServiceImpl.java
new file mode 100644
index 0000000..5eba529
--- /dev/null
+++ b/user-service/src/main/java/com/mh/user/service/mqtt/service/impl/MqttTopicServiceImpl.java
@@ -0,0 +1,40 @@
+package com.mh.user.service.mqtt.service.impl;
+
+import com.mh.framework.mqtt.service.IMqttTopicService;
+import jakarta.annotation.Resource;
+import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
+import org.springframework.stereotype.Service;
+
+/**
+ * @author LJF
+ * @version 1.0
+ * @project springboot-mqtt-demo
+ * @description 订阅主题实现类
+ * @date 2024-10-29 17:36:31
+ */
+@Service
+public class MqttTopicServiceImpl implements IMqttTopicService {
+
+ @Resource
+ private MqttPahoMessageDrivenChannelAdapter adapter;
+
+ @Override
+ public void subscribe(String topic) {
+ adapter.addTopic(topic);
+ }
+
+ @Override
+ public void subscribe(String topic, int qos) {
+ adapter.addTopic(topic, qos);
+ }
+
+ @Override
+ public void unsubscribe(String topic) {
+ adapter.removeTopic(topic);
+ }
+
+ @Override
+ public String[] getSubscribedTopics() {
+ return adapter.getTopic();
+ }
+}
diff --git a/user-service/src/main/java/com/mh/user/strategy/BackTempControlStrategy.java b/user-service/src/main/java/com/mh/user/strategy/BackTempControlStrategy.java
index ad81db6..94df221 100644
--- a/user-service/src/main/java/com/mh/user/strategy/BackTempControlStrategy.java
+++ b/user-service/src/main/java/com/mh/user/strategy/BackTempControlStrategy.java
@@ -2,6 +2,7 @@ package com.mh.user.strategy;
import com.mh.user.constants.Constant;
import com.mh.user.entity.DeviceCodeParamEntity;
+import com.mh.user.entity.DeviceInstallEntity;
import com.mh.user.entity.NowPublicDataEntity;
import com.mh.user.service.BuildingService;
import com.mh.user.service.NowDataService;
@@ -100,4 +101,36 @@ public class BackTempControlStrategy implements DeviceStrategy {
}
return result;
}
+
+ @Override
+ public String analysisMQTTReceiveData(String dateStr,
+ String registerAddr,
+ String dataStr,
+ String operateType,
+ DeviceInstallEntity deviceInstallEntity) {
+ String result = Constant.FAIL;
+ if (Integer.parseInt(dataStr) < 0) {
+ log.info("回水温控报文检验失败: " + dataStr);
+ return result;
+ }
+ String addr = deviceInstallEntity.getDeviceAddr();//地址
+ String data = "";
+ if (operateType.equalsIgnoreCase(Constant.READ)) {// 读
+ Double fdata = Double.parseDouble(dataStr);
+ nowDataService.saveNowHistoryData2(addr, "回水温控", String.valueOf(fdata), "waterTemp", deviceInstallEntity.getBuildingId());
+ nowDataService.proWaterTemp(dateStr, deviceInstallEntity.getBuildingId(), "");//保存时间点温度
+ String avgTemp = nowDataService.selectAve(deviceInstallEntity.getBuildingId());
+ NowPublicDataEntity publicData = new NowPublicDataEntity();
+ publicData.setBuildingId(deviceInstallEntity.getBuildingId());
+ publicData.setUseWaterTemp(avgTemp);
+ publicData.setBackWaterTemp(avgTemp);
+ publicData.setSingleTemp(String.valueOf(fdata));//单箱温度
+ nowPublicDataService.saveNowHistoryPublicData(publicData);
+ log.info("回水温控号:" + addr + ",温度值:" + fdata + ",保存数据库成功!楼栋名称:" + deviceInstallEntity.getBuildingName());
+ return String.valueOf(fdata);
+ } else if (operateType.equalsIgnoreCase(Constant.WRITE)) {// 写
+ result = Constant.SUCCESS;
+ }
+ return result;
+ }
}
diff --git a/user-service/src/main/java/com/mh/user/strategy/DeviceStrategy.java b/user-service/src/main/java/com/mh/user/strategy/DeviceStrategy.java
index b316ba0..1a848c2 100644
--- a/user-service/src/main/java/com/mh/user/strategy/DeviceStrategy.java
+++ b/user-service/src/main/java/com/mh/user/strategy/DeviceStrategy.java
@@ -1,6 +1,7 @@
package com.mh.user.strategy;
import com.mh.user.entity.DeviceCodeParamEntity;
+import com.mh.user.entity.DeviceInstallEntity;
/**
* @author LJF
@@ -17,4 +18,19 @@ public interface DeviceStrategy {
String createOrders(DeviceCodeParamEntity deviceCodeParamEntity);
String analysisReceiveData(String dateStr, String deviceType, String registerAddr, String brand, String buildingId, String buildingName, String dataStr, DeviceCodeParamEntity deviceCodeParamEntity);
+
+ /**
+ * 解析MQTT报文
+ * @param dateStr
+ * @param registerAddr
+ * @param dataStr 已经是解析好的数据
+ * @param operateType 操作类型,读取/设置
+ * @param deviceInstallEntity
+ * @return
+ */
+ String analysisMQTTReceiveData(String dateStr,
+ String registerAddr,
+ String dataStr,
+ String operateType,
+ DeviceInstallEntity deviceInstallEntity);
}
diff --git a/user-service/src/main/java/com/mh/user/strategy/EleMeterStrategy.java b/user-service/src/main/java/com/mh/user/strategy/EleMeterStrategy.java
index 4ee8f26..d3feb43 100644
--- a/user-service/src/main/java/com/mh/user/strategy/EleMeterStrategy.java
+++ b/user-service/src/main/java/com/mh/user/strategy/EleMeterStrategy.java
@@ -1,8 +1,10 @@
package com.mh.user.strategy;
import com.mh.common.utils.StringUtils;
+import com.mh.user.constants.Constant;
import com.mh.user.entity.DataResultEntity;
import com.mh.user.entity.DeviceCodeParamEntity;
+import com.mh.user.entity.DeviceInstallEntity;
import com.mh.user.service.DataResultService;
import com.mh.user.utils.ExchangeStringUtil;
import com.mh.user.utils.SpringBeanUtil;
@@ -156,4 +158,34 @@ public class EleMeterStrategy implements DeviceStrategy {
}
return data;
}
+
+ @Override
+ public String analysisMQTTReceiveData(String dateStr, String registerAddr, String dataStr, String operateType, DeviceInstallEntity deviceInstallEntity) {
+ String data = Constant.FAIL;
+ if (Integer.parseInt(dataStr) < 0) {
+ return data;
+ }
+ log.info("电表表号:{},电表读数:{}", deviceInstallEntity.getDeviceAddr(), dataStr);
+ // 考虑dataStr是否走大数或者走小数
+ if (Double.parseDouble(dataStr)-deviceInstallEntity.getLastValue()>1000 || Double.parseDouble(dataStr)-deviceInstallEntity.getLastValue()<0) {
+ dataStr = String.valueOf(deviceInstallEntity.getLastValue());
+ }
+ try {
+ DataResultEntity dataResultEntity = new DataResultEntity();
+ dataResultEntity.setDeviceAddr(deviceInstallEntity.getDeviceAddr());//通讯编号
+ dataResultEntity.setDeviceType("电表");
+ dataResultEntity.setBuildingId(deviceInstallEntity.getBuildingId());
+ dataResultEntity.setCurValue(Double.parseDouble(dataStr)); //当前读数
+ Date date = new Date();
+ dataResultEntity.setCurDate(date); //当前日期
+ dataResultService.saveDataResult(dataResultEntity);
+ log.info("电表数据保存数据库成功! 楼栋名称:{}", deviceInstallEntity.getBuildingName());
+ } catch (Exception e) {
+ log.error("电表数据保存数据库失败!楼栋名称:{}", deviceInstallEntity.getBuildingName(), e);
+ }
+ if (!StringUtils.isBlank(dataStr)) {
+ data = String.valueOf(Double.valueOf(dataStr)); //00010.76,去除读数前面带0的情况
+ }
+ return data;
+ }
}
diff --git a/user-service/src/main/java/com/mh/user/strategy/HeatPumpStatusStrategy.java b/user-service/src/main/java/com/mh/user/strategy/HeatPumpStatusStrategy.java
index fc776c7..e3a35d8 100644
--- a/user-service/src/main/java/com/mh/user/strategy/HeatPumpStatusStrategy.java
+++ b/user-service/src/main/java/com/mh/user/strategy/HeatPumpStatusStrategy.java
@@ -4,6 +4,7 @@ import com.alibaba.fastjson2.JSON;
import com.mh.common.utils.StringUtils;
import com.mh.user.constants.Constant;
import com.mh.user.entity.DeviceCodeParamEntity;
+import com.mh.user.entity.DeviceInstallEntity;
import com.mh.user.service.BuildingService;
import com.mh.user.service.NowDataService;
import com.mh.user.service.NowPublicDataService;
@@ -238,4 +239,9 @@ public class HeatPumpStatusStrategy implements DeviceStrategy {
}
return result;
}
+
+ @Override
+ public String analysisMQTTReceiveData(String dateStr, String registerAddr, String dataStr, String operateType, DeviceInstallEntity deviceInstallEntity) {
+ return "";
+ }
}
diff --git a/user-service/src/main/java/com/mh/user/strategy/HeatPumpStrategy.java b/user-service/src/main/java/com/mh/user/strategy/HeatPumpStrategy.java
index 718e87d..e5afbc3 100644
--- a/user-service/src/main/java/com/mh/user/strategy/HeatPumpStrategy.java
+++ b/user-service/src/main/java/com/mh/user/strategy/HeatPumpStrategy.java
@@ -3,6 +3,7 @@ package com.mh.user.strategy;
import com.mh.common.utils.StringUtils;
import com.mh.user.constants.Constant;
import com.mh.user.entity.DeviceCodeParamEntity;
+import com.mh.user.entity.DeviceInstallEntity;
import com.mh.user.entity.NowPublicDataEntity;
import com.mh.user.service.*;
import com.mh.user.utils.ExchangeStringUtil;
@@ -670,4 +671,9 @@ public class HeatPumpStrategy implements DeviceStrategy {
stringBuffer.append(dataType);
return sValue;
}
+
+ @Override
+ public String analysisMQTTReceiveData(String dateStr, String registerAddr, String dataStr, String operateType, DeviceInstallEntity deviceInstallEntity) {
+ return "";
+ }
}
diff --git a/user-service/src/main/java/com/mh/user/strategy/MultiControlStrategy.java b/user-service/src/main/java/com/mh/user/strategy/MultiControlStrategy.java
index 8208c3b..b7f90b8 100644
--- a/user-service/src/main/java/com/mh/user/strategy/MultiControlStrategy.java
+++ b/user-service/src/main/java/com/mh/user/strategy/MultiControlStrategy.java
@@ -390,4 +390,9 @@ public class MultiControlStrategy implements DeviceStrategy {
}
return l1;
}
+
+ @Override
+ public String analysisMQTTReceiveData(String dateStr, String registerAddr, String dataStr, String operateType, DeviceInstallEntity deviceInstallEntity) {
+ return "";
+ }
}
diff --git a/user-service/src/main/java/com/mh/user/strategy/PressureTransStrategy.java b/user-service/src/main/java/com/mh/user/strategy/PressureTransStrategy.java
index fdffa4b..0a9a1d4 100644
--- a/user-service/src/main/java/com/mh/user/strategy/PressureTransStrategy.java
+++ b/user-service/src/main/java/com/mh/user/strategy/PressureTransStrategy.java
@@ -38,7 +38,7 @@ public class PressureTransStrategy implements DeviceStrategy {
}
public static PressureTransStrategy getInstance() {
- return PressureTransStrategy.SingletonHolder.INSTANCE;
+ return SingletonHolder.INSTANCE;
}
@Override
@@ -128,4 +128,42 @@ public class PressureTransStrategy implements DeviceStrategy {
}
return result;
}
+
+ @Override
+ public String analysisMQTTReceiveData(String dateStr, String registerAddr, String dataStr, String operateType, DeviceInstallEntity deviceInstallEntity) {
+ String result = "fail";
+ if (Integer.parseInt(dataStr) < 0) {
+ return result;
+ }
+ if (operateType.equalsIgnoreCase(Constant.READ)) {// 读
+ Double tankHeight = 0.0;
+ // 查询当前压变有低区
+ if (null != deviceInstallEntity
+ && !StringUtils.isBlank(deviceInstallEntity.getDeviceName())) {
+ if (deviceInstallEntity.getDeviceName().contains("低")) {
+ tankHeight = buildingService.queryLowTankHeight(deviceInstallEntity.getBuildingId());//水箱高,从数据库获取
+ if (tankHeight == null) {
+ tankHeight = 2.0;
+ }
+ }
+ }
+ Double wtLevel = Double.parseDouble(dataStr)/tankHeight * 100; //水箱水位
+ log.info("------水箱水高:" + wtLevel + "------");
+ if (wtLevel <= 0) {
+ wtLevel = 0.0;
+ } else if (wtLevel >= 100) {
+ wtLevel = 100.0;
+ }
+ DecimalFormat df = new DecimalFormat("0.0");
+ String strWtLevel = df.format(wtLevel);
+ // 更新device_install数据
+ assert deviceInstallEntity != null;
+ deviceInstallService.updateLastValueByOther(deviceInstallEntity.getDeviceAddr(), strWtLevel, "压变", deviceInstallEntity.getBuildingId());
+ nowDataService.saveNowHistoryData2(deviceInstallEntity.getDeviceAddr(), "压变", strWtLevel, "waterLevel", deviceInstallEntity.getBuildingId());
+ nowDataService.proWaterLevel(dateStr, deviceInstallEntity.getBuildingId(), deviceInstallEntity.getDeviceAddr()); //楼栋水位
+ log.info("时间:"+ dateStr +"压变号:" + deviceInstallEntity.getDeviceAddr() + ",保存数据库成功!楼栋名称:" + deviceInstallEntity.getBuildingName());
+ result = strWtLevel;
+ }
+ return result;
+ }
}
diff --git a/user-service/src/main/java/com/mh/user/strategy/StatusCheckStrategy.java b/user-service/src/main/java/com/mh/user/strategy/StatusCheckStrategy.java
index 7ed1e00..7632bfd 100644
--- a/user-service/src/main/java/com/mh/user/strategy/StatusCheckStrategy.java
+++ b/user-service/src/main/java/com/mh/user/strategy/StatusCheckStrategy.java
@@ -2,6 +2,7 @@ package com.mh.user.strategy;
import com.mh.user.constants.Constant;
import com.mh.user.entity.DeviceCodeParamEntity;
+import com.mh.user.entity.DeviceInstallEntity;
import com.mh.user.entity.NowPublicDataEntity;
import com.mh.user.service.*;
import com.mh.user.utils.ExchangeStringUtil;
@@ -124,4 +125,9 @@ public class StatusCheckStrategy implements DeviceStrategy {
}
return Constant.FAIL;
}
+
+ @Override
+ public String analysisMQTTReceiveData(String dateStr, String registerAddr, String dataStr, String operateType, DeviceInstallEntity deviceInstallEntity) {
+ return "";
+ }
}
diff --git a/user-service/src/main/java/com/mh/user/strategy/TempControlStrategy.java b/user-service/src/main/java/com/mh/user/strategy/TempControlStrategy.java
index a8dbeed..5572c2f 100644
--- a/user-service/src/main/java/com/mh/user/strategy/TempControlStrategy.java
+++ b/user-service/src/main/java/com/mh/user/strategy/TempControlStrategy.java
@@ -2,6 +2,7 @@ package com.mh.user.strategy;
import com.mh.user.constants.Constant;
import com.mh.user.entity.DeviceCodeParamEntity;
+import com.mh.user.entity.DeviceInstallEntity;
import com.mh.user.entity.NowPublicDataEntity;
import com.mh.user.service.BuildingService;
import com.mh.user.service.NowDataService;
@@ -113,4 +114,30 @@ public class TempControlStrategy implements DeviceStrategy {
}
return result;
}
+
+ @Override
+ public String analysisMQTTReceiveData(String dateStr, String registerAddr, String dataStr, String operateType, DeviceInstallEntity deviceInstallEntity) {
+ String result = Constant.FAIL;
+ if (Integer.parseInt(dataStr) < 0) {
+ log.info("温控报文检验失败: " + dataStr);
+ return result;
+ }
+ String data = "";
+ if (operateType.equalsIgnoreCase(Constant.READ)) {// 读
+ nowDataService.saveNowHistoryData2(deviceInstallEntity.getDeviceAddr(), "温控", dataStr, "waterTemp", deviceInstallEntity.getBuildingId());
+ nowDataService.proWaterTemp(dateStr, deviceInstallEntity.getBuildingId(), "");//保存时间点温度
+ String avgTemp = nowDataService.selectAve(deviceInstallEntity.getBuildingId());
+ NowPublicDataEntity publicData = new NowPublicDataEntity();
+ publicData.setBuildingId(deviceInstallEntity.getBuildingId());
+ publicData.setUseWaterTemp(avgTemp);
+ publicData.setBackWaterTemp(avgTemp);
+ publicData.setSingleTemp(dataStr);//单箱温度
+ nowPublicDataService.saveNowHistoryPublicData(publicData);
+ log.info("温控号:" + deviceInstallEntity.getDeviceAddr() + ",温度值:" + dataStr + ",保存数据库成功!楼栋名称:" + deviceInstallEntity.getBuildingName());
+ return dataStr;
+ } else {// 写
+ result = Constant.SUCCESS;
+ }
+ return result;
+ }
}
diff --git a/user-service/src/main/java/com/mh/user/strategy/TempTransStrategy.java b/user-service/src/main/java/com/mh/user/strategy/TempTransStrategy.java
index 3d77cfe..e860274 100644
--- a/user-service/src/main/java/com/mh/user/strategy/TempTransStrategy.java
+++ b/user-service/src/main/java/com/mh/user/strategy/TempTransStrategy.java
@@ -3,6 +3,7 @@ package com.mh.user.strategy;
import com.alibaba.fastjson2.JSON;
import com.mh.user.constants.Constant;
import com.mh.user.entity.DeviceCodeParamEntity;
+import com.mh.user.entity.DeviceInstallEntity;
import com.mh.user.entity.NowPublicDataEntity;
import com.mh.user.service.BuildingService;
import com.mh.user.service.NowDataService;
@@ -19,7 +20,7 @@ import java.util.Map;
* @author LJF
* @version 1.0
* @project CHWS
- * @description 压力变送器策略
+ * @description 温度变送器策略
* @date 2024-03-18 09:51:17
*/
@Slf4j
@@ -117,4 +118,9 @@ public class TempTransStrategy implements DeviceStrategy {
}
return result;
}
+
+ @Override
+ public String analysisMQTTReceiveData(String dateStr, String registerAddr, String dataStr, String operateType, DeviceInstallEntity deviceInstallEntity) {
+ return "";
+ }
}
diff --git a/user-service/src/main/java/com/mh/user/strategy/TimeControlStrategy.java b/user-service/src/main/java/com/mh/user/strategy/TimeControlStrategy.java
index 4d4d398..61dcb05 100644
--- a/user-service/src/main/java/com/mh/user/strategy/TimeControlStrategy.java
+++ b/user-service/src/main/java/com/mh/user/strategy/TimeControlStrategy.java
@@ -3,6 +3,7 @@ package com.mh.user.strategy;
import com.mh.common.utils.StringUtils;
import com.mh.user.constants.Constant;
import com.mh.user.entity.DeviceCodeParamEntity;
+import com.mh.user.entity.DeviceInstallEntity;
import com.mh.user.service.NowDataService;
import com.mh.user.service.NowPublicDataService;
import com.mh.user.utils.ExchangeStringUtil;
@@ -181,14 +182,22 @@ public class TimeControlStrategy implements DeviceStrategy {
// 开关时间
// 发送:0603001E0001E5BB
// 返回:0603020041CDB4
- if (rec == 14 && isExactlyDivisible("001E", registerAddr)) {
+ if (rec == 14 && (isExactlyDivisible("001E", registerAddr)
+ || isExactlyDivisible("001F", registerAddr)
+ || isExactlyDivisible("0020", registerAddr)
+ || isExactlyDivisible("0021", registerAddr)
+ || isExactlyDivisible("0022", registerAddr)
+ || isExactlyDivisible("0023", registerAddr)
+ || isExactlyDivisible("0024", registerAddr)
+ || isExactlyDivisible("0025", registerAddr)
+ )) {
// 开关时间
- data = ExchangeStringUtil.hexToDec(checkStr.substring(8, 10));
+ data = ExchangeStringUtil.hexToDec(checkStr.substring(6, 10));
// 得出整数,然后拆分时分,比如data="65",换算成时分就是01:05
int totalMinutes = Integer.parseInt(data);
int hours = totalMinutes / 60;
int minutes = totalMinutes % 60;
- data = String.format("%02d:%02d", hours, minutes);
+ data = String.format("%02d%02d", hours, minutes);
} else if (rec == 14 && isExactlyDivisible("0018", registerAddr)) {
// 星期掩码
// 截取时间
@@ -284,4 +293,8 @@ public class TimeControlStrategy implements DeviceStrategy {
return false;
}
+ @Override
+ public String analysisMQTTReceiveData(String dateStr, String registerAddr, String dataStr, String operateType, DeviceInstallEntity deviceInstallEntity) {
+ return "";
+ }
}
diff --git a/user-service/src/main/java/com/mh/user/strategy/WaterLevelSwitchStrategy.java b/user-service/src/main/java/com/mh/user/strategy/WaterLevelSwitchStrategy.java
index f36a384..1320dd4 100644
--- a/user-service/src/main/java/com/mh/user/strategy/WaterLevelSwitchStrategy.java
+++ b/user-service/src/main/java/com/mh/user/strategy/WaterLevelSwitchStrategy.java
@@ -576,4 +576,9 @@ public class WaterLevelSwitchStrategy implements DeviceStrategy {
}
return Constant.FAIL;
}
+
+ @Override
+ public String analysisMQTTReceiveData(String dateStr, String registerAddr, String dataStr, String operateType, DeviceInstallEntity deviceInstallEntity) {
+ return "";
+ }
}
diff --git a/user-service/src/main/java/com/mh/user/strategy/WtMeterStrategy.java b/user-service/src/main/java/com/mh/user/strategy/WtMeterStrategy.java
index d4c2c20..5f7d50b 100644
--- a/user-service/src/main/java/com/mh/user/strategy/WtMeterStrategy.java
+++ b/user-service/src/main/java/com/mh/user/strategy/WtMeterStrategy.java
@@ -1,8 +1,10 @@
package com.mh.user.strategy;
import com.mh.common.utils.StringUtils;
+import com.mh.user.constants.Constant;
import com.mh.user.entity.DataResultEntity;
import com.mh.user.entity.DeviceCodeParamEntity;
+import com.mh.user.entity.DeviceInstallEntity;
import com.mh.user.service.DataResultService;
import com.mh.user.utils.CRC16;
import com.mh.user.utils.ExchangeStringUtil;
@@ -45,7 +47,7 @@ public class WtMeterStrategy implements DeviceStrategy {
String str = "";
if (deviceAddr != null && deviceAddr.length() > 0) {
try {
- if (StringUtils.isBlank(brand) || brand.equals("埃美柯")) {
+ if (StringUtils.isBlank(brand) || brand.equals("埃美柯") || brand.equals("艾美柯")) {
// 0 代表前面补充0,14 代表长度为14,d 代表参数为正数型
str = String.format("%014d", Long.parseLong(deviceAddr));//基表通讯号
// 转换位置
@@ -132,4 +134,36 @@ public class WtMeterStrategy implements DeviceStrategy {
}
return data;
}
+
+ @Override
+ public String analysisMQTTReceiveData(String dateStr, String registerAddr, String dataStr, String operateType, DeviceInstallEntity deviceInstallEntity) {
+ String data = Constant.FAIL;
+ if (Integer.parseInt(dataStr) < 0) {
+ return data;
+ }
+ log.info("水表表号: " + deviceInstallEntity.getDeviceAddr() + ",水表读数:" + dataStr);
+ // 考虑dataStr是否走大数或者走小数
+ if (Double.parseDouble(dataStr)-deviceInstallEntity.getLastValue()>100 || Double.parseDouble(dataStr)-deviceInstallEntity.getLastValue()<0) {
+ dataStr = String.valueOf(deviceInstallEntity.getLastValue());
+ }
+ try {
+ if (!StringUtils.isBlank(dataStr)) {
+ DataResultEntity dataResultEntity = new DataResultEntity();
+ dataResultEntity.setDeviceAddr(deviceInstallEntity.getDeviceAddr());//通讯编号
+ dataResultEntity.setDeviceType("水表");
+ dataResultEntity.setCurValue(Double.parseDouble(dataStr)); //当前读数
+ Date date = new Date();
+ dataResultEntity.setCurDate(date); //当前日期
+ dataResultEntity.setBuildingId(deviceInstallEntity.getBuildingId());
+ dataResultService.saveDataResult(dataResultEntity);
+ log.info("水表数据保存数据库成功!楼栋名称:" + deviceInstallEntity.getBuildingName());
+ }
+ } catch (Exception e) {
+ log.error("水表数据保存数据库失败!楼栋名称:{}", deviceInstallEntity.getBuildingName(), e);
+ }
+ if (!StringUtils.isBlank(data)) {
+ data = String.valueOf(Double.valueOf(data));
+ }
+ return data;
+ }
}
diff --git a/user-service/src/main/java/com/mh/user/utils/GetReadOrder485.java b/user-service/src/main/java/com/mh/user/utils/GetReadOrder485.java
index d95d3e9..7b24b07 100644
--- a/user-service/src/main/java/com/mh/user/utils/GetReadOrder485.java
+++ b/user-service/src/main/java/com/mh/user/utils/GetReadOrder485.java
@@ -60,7 +60,7 @@ public class GetReadOrder485 {
String str = "";
if (deviceAddr != null && deviceAddr.length() > 0) {
try {
- if (StringUtils.isBlank(brand) || brand.equals("艾美柯")) {
+ if (StringUtils.isBlank(brand) || brand.equals("艾美柯") || brand.equals("埃美柯")) {
// 0 代表前面补充0,14 代表长度为14,d 代表参数为正数型
str = String.format("%014d", Long.parseLong(deviceAddr));//基表通讯号
// 转换位置
diff --git a/user-service/src/main/resources/application-dev.yml b/user-service/src/main/resources/application-dev.yml
index 3e9f819..90fa9f5 100644
--- a/user-service/src/main/resources/application-dev.yml
+++ b/user-service/src/main/resources/application-dev.yml
@@ -96,5 +96,28 @@ logging:
amap:
key: 984603bf28ef94ac78765a3ea27a6c26
+mqttSpring:
+ # BASIC parameters are required.
+ BASIC:
+ protocol: MQTT
+ host: 192.168.1.79
+ port: 1883
+ username: test
+ password: test123456
+ client-id: chws_nfxy_mqtt_dev
+ # If the protocol is ws/wss, this value is required.
+ path:
+ # Topics that need to be subscribed when initially connecting to mqtt, multiple topics are divided by ",".
+ inbound-topic: chws_nfxy_mqtt_dev/read/events_upload/devices
+ # 此部分是提供给后端生成token返回给前端,让前端使用websocket方式去和MQTT实现交互的,笔者此文的案例中并没有去实现
+ # 无人机远程控制模式(drone remote control)
+ DRC:
+ protocol: WS
+ host: 192.168.1.79
+ port: 8083
+ path: /mqtt
+control:
+ topic: chws_nfxy_mqtt_dev/control/events_upload/devices
+
diff --git a/user-service/src/main/resources/application-prod.yml b/user-service/src/main/resources/application-prod.yml
index a2a4ab1..ff10893 100644
--- a/user-service/src/main/resources/application-prod.yml
+++ b/user-service/src/main/resources/application-prod.yml
@@ -14,10 +14,10 @@ spring:
# password: mh@803
## url: jdbc:sqlserver://120.25.220.177:32012;DatabaseName=M_CHWS;allowMultiQueries=true
#阿里云服务器-广州理工
-# url: jdbc:sqlserver://111.230.50.186:32012;DatabaseName=CHWS;allowMultiQueries=true
-# driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver
-# username: test
-# password: minghan123456@
+ url: jdbc:sqlserver://111.230.50.186:32012;DatabaseName=CHWS;allowMultiQueries=true
+ driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver
+ username: test
+ password: minghan123456@
# #华厦云服务器
# url: jdbc:sqlserver://127.0.0.1:1433;DatabaseName=CHWS;allowMultiQueries=true
# driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver
@@ -29,10 +29,10 @@ spring:
# username: chws_gsh
# password: Mhtech@803
#广商服务器
- url: jdbc:sqlserver://175.178.153.91:8033;DatabaseName=chws_gsh;allowMultiQueries=true
- driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver
- username: chws_gsh
- password: Mhtech@803gsh
+# url: jdbc:sqlserver://175.178.153.91:8033;DatabaseName=chws_gsh;allowMultiQueries=true
+# driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver
+# username: chws_gsh
+# password: Mhtech@803gsh
#本机
# url: jdbc:sqlserver://127.0.0.1:9956;DatabaseName=CHWS;allowMultiQueries=true
# driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver
@@ -64,7 +64,7 @@ spring:
# username: chws_jm
# password: Mhtech@803
-# # 华软江门
+# # 珠海北师大
# url: jdbc:sqlserver://127.0.0.1:8033;DatabaseName=chws_bsdz;allowMultiQueries=true
# driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver
# username: chws_bsdz