From c04d960ad3c542945e75692af4155abc74465ed6 Mon Sep 17 00:00:00 2001 From: 25604 Date: Tue, 9 Dec 2025 16:23:32 +0800 Subject: [PATCH] =?UTF-8?q?1=E3=80=81=E5=85=BC=E5=AE=B9MQTT=E5=8D=8F?= =?UTF-8?q?=E8=AE=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 19 +++ .../java/com/mh/user/config/MHConfig.java | 122 +++++++++++++++ .../com/mh/user/constants/ChannelName.java | 59 ++++++++ .../user/constants/CommonTopicResponse.java | 36 +++++ .../mh/user/constants/MqttClientOptions.java | 108 +++++++++++++ .../mh/user/constants/MqttProtocolEnum.java | 33 ++++ .../com/mh/user/constants/MqttUseEnum.java | 16 ++ .../com/mh/user/constants/ServiceReply.java | 20 +++ .../com/mh/user/constants/TopicConst.java | 31 ++++ .../java/com/mh/user/constants/TopicEnum.java | 60 ++++++++ .../user/entity/MqttSubscriptionEntity.java | 48 ++++++ .../com/mh/user/job/CollectionLoopRunner.java | 35 ++++- .../user/mapper/MqttSubscriptionMapper.java | 32 ++++ .../com/mh/user/model/SanShiFengDatas.java | 30 ++++ .../com/mh/user/model/SanShiFengReceiver.java | 29 ++++ .../user/service/MqttSubscriptionService.java | 26 ++++ .../impl/DeviceControlServiceImpl.java | 24 +-- .../impl/MqttSubscriptionServiceImpl.java | 61 ++++++++ .../user/service/mqtt/config/MqttConfig.java | 99 ++++++++++++ .../mqtt/config/MqttInboundConfig.java | 91 +++++++++++ .../mqtt/config/MqttMessageChannel.java | 55 +++++++ .../mqtt/config/MqttOutboundConfig.java | 51 +++++++ .../mqtt/handler/InboundMessageRouter.java | 62 ++++++++ .../service/mqtt/service/IEventsService.java | 49 ++++++ .../mqtt/service/IMqttGatewayService.java | 36 +++++ .../mqtt/service/IMqttMsgSenderService.java | 96 ++++++++++++ .../mqtt/service/IMqttTopicService.java | 40 +++++ .../mqtt/service/impl/EventsServiceImpl.java | 75 +++++++++ .../service/impl/MqttGatewayServiceImpl.java | 54 +++++++ .../impl/MqttMsgSenderServiceImpl.java | 142 ++++++++++++++++++ .../service/impl/MqttTopicServiceImpl.java | 40 +++++ .../strategy/BackTempControlStrategy.java | 33 ++++ .../com/mh/user/strategy/DeviceStrategy.java | 16 ++ .../mh/user/strategy/EleMeterStrategy.java | 32 ++++ .../user/strategy/HeatPumpStatusStrategy.java | 6 + .../mh/user/strategy/HeatPumpStrategy.java | 6 + .../user/strategy/MultiControlStrategy.java | 5 + .../user/strategy/PressureTransStrategy.java | 40 ++++- .../mh/user/strategy/StatusCheckStrategy.java | 6 + .../mh/user/strategy/TempControlStrategy.java | 27 ++++ .../mh/user/strategy/TempTransStrategy.java | 8 +- .../mh/user/strategy/TimeControlStrategy.java | 19 ++- .../strategy/WaterLevelSwitchStrategy.java | 5 + .../com/mh/user/strategy/WtMeterStrategy.java | 36 ++++- .../com/mh/user/utils/GetReadOrder485.java | 2 +- .../src/main/resources/application-dev.yml | 23 +++ .../src/main/resources/application-prod.yml | 18 +-- 47 files changed, 1931 insertions(+), 30 deletions(-) create mode 100644 user-service/src/main/java/com/mh/user/config/MHConfig.java create mode 100644 user-service/src/main/java/com/mh/user/constants/ChannelName.java create mode 100644 user-service/src/main/java/com/mh/user/constants/CommonTopicResponse.java create mode 100644 user-service/src/main/java/com/mh/user/constants/MqttClientOptions.java create mode 100644 user-service/src/main/java/com/mh/user/constants/MqttProtocolEnum.java create mode 100644 user-service/src/main/java/com/mh/user/constants/MqttUseEnum.java create mode 100644 user-service/src/main/java/com/mh/user/constants/ServiceReply.java create mode 100644 user-service/src/main/java/com/mh/user/constants/TopicConst.java create mode 100644 user-service/src/main/java/com/mh/user/constants/TopicEnum.java create mode 100644 user-service/src/main/java/com/mh/user/entity/MqttSubscriptionEntity.java create mode 100644 user-service/src/main/java/com/mh/user/mapper/MqttSubscriptionMapper.java create mode 100644 user-service/src/main/java/com/mh/user/model/SanShiFengDatas.java create mode 100644 user-service/src/main/java/com/mh/user/model/SanShiFengReceiver.java create mode 100644 user-service/src/main/java/com/mh/user/service/MqttSubscriptionService.java create mode 100644 user-service/src/main/java/com/mh/user/service/impl/MqttSubscriptionServiceImpl.java create mode 100644 user-service/src/main/java/com/mh/user/service/mqtt/config/MqttConfig.java create mode 100644 user-service/src/main/java/com/mh/user/service/mqtt/config/MqttInboundConfig.java create mode 100644 user-service/src/main/java/com/mh/user/service/mqtt/config/MqttMessageChannel.java create mode 100644 user-service/src/main/java/com/mh/user/service/mqtt/config/MqttOutboundConfig.java create mode 100644 user-service/src/main/java/com/mh/user/service/mqtt/handler/InboundMessageRouter.java create mode 100644 user-service/src/main/java/com/mh/user/service/mqtt/service/IEventsService.java create mode 100644 user-service/src/main/java/com/mh/user/service/mqtt/service/IMqttGatewayService.java create mode 100644 user-service/src/main/java/com/mh/user/service/mqtt/service/IMqttMsgSenderService.java create mode 100644 user-service/src/main/java/com/mh/user/service/mqtt/service/IMqttTopicService.java create mode 100644 user-service/src/main/java/com/mh/user/service/mqtt/service/impl/EventsServiceImpl.java create mode 100644 user-service/src/main/java/com/mh/user/service/mqtt/service/impl/MqttGatewayServiceImpl.java create mode 100644 user-service/src/main/java/com/mh/user/service/mqtt/service/impl/MqttMsgSenderServiceImpl.java create mode 100644 user-service/src/main/java/com/mh/user/service/mqtt/service/impl/MqttTopicServiceImpl.java diff --git a/pom.xml b/pom.xml index b864d0e..8706cff 100644 --- a/pom.xml +++ b/pom.xml @@ -46,6 +46,25 @@ 1.0.1.RELEASE + + + org.springframework + spring-messaging + 6.2.1 + + + + + org.springframework.integration + spring-integration-mqtt + 6.3.4 + + + org.springframework.boot + spring-boot-starter-integration + 3.4.2 + + diff --git a/user-service/src/main/java/com/mh/user/config/MHConfig.java b/user-service/src/main/java/com/mh/user/config/MHConfig.java new file mode 100644 index 0000000..7031388 --- /dev/null +++ b/user-service/src/main/java/com/mh/user/config/MHConfig.java @@ -0,0 +1,122 @@ +package com.mh.user.config; + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +/** + * 读取项目相关配置 + * + * @author mh + */ +@Component +@ConfigurationProperties(prefix = "mh") +public class MHConfig +{ + /** 项目名称 */ + private String name; + + /** 版本 */ + private String version; + + /** 版权年份 */ + private String copyrightYear; + + /** 上传路径 */ + private static String profile; + + /** 获取地址开关 */ + private static boolean addressEnabled; + + /** 验证码类型 */ + private static String captchaType; + + public String getName() + { + return name; + } + + public void setName(String name) + { + this.name = name; + } + + public String getVersion() + { + return version; + } + + public void setVersion(String version) + { + this.version = version; + } + + public String getCopyrightYear() + { + return copyrightYear; + } + + public void setCopyrightYear(String copyrightYear) + { + this.copyrightYear = copyrightYear; + } + + public static String getProfile() + { + return profile; + } + + public void setProfile(String profile) + { + MHConfig.profile = profile; + } + + public static boolean isAddressEnabled() + { + return addressEnabled; + } + + public void setAddressEnabled(boolean addressEnabled) + { + MHConfig.addressEnabled = addressEnabled; + } + + public static String getCaptchaType() { + return captchaType; + } + + public void setCaptchaType(String captchaType) { + MHConfig.captchaType = captchaType; + } + + /** + * 获取导入上传路径 + */ + public static String getImportPath() + { + return getProfile() + "/import"; + } + + /** + * 获取头像上传路径 + */ + public static String getAvatarPath() + { + return getProfile() + "/avatar"; + } + + /** + * 获取下载路径 + */ + public static String getDownloadPath() + { + return getProfile() + "/download/"; + } + + /** + * 获取上传路径 + */ + public static String getUploadPath() + { + return getProfile() + "/upload"; + } +} diff --git a/user-service/src/main/java/com/mh/user/constants/ChannelName.java b/user-service/src/main/java/com/mh/user/constants/ChannelName.java new file mode 100644 index 0000000..8a1f901 --- /dev/null +++ b/user-service/src/main/java/com/mh/user/constants/ChannelName.java @@ -0,0 +1,59 @@ +package com.mh.user.constants; + +/** + * @author LJF + * @version 1.0 + * @project springboot-mqtt-demo + * @description 声明所有通道 + * @date 2024-10-29 16:04:19 + */ +public class ChannelName { + + /** + * 默认通道名称(防止出错) + */ + public static final String DEFAULT_BOUND = "default_bound"; + + /** + * 主动上报入站 + */ + public static final String INBOUND = "inbound"; + + /** + * 出站 + */ + public static final String OUTBOUND = "outbound"; + + + /** + * 入站主动上报 + */ + public static final String EVENTS_UPLOAD_INBOUND = "events_upload_inbound"; + + /** + * 入站主动采集 + */ + public static final String EVENTS_COLLECTION_INBOUND = "events_collection_inbound"; + + /** + * 入站主动控制 + */ + public static final String EVENTS_CONTROL_INBOUND = "events_control_inbound"; + + /** + * 默认进站处理 + */ + public static final String EVENTS_DEFAULT_INBOUND = "events_default_inbound"; + + public static final String REPLY_EVENTS_OUTBOUND = "reply_events_outbound"; + + /** + * 新珠江收到的信息 + */ + public static final String EVENTS_RECEIVE_INBOUND = "events_receive_inbound"; + + /** + * 接收服务端的数据报文 + */ + public static final String EVENTS_SEND_INBOUND = "events_send_inbound"; +} diff --git a/user-service/src/main/java/com/mh/user/constants/CommonTopicResponse.java b/user-service/src/main/java/com/mh/user/constants/CommonTopicResponse.java new file mode 100644 index 0000000..d123179 --- /dev/null +++ b/user-service/src/main/java/com/mh/user/constants/CommonTopicResponse.java @@ -0,0 +1,36 @@ +package com.mh.user.constants; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * Unified Topic response format + * + * @author sean.zhou + * @version 0.1 + * @date 2021/11/15 + */ +@Data +@AllArgsConstructor +@NoArgsConstructor +@Builder +@JsonIgnoreProperties(ignoreUnknown = true) +public class CommonTopicResponse { + + /** + * The command is sent and the response is matched by the tid and bid fields in the message, + * and the reply should keep the tid and bid the same. + */ + private String tid; + + private String bid; + + private String method; + + private T data; + + private Long timestamp; +} diff --git a/user-service/src/main/java/com/mh/user/constants/MqttClientOptions.java b/user-service/src/main/java/com/mh/user/constants/MqttClientOptions.java new file mode 100644 index 0000000..a2ec1a2 --- /dev/null +++ b/user-service/src/main/java/com/mh/user/constants/MqttClientOptions.java @@ -0,0 +1,108 @@ +package com.mh.user.constants; + +/** + * @author LJF + * @version 1.0 + * @project springboot-mqtt-demo + * @description mqtt连接的参数 + * @date 2024-10-29 14:46:24 + */ +public class MqttClientOptions { + + private MqttProtocolEnum protocol; + + private String host; + + private Integer port; + + private String username; + + private String password; + + private String clientId; + + private String path; + + /** + * 客户端连接的时候,订阅的主题 + */ + private String inboundTopic; + + public MqttProtocolEnum getProtocol() { + return protocol; + } + + public void setProtocol(MqttProtocolEnum protocol) { + this.protocol = protocol; + } + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public Integer getPort() { + return port; + } + + public void setPort(Integer port) { + this.port = port; + } + + public String getUsername() { + return username; + } + + public void setUsername(String username) { + this.username = username; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public String getClientId() { + return clientId; + } + + public void setClientId(String clientId) { + this.clientId = clientId; + } + + public String getPath() { + return path; + } + + public void setPath(String path) { + this.path = path; + } + + public String getInboundTopic() { + return inboundTopic; + } + + public void setInboundTopic(String inboundTopic) { + this.inboundTopic = inboundTopic; + } + + @Override + public String toString() { + return "MqttClientOptions{" + + "protocol=" + protocol + + ", host='" + host + '\'' + + ", port=" + port + + ", username='" + username + '\'' + + ", password='" + password + '\'' + + ", clientId='" + clientId + '\'' + + ", path='" + path + '\'' + + ", inboundTopic='" + inboundTopic + '\'' + + '}'; + } +} diff --git a/user-service/src/main/java/com/mh/user/constants/MqttProtocolEnum.java b/user-service/src/main/java/com/mh/user/constants/MqttProtocolEnum.java new file mode 100644 index 0000000..ca80fac --- /dev/null +++ b/user-service/src/main/java/com/mh/user/constants/MqttProtocolEnum.java @@ -0,0 +1,33 @@ +package com.mh.user.constants; + +/** + * @author LJF + * @version 1.0 + * @project springboot-mqtt-demo + * @description 采用哪种协议进行数据交互 + * @date 2024-10-29 15:21:07 + */ +public enum MqttProtocolEnum { + + MQTT("tcp"), + + MQTTS("tcp"), + + WS("ws"), + + WSS("wss"); + + final String protocol; + + MqttProtocolEnum(String protocol) { + this.protocol = protocol; + } + + public String getProtocolAddr() { + return protocol + "://"; + } + + public String getProtocol() { + return protocol; + } +} diff --git a/user-service/src/main/java/com/mh/user/constants/MqttUseEnum.java b/user-service/src/main/java/com/mh/user/constants/MqttUseEnum.java new file mode 100644 index 0000000..5a9384d --- /dev/null +++ b/user-service/src/main/java/com/mh/user/constants/MqttUseEnum.java @@ -0,0 +1,16 @@ +package com.mh.user.constants; + +/** + * @author LJF + * @version 1.0 + * @project springboot-mqtt-demo + * @description mqtt选择模式 + * @date 2024-10-29 15:19:21 + */ +public enum MqttUseEnum { + + BASIC, + + DRC + +} diff --git a/user-service/src/main/java/com/mh/user/constants/ServiceReply.java b/user-service/src/main/java/com/mh/user/constants/ServiceReply.java new file mode 100644 index 0000000..40fdf4e --- /dev/null +++ b/user-service/src/main/java/com/mh/user/constants/ServiceReply.java @@ -0,0 +1,20 @@ +package com.mh.user.constants; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.Data; + +/** + * @author sean.zhou + * @version 0.1 + * @date 2021/11/22 + */ +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +public class ServiceReply { + + private Integer result; + + private T info; + + private T output; +} diff --git a/user-service/src/main/java/com/mh/user/constants/TopicConst.java b/user-service/src/main/java/com/mh/user/constants/TopicConst.java new file mode 100644 index 0000000..2725a06 --- /dev/null +++ b/user-service/src/main/java/com/mh/user/constants/TopicConst.java @@ -0,0 +1,31 @@ +package com.mh.user.constants; + +/** + * All the topics that need to be used in the project. + * + * @author ljf + * @version 0.1 + * @date 2025-01-22 + */ +public class TopicConst { + + public static final String MH_UPLOAD = "mh_upload/"; + + public static final String EVENTS_UPLOAD = "events_upload/"; + + public static final String MH_COLLECTION = "mh_collection/"; + + public static final String EVENTS_COLLECTION = "events_collection/"; + + public static final String MH_CONTROL = "mh_control/"; + + public static final String EVENTS_CONTROL = "events_control/"; + + public static final String REGEX_SN = "[A-Za-z0-9]+"; + + public static final String THING_MODEL_PRE = "thing/"; + + public static final String PRODUCT = "product/"; + + public static final String SERVICES_SUF = "/services"; +} diff --git a/user-service/src/main/java/com/mh/user/constants/TopicEnum.java b/user-service/src/main/java/com/mh/user/constants/TopicEnum.java new file mode 100644 index 0000000..8f41357 --- /dev/null +++ b/user-service/src/main/java/com/mh/user/constants/TopicEnum.java @@ -0,0 +1,60 @@ +package com.mh.user.constants; + +import java.util.Arrays; +import java.util.regex.Pattern; + +import static com.mh.user.constants.TopicConst.*; + +/** + * @author ljf + * @version 1.0 + * @description: TODO + * @date 2024/11/06 14:28 + */ +public enum TopicEnum { + + /** + * 冷水机组客户端主动上报数据 + */ + CLIENT_UPLOAD_DATA(Pattern.compile("^" + MH_UPLOAD + EVENTS_UPLOAD + REGEX_SN + "$"), ChannelName.EVENTS_UPLOAD_INBOUND), + + /** + * 服务端采集数据 + */ + SERVER_COLLECTION_DATA(Pattern.compile("^" + MH_COLLECTION + EVENTS_COLLECTION + REGEX_SN + "$"), ChannelName.EVENTS_COLLECTION_INBOUND), + + /** + * 服务端控制指令 + */ + SERVER_CONTROL_DATA(Pattern.compile("^" + MH_CONTROL + EVENTS_CONTROL + REGEX_SN + "$"), ChannelName.EVENTS_CONTROL_INBOUND), + + /** + * 订阅服务端发送的主题命令 + */ + SERVER_SEND_DATA(Pattern.compile("^A/cmd/ctl/send" + "$"), ChannelName.EVENTS_SEND_INBOUND), + + UNKNOWN(Pattern.compile("^.*$"), ChannelName.DEFAULT_BOUND); + + final Pattern pattern; + + final String beanName; + + TopicEnum(Pattern pattern, String beanName) { + this.pattern = pattern; + this.beanName = beanName; + } + + public Pattern getPattern() { + return pattern; + } + + public String getBeanName() { + return beanName; + } + + public static TopicEnum find(String proName, String topic) { + // 去掉第一个"/"以及之前数据 + String finalTopic = topic.replaceFirst("^"+proName, "");; + return Arrays.stream(TopicEnum.values()).filter(topicEnum -> topicEnum.pattern.matcher(finalTopic).matches()).findAny().orElse(UNKNOWN); + } +} diff --git a/user-service/src/main/java/com/mh/user/entity/MqttSubscriptionEntity.java b/user-service/src/main/java/com/mh/user/entity/MqttSubscriptionEntity.java new file mode 100644 index 0000000..0c526f6 --- /dev/null +++ b/user-service/src/main/java/com/mh/user/entity/MqttSubscriptionEntity.java @@ -0,0 +1,48 @@ +package com.mh.user.entity; + +import com.fasterxml.jackson.annotation.JsonFormat; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonInclude; +import lombok.Data; + +import java.util.Date; +import java.util.Map; + +/** + * @author LJF + * @version 1.0 + * @project EEMCS + * @description mqtt订阅管理 + * @date 2025-02-14 13:47:07 + */ +@Data +public class MqttSubscriptionEntity { + + private Long id; + + private String topic; + + private Short qos; + + private String clientId; + + private String status; + + /** 创建者 */ + private String createBy; + + /** 创建时间 */ + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") + private Date createTime; + + /** 更新者 */ + private String updateBy; + + /** 更新时间 */ + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") + private Date updateTime; + + /** 备注 */ + private String remark; + +} diff --git a/user-service/src/main/java/com/mh/user/job/CollectionLoopRunner.java b/user-service/src/main/java/com/mh/user/job/CollectionLoopRunner.java index 66534b8..144f81f 100644 --- a/user-service/src/main/java/com/mh/user/job/CollectionLoopRunner.java +++ b/user-service/src/main/java/com/mh/user/job/CollectionLoopRunner.java @@ -1,13 +1,17 @@ package com.mh.user.job; +import com.mh.common.utils.StringUtils; import com.mh.user.constants.Constant; import com.mh.user.entity.AddCronJobReq; +import com.mh.user.entity.MqttSubscriptionEntity; import com.mh.user.manage.QuartzManager; import com.mh.user.netty.NettyEchoServer; import com.mh.user.serialport.SerialPortListener; import com.mh.user.serialport.SerialPortUtil; import com.mh.user.serialport.SerialTool; import com.mh.user.service.DeviceCodeParamService; +import com.mh.user.service.MqttSubscriptionService; +import com.mh.user.service.mqtt.service.IMqttTopicService; import com.mh.user.utils.CacheUtil; import com.mh.user.utils.ExchangeStringUtil; import com.mh.user.utils.GetReadOrder485; @@ -19,6 +23,7 @@ import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; /** @@ -42,6 +47,12 @@ public class CollectionLoopRunner implements ApplicationRunner { @Resource private GetWeatherInfoJob getWeatherInfoJob; + @Resource + private MqttSubscriptionService iMqttSubscriptionService; + + @Resource + private IMqttTopicService iMqttTopicService; + @Override public void run(ApplicationArguments args) throws Exception { // collectionMeterAndCloud();//采集 @@ -53,8 +64,28 @@ public class CollectionLoopRunner implements ApplicationRunner { // 获取天气数据 getWeatherInfoJob.getWeatherInfo(); // 启动netty端口 - NettyEchoServer nettyEchoServer = new NettyEchoServer(); - nettyEchoServer.bind(8098); +// NettyEchoServer nettyEchoServer = new NettyEchoServer(); +// nettyEchoServer.bind(8098); + // 初始化mqtt订阅记录 + initializeMqttSubscription(); + } + + /** + * 初始化mqtt订阅记录 + */ + private void initializeMqttSubscription() { + MqttSubscriptionEntity mqttSubscription = new MqttSubscriptionEntity(); + mqttSubscription.setStatus("0"); + List mqttSubscriptions = iMqttSubscriptionService.selectMqttSubList(mqttSubscription); + for (MqttSubscriptionEntity subscription : mqttSubscriptions) { + try { + if (!StringUtils.isBlank(subscription.getTopic())) { + iMqttTopicService.subscribe(subscription.getTopic(), subscription.getQos()); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } } private void simulationCollection() throws Exception { diff --git a/user-service/src/main/java/com/mh/user/mapper/MqttSubscriptionMapper.java b/user-service/src/main/java/com/mh/user/mapper/MqttSubscriptionMapper.java new file mode 100644 index 0000000..cccb641 --- /dev/null +++ b/user-service/src/main/java/com/mh/user/mapper/MqttSubscriptionMapper.java @@ -0,0 +1,32 @@ +package com.mh.user.mapper; + +import com.mh.user.entity.MqttSubscriptionEntity; +import org.apache.ibatis.annotations.Mapper; +import org.apache.ibatis.annotations.Param; +import org.apache.ibatis.annotations.Select; +import tk.mybatis.mapper.common.BaseMapper; + +import java.util.List; + +/** + * @author LJF + * @version 1.0 + * @project EEMCS + * @description mqtt订阅mapper类 + * @date 2025-02-14 14:00:58 + */ +@Mapper +public interface MqttSubscriptionMapper extends BaseMapper { + + @Select("") + List selectListByTopic(@Param("topic") String topic, + @Param("status") String status); + + @Select("SELECT top 1 * FROM mqtt_subscription WHERE id = #{id}") + MqttSubscriptionEntity selectById(@Param("id") String id); +} diff --git a/user-service/src/main/java/com/mh/user/model/SanShiFengDatas.java b/user-service/src/main/java/com/mh/user/model/SanShiFengDatas.java new file mode 100644 index 0000000..cc9c63e --- /dev/null +++ b/user-service/src/main/java/com/mh/user/model/SanShiFengDatas.java @@ -0,0 +1,30 @@ +package com.mh.user.model; + +import lombok.Data; + +/** + * @author LJF + * @version 1.0 + * @project EEMCS + * @description 研华数据体 + * @date 2025-01-22 14:47:25 + */ +@Data +public class SanShiFengDatas { + + /** + * 对应研华的标签值 + */ + private String tag; + + /** + * 上报值 + */ + private T value; + + /** + * 质量值 + */ + private T quality; + +} diff --git a/user-service/src/main/java/com/mh/user/model/SanShiFengReceiver.java b/user-service/src/main/java/com/mh/user/model/SanShiFengReceiver.java new file mode 100644 index 0000000..b76c17b --- /dev/null +++ b/user-service/src/main/java/com/mh/user/model/SanShiFengReceiver.java @@ -0,0 +1,29 @@ +package com.mh.user.model; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.Data; + +import java.util.List; + +/** + * @author LJF + * @version 1.0 + * @project EEMCS + * @description 研华网关发送接收数据 + * @date 2025-01-22 14:43:15 + */ +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +public class SanShiFengReceiver { + + /** + * 数据集合 + */ + private List d; + + /** + * 主动上报数据时间(带T类型) + */ + private String ts; + +} diff --git a/user-service/src/main/java/com/mh/user/service/MqttSubscriptionService.java b/user-service/src/main/java/com/mh/user/service/MqttSubscriptionService.java new file mode 100644 index 0000000..9ff23a8 --- /dev/null +++ b/user-service/src/main/java/com/mh/user/service/MqttSubscriptionService.java @@ -0,0 +1,26 @@ +package com.mh.user.service; + +import com.mh.user.entity.MqttSubscriptionEntity; + +import java.util.List; + +/** + * @author LJF + * @version 1.0 + * @project EEMCS + * @description mqtt订阅管理 + * @date 2025-02-14 13:58:37 + */ +public interface MqttSubscriptionService { + + List selectMqttSubList(MqttSubscriptionEntity mqttSubscription); + + MqttSubscriptionEntity selectMqttSubById(String msId); + + int insertMqttSub(MqttSubscriptionEntity mqttSubscription); + + int updateMqttSub(MqttSubscriptionEntity mqttSubscription); + + int deleteMqttSubByIds(String[] msIds); + +} diff --git a/user-service/src/main/java/com/mh/user/service/impl/DeviceControlServiceImpl.java b/user-service/src/main/java/com/mh/user/service/impl/DeviceControlServiceImpl.java index 7e04c57..f3d9812 100644 --- a/user-service/src/main/java/com/mh/user/service/impl/DeviceControlServiceImpl.java +++ b/user-service/src/main/java/com/mh/user/service/impl/DeviceControlServiceImpl.java @@ -401,7 +401,7 @@ public class DeviceControlServiceImpl implements DeviceControlService { if (Constant.WRITE.equals(type)) { deviceCodeParam.setFunCode("06"); String time1 = split[1].substring(0, 2); - String time2 = split[1].substring(2, 4); + String time2 = split[1].substring(3, 5); deviceCodeParam.setDataValue(String.valueOf(Integer.parseInt(time1)*60 + Integer.parseInt(time2))); serialPortModel.setDataValue(String.valueOf(Integer.parseInt(time1)*60 + Integer.parseInt(time2))); } @@ -409,11 +409,11 @@ public class DeviceControlServiceImpl implements DeviceControlService { case "timeSetClose1": // 关时间设置\读取1 registerStr = ExchangeStringUtil.addZeroForNum(ExchangeStringUtil.decToHex(String.valueOf(31 + (scene - 1) * 18)), 4); - deviceCodeParam.setRegisterSize(2); + deviceCodeParam.setRegisterSize(1); if (Constant.WRITE.equals(type)) { deviceCodeParam.setFunCode("06"); String time1 = split[1].substring(0, 2); - String time2 = split[1].substring(2, 4); + String time2 = split[1].substring(3, 5); deviceCodeParam.setDataValue(String.valueOf(Integer.parseInt(time1)*60 + Integer.parseInt(time2))); serialPortModel.setDataValue(String.valueOf(Integer.parseInt(time1)*60 + Integer.parseInt(time2))); } @@ -425,7 +425,7 @@ public class DeviceControlServiceImpl implements DeviceControlService { if (Constant.WRITE.equals(type)) { deviceCodeParam.setFunCode("06"); String time1 = split[1].substring(0, 2); - String time2 = split[1].substring(2, 4); + String time2 = split[1].substring(3, 5); deviceCodeParam.setDataValue(String.valueOf(Integer.parseInt(time1)*60 + Integer.parseInt(time2))); serialPortModel.setDataValue(String.valueOf(Integer.parseInt(time1)*60 + Integer.parseInt(time2))); } @@ -433,11 +433,11 @@ public class DeviceControlServiceImpl implements DeviceControlService { case "timeSetClose2": // 关时间设置\读取2 registerStr = ExchangeStringUtil.addZeroForNum(ExchangeStringUtil.decToHex(String.valueOf(33 + (scene - 1) * 18)), 4); - deviceCodeParam.setRegisterSize(2); + deviceCodeParam.setRegisterSize(1); if (Constant.WRITE.equals(type)) { deviceCodeParam.setFunCode("06"); String time1 = split[1].substring(0, 2); - String time2 = split[1].substring(2, 4); + String time2 = split[1].substring(3, 5); deviceCodeParam.setDataValue(String.valueOf(Integer.parseInt(time1)*60 + Integer.parseInt(time2))); serialPortModel.setDataValue(String.valueOf(Integer.parseInt(time1)*60 + Integer.parseInt(time2))); } @@ -449,7 +449,7 @@ public class DeviceControlServiceImpl implements DeviceControlService { if (Constant.WRITE.equals(type)) { deviceCodeParam.setFunCode("06"); String time1 = split[1].substring(0, 2); - String time2 = split[1].substring(2, 4); + String time2 = split[1].substring(3, 5); deviceCodeParam.setDataValue(String.valueOf(Integer.parseInt(time1)*60 + Integer.parseInt(time2))); serialPortModel.setDataValue(String.valueOf(Integer.parseInt(time1)*60 + Integer.parseInt(time2))); } @@ -457,11 +457,11 @@ public class DeviceControlServiceImpl implements DeviceControlService { case "timeSetClose3": // 关时间设置\读取3 registerStr = ExchangeStringUtil.addZeroForNum(ExchangeStringUtil.decToHex(String.valueOf(35 + (scene - 1) * 18)), 4); - deviceCodeParam.setRegisterSize(2); + deviceCodeParam.setRegisterSize(1); if (Constant.WRITE.equals(type)) { deviceCodeParam.setFunCode("06"); String time1 = split[1].substring(0, 2); - String time2 = split[1].substring(2, 4); + String time2 = split[1].substring(3, 5); deviceCodeParam.setDataValue(String.valueOf(Integer.parseInt(time1)*60 + Integer.parseInt(time2))); serialPortModel.setDataValue(String.valueOf(Integer.parseInt(time1)*60 + Integer.parseInt(time2))); } @@ -473,7 +473,7 @@ public class DeviceControlServiceImpl implements DeviceControlService { if (Constant.WRITE.equals(type)) { deviceCodeParam.setFunCode("06"); String time1 = split[1].substring(0, 2); - String time2 = split[1].substring(2, 4); + String time2 = split[1].substring(3, 5); deviceCodeParam.setDataValue(String.valueOf(Integer.parseInt(time1)*60 + Integer.parseInt(time2))); serialPortModel.setDataValue(String.valueOf(Integer.parseInt(time1)*60 + Integer.parseInt(time2))); } @@ -481,11 +481,11 @@ public class DeviceControlServiceImpl implements DeviceControlService { case "timeSetClose4": // 关时间设置\读取4 registerStr = ExchangeStringUtil.addZeroForNum(ExchangeStringUtil.decToHex(String.valueOf(37 + (scene - 1) * 18)), 4); - deviceCodeParam.setRegisterSize(2); + deviceCodeParam.setRegisterSize(1); if (Constant.WRITE.equals(type)) { deviceCodeParam.setFunCode("06"); String time1 = split[1].substring(0, 2); - String time2 = split[1].substring(2, 4); + String time2 = split[1].substring(3, 5); deviceCodeParam.setDataValue(String.valueOf(Integer.parseInt(time1)*60 + Integer.parseInt(time2))); serialPortModel.setDataValue(String.valueOf(Integer.parseInt(time1)*60 + Integer.parseInt(time2))); } diff --git a/user-service/src/main/java/com/mh/user/service/impl/MqttSubscriptionServiceImpl.java b/user-service/src/main/java/com/mh/user/service/impl/MqttSubscriptionServiceImpl.java new file mode 100644 index 0000000..3722dcd --- /dev/null +++ b/user-service/src/main/java/com/mh/user/service/impl/MqttSubscriptionServiceImpl.java @@ -0,0 +1,61 @@ +package com.mh.user.service.impl; + +import com.mh.user.entity.MqttSubscriptionEntity; +import com.mh.user.mapper.MqttSubscriptionMapper; +import com.mh.user.service.MqttSubscriptionService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.List; + +/** + * @author LJF + * @version 1.0 + * @project EEMCS + * @description mqtt订阅实现类 + * @date 2025-02-14 13:59:27 + */ +@Service +public class MqttSubscriptionServiceImpl implements MqttSubscriptionService { + + private final MqttSubscriptionMapper mqttSubscriptionMapper; + + @Autowired + public MqttSubscriptionServiceImpl(MqttSubscriptionMapper mqttSubscriptionMapper) { + this.mqttSubscriptionMapper = mqttSubscriptionMapper; + } + + @Override + public List selectMqttSubList(MqttSubscriptionEntity mqttSubscription) { + if (mqttSubscription == null) { + return null; + } + return mqttSubscriptionMapper.selectListByTopic(mqttSubscription.getTopic(), mqttSubscription.getStatus()); + } + + @Override + public MqttSubscriptionEntity selectMqttSubById(String msId) { + return mqttSubscriptionMapper.selectById(msId); + } + + @Override + public int insertMqttSub(MqttSubscriptionEntity mqttSubscription) { + return mqttSubscriptionMapper.insert(mqttSubscription); + } + + @Override + public int updateMqttSub(MqttSubscriptionEntity mqttSubscription) { + return mqttSubscriptionMapper.updateByPrimaryKey(mqttSubscription); + } + + @Override + public int deleteMqttSubByIds(String[] msIds) { + if (msIds != null && msIds.length > 0) { + for (String msId : msIds) { + mqttSubscriptionMapper.deleteByPrimaryKey(msId); + } + return msIds.length; + } + return 0; + } +} diff --git a/user-service/src/main/java/com/mh/user/service/mqtt/config/MqttConfig.java b/user-service/src/main/java/com/mh/user/service/mqtt/config/MqttConfig.java new file mode 100644 index 0000000..a307097 --- /dev/null +++ b/user-service/src/main/java/com/mh/user/service/mqtt/config/MqttConfig.java @@ -0,0 +1,99 @@ +package com.mh.user.service.mqtt.config; + +import com.mh.user.constants.MqttClientOptions; +import com.mh.user.constants.MqttProtocolEnum; +import com.mh.user.constants.MqttUseEnum; +import lombok.Data; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +import org.springframework.util.StringUtils; + +import java.util.Map; + +/** + * @author LJF + * @version 1.0 + * @project springboot-mqtt-demo + * @description mqtt连接配置 + * @date 2024-10-29 14:44:51 + */ +@Configuration +@Data +@ConfigurationProperties +public class MqttConfig { + + private static Map mqttSpring; + + public void setMqttSpring(Map mqtt) { + MqttConfig.mqttSpring = mqtt; + } + + /** + * 获取mqtt基本配置 + * @return + */ + static MqttClientOptions getBasicMqttClientOptions() { + if (!mqttSpring.containsKey(MqttUseEnum.BASIC)) { + throw new Error("请先配置MQTT的基本连接参数,否则无法启动项目"); + } + return mqttSpring.get(MqttUseEnum.BASIC); + } + + /** + * 拼接获取对应mqtt的连接地址 + * @param options + * @return + */ + public static String getMqttAddress(MqttClientOptions options) { + StringBuilder addr = new StringBuilder(); + addr.append(options.getProtocol().getProtocolAddr()) + .append(options.getHost().trim()) + .append(":") + .append(options.getPort()); + if ((options.getProtocol() == MqttProtocolEnum.WS || options.getProtocol() == MqttProtocolEnum.WSS) + && StringUtils.hasText(options.getPath())) { + addr.append(options.getPath()); + } + return addr.toString(); + } + + public static String getBasicMqttAddress() { + return getMqttAddress(getBasicMqttClientOptions()); + } + + /** + * 获取连接参数,注入到spring中 + * @return + */ + @Bean + public MqttConnectOptions mqttConnectionOptions() { + + MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); + + MqttClientOptions customizeOptions = getBasicMqttClientOptions(); + String basicMqttAddress = getBasicMqttAddress(); + mqttConnectOptions.setServerURIs(new String[]{basicMqttAddress}); + mqttConnectOptions.setUserName(StringUtils.hasText(customizeOptions.getUsername()) ? + customizeOptions.getUsername() : ""); + mqttConnectOptions.setPassword(StringUtils.hasText(customizeOptions.getPassword()) ? + customizeOptions.getPassword().toCharArray() : new char[0]); + // 直接进行自动连接 + mqttConnectOptions.setAutomaticReconnect(true); + // 时间间隔时间10s + mqttConnectOptions.setKeepAliveInterval(10); + + return mqttConnectOptions; + } + + @Bean + public MqttPahoClientFactory mqttClientFactory() { + DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); + factory.setConnectionOptions(mqttConnectionOptions()); + return factory; + } + +} diff --git a/user-service/src/main/java/com/mh/user/service/mqtt/config/MqttInboundConfig.java b/user-service/src/main/java/com/mh/user/service/mqtt/config/MqttInboundConfig.java new file mode 100644 index 0000000..509f764 --- /dev/null +++ b/user-service/src/main/java/com/mh/user/service/mqtt/config/MqttInboundConfig.java @@ -0,0 +1,91 @@ +package com.mh.user.service.mqtt.config; + +import com.mh.user.constants.ChannelName; +import com.mh.user.constants.MqttClientOptions; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.annotation.IntegrationComponentScan; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.endpoint.MessageProducerSupport; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; +import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; +import org.springframework.integration.mqtt.support.MqttHeaders; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; + +import javax.annotation.Resource; + +/** + * @author LJF + * @version 1.0 + * @project springboot-mqtt-demo + * @description 入站配置 + * @date 2024-10-29 16:22:10 + */ +@Slf4j +@Configuration +@IntegrationComponentScan +public class MqttInboundConfig { + + @Autowired + private MqttPahoClientFactory mqttClientFactory; + + @Resource(name = ChannelName.INBOUND) + private MessageChannel inboundChannel; + + private String clientId; + + /** + * 入站适配器 + * @return + */ + @Bean(name = "adapter") + public MessageProducerSupport mqttInbound() { + MqttClientOptions options = MqttConfig.getBasicMqttClientOptions(); + // 此处初始化的时候,默认订阅了配置文件中已经写好的topic + // 如果需要订阅多个,可以自己手动订阅,会写一个addTopic()进行添加订阅 + clientId = options.getClientId() + "_consumer_" + System.currentTimeMillis(); + MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter( + clientId, + mqttClientFactory, + options.getInboundTopic().split(",")); + // System.out.println("每一次都会入站适配器吗?"+clientId); + DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter(); + // 统一是字节处理 + converter.setPayloadAsBytes(true); + // 设置消息转换器 + adapter.setConverter(converter); + // 设置qos(quality of service) + // 0:最多一次传输(消息会丢失), + // 1:至少一次传输(消息会重复), + // 2:只有当消息发送成功时才确认(消息不回丢,但延迟高)。 + adapter.setQos(0); + // 设置在接收已经订阅的主题信息后,发送给哪个通道,具体的发送方法需要翻上层的抽象类 + adapter.setOutputChannel(inboundChannel); + return adapter; + } + + /** + * 默认声明一个消息处理器,用于处理无效的消息 + * @return + */ + @Bean + @ServiceActivator(inputChannel = ChannelName.DEFAULT_BOUND) + public MessageHandler handler() { + return message -> { + log.info("The default channel does not handle messages." + + "\nTopic: {}" + + "\nPayload: {}", + message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC), + message.getPayload()); + }; + } + + public String getClientId() { + return clientId; + } + +} diff --git a/user-service/src/main/java/com/mh/user/service/mqtt/config/MqttMessageChannel.java b/user-service/src/main/java/com/mh/user/service/mqtt/config/MqttMessageChannel.java new file mode 100644 index 0000000..a3322e3 --- /dev/null +++ b/user-service/src/main/java/com/mh/user/service/mqtt/config/MqttMessageChannel.java @@ -0,0 +1,55 @@ +package com.mh.user.service.mqtt.config; + +import com.mh.user.constants.ChannelName; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.messaging.MessageChannel; + +/** + * @author LJF + * @version 1.0 + * @project springboot-mqtt-demo + * @description 声明所有通道的定义类 + * @date 2024-10-29 16:23:32 + */ +@Slf4j +@Configuration +public class MqttMessageChannel { + + @Bean(name = ChannelName.OUTBOUND) + public MessageChannel outboundChannel() { + return new DirectChannel(); + } + + @Bean(name = ChannelName.INBOUND) + public MessageChannel inboundChannel() { + return new DirectChannel(); + } + + /** + * 事件主动上报通道 + * @return + */ + @Bean(name = ChannelName.EVENTS_UPLOAD_INBOUND) + public MessageChannel eventsUploadInbound() { + return new DirectChannel(); + } + + @Bean(name = ChannelName.EVENTS_COLLECTION_INBOUND) + public MessageChannel eventsCollectionInbound() { + return new DirectChannel(); + } + + @Bean(name = ChannelName.EVENTS_CONTROL_INBOUND) + public MessageChannel eventsControlInbound() { + return new DirectChannel(); + } + + @Bean(name = ChannelName.EVENTS_SEND_INBOUND) + public MessageChannel eventsSendInbound() { + return new DirectChannel(); + } + +} diff --git a/user-service/src/main/java/com/mh/user/service/mqtt/config/MqttOutboundConfig.java b/user-service/src/main/java/com/mh/user/service/mqtt/config/MqttOutboundConfig.java new file mode 100644 index 0000000..dc4f34d --- /dev/null +++ b/user-service/src/main/java/com/mh/user/service/mqtt/config/MqttOutboundConfig.java @@ -0,0 +1,51 @@ +package com.mh.user.service.mqtt.config; + +import com.mh.user.constants.ChannelName; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.annotation.IntegrationComponentScan; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; +import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; +import org.springframework.messaging.MessageHandler; + +/** + * @author LJF + * @version 1.0 + * @project springboot-mqtt-demo + * @description 入站配置 + * @date 2024-10-29 16:22:10 + */ +@Slf4j +@Configuration +@IntegrationComponentScan +public class MqttOutboundConfig { + + @Autowired + private MqttPahoClientFactory mqttClientFactory; + + /** + * 默认声明一个出站处理器,用于处理无效的消息 + * @return + */ + @Bean + @ServiceActivator(inputChannel = ChannelName.OUTBOUND) + public MessageHandler mqttOutbound() { + MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler( + MqttConfig.getBasicMqttClientOptions().getClientId() + "_producer_" + System.currentTimeMillis(), + mqttClientFactory); + DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter(); + // use byte types uniformly + converter.setPayloadAsBytes(true); + + messageHandler.setAsync(true); + messageHandler.setDefaultQos(0); + messageHandler.setConverter(converter); + return messageHandler; + } + + +} diff --git a/user-service/src/main/java/com/mh/user/service/mqtt/handler/InboundMessageRouter.java b/user-service/src/main/java/com/mh/user/service/mqtt/handler/InboundMessageRouter.java new file mode 100644 index 0000000..d17bacb --- /dev/null +++ b/user-service/src/main/java/com/mh/user/service/mqtt/handler/InboundMessageRouter.java @@ -0,0 +1,62 @@ +package com.mh.user.service.mqtt.handler; + +import com.mh.user.config.MHConfig; +import com.mh.user.constants.ChannelName; +import com.mh.user.constants.TopicEnum; +import com.mh.user.utils.SpringContextUtils; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.integration.annotation.Router; +import org.springframework.integration.mqtt.support.MqttHeaders; +import org.springframework.integration.router.AbstractMessageRouter; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHeaders; +import org.springframework.stereotype.Component; + +import java.util.Collection; +import java.util.Collections; +import java.util.Objects; + +/** + * @author LJF + * @version 1.0 + * @project springboot-mqtt-demo + * @description 入站消息路由分发中心 + * @date 2024-10-29 17:04:17 + */ +@Slf4j +@Component +public class InboundMessageRouter extends AbstractMessageRouter { + + /** 系统基础配置 */ + @Autowired + private MHConfig mHConfig; + + /** + * 目前只需要这个方式,后期在拓展使用IntegrationFlow方式 + * @param message + * @return + */ + @Router(inputChannel = ChannelName.INBOUND) + @Override + protected Collection determineTargetChannels(Message message) { + MessageHeaders headers = message.getHeaders(); + String topic = Objects.requireNonNull(headers.get(MqttHeaders.RECEIVED_TOPIC)).toString(); +// byte[] payload = (byte[]) message.getPayload(); +// log.info("从当前主题 topic: {}, 接收到的消息:{}", topic, new String(payload)); + // 判断当前主题是否是当前项目的,温湿度目前写死的 +// if (!topic.startsWith(mHConfig.getName()) && !topic.contains("/temp")) { +// log.info("当前主题 topic: {} 不是当前项目的,直接丢弃", topic); +// return Collections.singleton((MessageChannel) SpringContextUtils.getBean(ChannelName.DEFAULT_BOUND)); +// } + // 找到对应的主题消息通道 + if (topic.contains("/temp")) { + return Collections.singleton((MessageChannel) SpringContextUtils.getBean(ChannelName.EVENTS_UPLOAD_INBOUND)); + } else { + TopicEnum topicEnum = TopicEnum.find(mHConfig.getName() + "/", topic); + MessageChannel bean = (MessageChannel) SpringContextUtils.getBean(topicEnum.getBeanName()); + return Collections.singleton(bean); + } + } +} diff --git a/user-service/src/main/java/com/mh/user/service/mqtt/service/IEventsService.java b/user-service/src/main/java/com/mh/user/service/mqtt/service/IEventsService.java new file mode 100644 index 0000000..5b89468 --- /dev/null +++ b/user-service/src/main/java/com/mh/user/service/mqtt/service/IEventsService.java @@ -0,0 +1,49 @@ +package com.mh.user.service.mqtt.service; + +import org.springframework.messaging.MessageHeaders; + +import java.io.IOException; + +/** + * @author LJF + * @version 1.0 + * @project springboot-mqtt-demo + * @description 通道处理类 + * @date 2024-11-05 11:30:26 + */ +public interface IEventsService { + + /** + * 处理主动上报 + * @param receiver + * @param headers + * @return + */ + void handleInboundUpload(byte[] receiver, MessageHeaders headers) throws IOException; + + + /** + * 处理服务器主动采集上报 + * @param receiver + * @param headers + * @return + */ + void handleInboundCollection(byte[] receiver, MessageHeaders headers) throws IOException; + + /** + * 处理控制上报 + * @param receiver + * @param headers + * @return + */ + void handleInboundControl(byte[] receiver, MessageHeaders headers) throws IOException; + + /** + * 处理发送 + * @param receiver + * @param headers + * @throws IOException + */ + void handleInboundSend(byte[] receiver, MessageHeaders headers) throws IOException; + +} diff --git a/user-service/src/main/java/com/mh/user/service/mqtt/service/IMqttGatewayService.java b/user-service/src/main/java/com/mh/user/service/mqtt/service/IMqttGatewayService.java new file mode 100644 index 0000000..f13f5f4 --- /dev/null +++ b/user-service/src/main/java/com/mh/user/service/mqtt/service/IMqttGatewayService.java @@ -0,0 +1,36 @@ +package com.mh.user.service.mqtt.service; + +/** + * @author LJF + * @version 1.0 + * @project springboot-mqtt + * @description 消息网关 + * @date 2024-10-30 14:43:55 + */ +//@Component +//@Configuration +//@MessagingGateway(defaultRequestChannel = ChannelName.OUTBOUND) +public interface IMqttGatewayService { + + /** + * 发送消息 + * @param topic + * @param payload + */ + void publish(String topic, String payload, int qos); + + /** + * 发送消息 + * @param topic + * @param payload + */ + void publish(String topic, byte[] payload); + + /** + * 发送消息并带上qos + * @param topic + * @param payload + * @param qos + */ + void publish(String topic, byte[] payload, int qos); +} diff --git a/user-service/src/main/java/com/mh/user/service/mqtt/service/IMqttMsgSenderService.java b/user-service/src/main/java/com/mh/user/service/mqtt/service/IMqttMsgSenderService.java new file mode 100644 index 0000000..38d0998 --- /dev/null +++ b/user-service/src/main/java/com/mh/user/service/mqtt/service/IMqttMsgSenderService.java @@ -0,0 +1,96 @@ +package com.mh.user.service.mqtt.service; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.mh.user.constants.CommonTopicResponse; +import com.mh.user.constants.ServiceReply; + +/** + * @author LJF + * @version 1.0 + * @project springboot-mqtt-demo + * @description Mqtt发送消息 + * @date 2024-10-29 17:31:40 + */ +public interface IMqttMsgSenderService { + + void publish(String topic, String pushMessage); + + /** + * 发布消息 + * @param topic target + * @param response message + */ + void publish(String topic, CommonTopicResponse response); + + /** + * 使用qos发布消息 + * + * @param topic target + * @param qos qos + * @param response message + */ + void publish(String topic, int qos, CommonTopicResponse response); + + /** + * 发送消息并同时接收响应 + * @param clazz + * @param topic + * @param response 通知启动是否成功 + * @return + */ + T publishWithReply(Class clazz, String topic, CommonTopicResponse response); + + /** + * 发送消息并同时接收响应 + * @param clazz + * @param topic + * @param response + * @param retryTime + * @param + * @return + */ + T publishWithReply(Class clazz, String topic, CommonTopicResponse response, int retryTime); + + /** + * 专门用于发送服务消息 + * @param clazz The generic class for ServiceReply. + * @param sn + * @param method + * @param data + * @param bid + * @param + * @return + */ + ServiceReply publishServicesTopic(TypeReference clazz, String sn, String method, Object data, String bid); + + /** + * 仅用于为服务发送消息,不设置接收到的子类型 + * @param sn + * @param method + * @param data + * @param bid + * @return + */ + ServiceReply publishServicesTopic(String sn, String method, Object data, String bid); + + /** + * 专门用于发送服务消息 + * @param clazz The generic class for ServiceReply. + * @param sn + * @param method + * @param data + * @param + * @return + */ + ServiceReply publishServicesTopic(TypeReference clazz, String sn, String method, Object data); + + /** + * 仅用于为服务发送消息,不设置接收到的子类型 + * @param sn + * @param method + * @param data + * @return + */ + ServiceReply publishServicesTopic(String sn, String method, Object data); + +} diff --git a/user-service/src/main/java/com/mh/user/service/mqtt/service/IMqttTopicService.java b/user-service/src/main/java/com/mh/user/service/mqtt/service/IMqttTopicService.java new file mode 100644 index 0000000..b97b182 --- /dev/null +++ b/user-service/src/main/java/com/mh/user/service/mqtt/service/IMqttTopicService.java @@ -0,0 +1,40 @@ +package com.mh.user.service.mqtt.service; + +import org.springframework.integration.mqtt.support.MqttHeaders; +import org.springframework.messaging.handler.annotation.Header; + +/** + * @author LJF + * @version 1.0 + * @project springboot-mqtt-demo + * @description mqtt订阅主题 + * @date 2024-10-29 17:29:52 + */ +public interface IMqttTopicService { + + /** + * 订阅主题 + * @param topic + */ + void subscribe(@Header(MqttHeaders.TOPIC) String topic); + + /** + * 订阅主题并设置qos + * @param topic + * @param qos + */ + void subscribe(@Header(MqttHeaders.TOPIC) String topic, int qos); + + /** + * 解绑主题 + * @param topic + */ + void unsubscribe(@Header(MqttHeaders.TOPIC) String topic); + + /** + * 获取已订阅的主题 + * @return + */ + String[] getSubscribedTopics(); + +} diff --git a/user-service/src/main/java/com/mh/user/service/mqtt/service/impl/EventsServiceImpl.java b/user-service/src/main/java/com/mh/user/service/mqtt/service/impl/EventsServiceImpl.java new file mode 100644 index 0000000..29f04ca --- /dev/null +++ b/user-service/src/main/java/com/mh/user/service/mqtt/service/impl/EventsServiceImpl.java @@ -0,0 +1,75 @@ +package com.mh.user.service.mqtt.service.impl; + +import com.alibaba.fastjson2.JSONObject; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.mh.user.constants.ChannelName; +import com.mh.user.model.SanShiFengReceiver; +import com.mh.user.service.mqtt.service.IEventsService; +import io.netty.util.CharsetUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.messaging.MessageHeaders; +import org.springframework.stereotype.Service; + +import java.io.IOException; +import java.util.Objects; + +/** + * @author LJF + * @version 1.0 + * @project springboot-mqtt-demo + * @description 通道消息处理 + * @date 2024-11-05 11:42:30 + */ +@Slf4j +@Service +public class EventsServiceImpl implements IEventsService { + + @Autowired + private ObjectMapper mapper; + + @ServiceActivator(inputChannel = ChannelName.EVENTS_UPLOAD_INBOUND) + @Override + public void handleInboundUpload(byte[] receiver, MessageHeaders headers) { + // 获取当前的主题 + String topic = Objects.requireNonNull(headers.get("mqtt_receivedTopic")).toString(); + handleInboundData(receiver, topic, "主动上报数据"); + } + + @ServiceActivator(inputChannel = ChannelName.EVENTS_COLLECTION_INBOUND) + @Override + public void handleInboundCollection(byte[] receiver, MessageHeaders headers) { + // 获取当前的主题 + String topic = Objects.requireNonNull(headers.get("mqtt_receivedTopic")).toString(); + handleInboundData(receiver, topic, "主动下发采集数据"); + } + + @ServiceActivator(inputChannel = ChannelName.EVENTS_CONTROL_INBOUND) + @Override + public void handleInboundControl(byte[] receiver, MessageHeaders headers) { + // 获取当前的主题 + String topic = Objects.requireNonNull(headers.get("mqtt_receivedTopic")).toString(); + handleInboundData(receiver, topic, "控制指令下发"); + } + + @ServiceActivator(inputChannel = ChannelName.EVENTS_SEND_INBOUND) + @Override + public void handleInboundSend(byte[] receiver, MessageHeaders headers) throws IOException { + String sendStr = new String(receiver, CharsetUtil.UTF_8); + log.info("接收到控制指令下发=>{}", sendStr); + } + + private void handleInboundData(byte[] receiver,String topic, String logMessage) { + try { + SanShiFengReceiver commonTopicReceiver = new SanShiFengReceiver(); + commonTopicReceiver = mapper.readValue(receiver, SanShiFengReceiver.class); + log.info("主题:{},类型:{}: ,数据:{}", topic, logMessage, commonTopicReceiver.toString()); + // 不使用消息队列,查询属于哪种设备类型,然后通过策略进行数据解析 + } catch (IOException e) { + log.error("处理数据时发生错误: ", e); + } + } + + +} diff --git a/user-service/src/main/java/com/mh/user/service/mqtt/service/impl/MqttGatewayServiceImpl.java b/user-service/src/main/java/com/mh/user/service/mqtt/service/impl/MqttGatewayServiceImpl.java new file mode 100644 index 0000000..73e935c --- /dev/null +++ b/user-service/src/main/java/com/mh/user/service/mqtt/service/impl/MqttGatewayServiceImpl.java @@ -0,0 +1,54 @@ +package com.mh.user.service.mqtt.service.impl; + +import com.mh.user.service.mqtt.service.IMqttGatewayService; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.integration.mqtt.support.MqttHeaders; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.stereotype.Service; + +/** + * @author LJF + * @version 1.0 + * @project EEMCS + * @description 网关实现类 + * @date 2025-02-07 08:44:55 + */ +@Service +public class MqttGatewayServiceImpl implements IMqttGatewayService { + + private final MessageChannel outboundChannel; + + public MqttGatewayServiceImpl(@Qualifier("outbound") MessageChannel outboundChannel) { + this.outboundChannel = outboundChannel; + } + + @Override + public synchronized void publish(String topic, String payload, int qos) { + outboundChannel.send( + MessageBuilder + .withPayload(payload) + .setHeader(MqttHeaders.TOPIC, topic) + .setHeader(MqttHeaders.QOS, qos) + .build()); + } + + @Override + public void publish(String topic, byte[] payload) { + outboundChannel.send( + MessageBuilder + .withPayload(payload) + .setHeader(MqttHeaders.TOPIC, topic) + .build()); + } + + @Override + public void publish(String topic, byte[] payload, int qos) { + outboundChannel.send( + MessageBuilder + .withPayload(payload) + .setHeader(MqttHeaders.TOPIC, topic) + .setHeader(MqttHeaders.QOS, qos) + .build()); + } +} diff --git a/user-service/src/main/java/com/mh/user/service/mqtt/service/impl/MqttMsgSenderServiceImpl.java b/user-service/src/main/java/com/mh/user/service/mqtt/service/impl/MqttMsgSenderServiceImpl.java new file mode 100644 index 0000000..01e1a49 --- /dev/null +++ b/user-service/src/main/java/com/mh/user/service/mqtt/service/impl/MqttMsgSenderServiceImpl.java @@ -0,0 +1,142 @@ +package com.mh.user.service.mqtt.service.impl; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.mh.user.constants.CommonTopicResponse; +import com.mh.user.constants.ServiceReply; +import com.mh.user.constants.TopicConst; +import com.mh.user.service.mqtt.service.IMqttGatewayService; +import com.mh.user.service.mqtt.service.IMqttMsgSenderService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.springframework.util.StringUtils; + +import java.util.Objects; +import java.util.UUID; + +/** + * @author LJF + * @version 1.0 + * @project springboot-mqtt-demo + * @description mqtt不同类型发送消息 + * @date 2024-10-30 14:30:03 + */ +@Slf4j +@Service +public class MqttMsgSenderServiceImpl implements IMqttMsgSenderService { + + @Autowired + private IMqttGatewayService mqttGatewayService; + + @Autowired + private ObjectMapper mapper; + + + /** + * 发布,默认qos为0,非持久化 + * + * @param pushMessage + * @param topic + */ + @Override + public void publish(String topic, String pushMessage) { + synchronized (this) {//注意:这里一定要同步,否则,在多线程publish的情况下,线程会发生死锁,分析见文章最后补充 + try { + mqttGatewayService.publish(topic, pushMessage, 0); + log.info("发送主题:{},消息:{}", topic, pushMessage); + } catch (Exception e) { + log.error("发送主题异常:{},消息:{}", topic, pushMessage, e); + throw new RuntimeException(e); + } + } + } + + @Override + public void publish(String topic, CommonTopicResponse response) { + this.publish(topic, 1, response); + } + + @Override + public void publish(String topic, int qos, CommonTopicResponse response) { + try { + log.info("发送主题:{},消息:{}", topic, response.toString()); + mqttGatewayService.publish(topic, mapper.writeValueAsBytes(response), qos); + } catch (JsonProcessingException e) { + log.error("发送主题:{},消息:{}", topic, response.toString(), e); + throw new RuntimeException(e); + } + } + + @Override + public T publishWithReply(Class clazz, String topic, CommonTopicResponse response) { + return this.publishWithReply(clazz, topic, response, 2); + } + + @Override + public T publishWithReply(Class clazz, String topic, CommonTopicResponse response, int retryTime) { +// AtomicInteger time = new AtomicInteger(0); +// // Retry three times +// while (time.getAndIncrement() <= retryTime) { +// this.publish(topic, response); +// +// Chan> chan = Chan.getInstance(); +// // If the message is not received in 0.5 seconds then resend it again. +// CommonTopicReceiver receiver = chan.get(response.getTid()); +// +// // Need to match tid and bid. +// if (Objects.nonNull(receiver) && receiver.getTid().equals(response.getTid()) && +// receiver.getBid().equals(response.getBid())) { +// if (clazz.isAssignableFrom(receiver.getData().getClass())) { +// return receiver.getData(); +// } +// throw new TypeMismatchException(receiver.getData(), clazz); +// } +// // It must be guaranteed that the tid and bid of each message are different. +// response.setBid(UUID.randomUUID().toString()); +// response.setTid(UUID.randomUUID().toString()); +// } +// throw new RuntimeException("没有消息接收到"); + return null; + } + + @Override + public ServiceReply publishServicesTopic(TypeReference clazz, String sn, String method, Object data, String bid) { + String topic = TopicConst.THING_MODEL_PRE + TopicConst.PRODUCT + sn + TopicConst.SERVICES_SUF; + ServiceReply reply = this.publishWithReply(ServiceReply.class, topic, + CommonTopicResponse.builder() + .tid(UUID.randomUUID().toString()) + .bid(StringUtils.hasText(bid) ? bid : UUID.randomUUID().toString()) + .timestamp(System.currentTimeMillis()) + .method(method) + .data(Objects.requireNonNull(data, "")) + .build()); + if (Objects.isNull(clazz)) { + return reply; + } + // put together in "output" + if (Objects.nonNull(reply.getInfo())) { + reply.setOutput(mapper.convertValue(reply.getInfo(), clazz)); + } + if (Objects.nonNull(reply.getOutput())) { + reply.setOutput(mapper.convertValue(reply.getOutput(), clazz)); + } + return reply; + } + + @Override + public ServiceReply publishServicesTopic(String sn, String method, Object data, String bid) { + return this.publishServicesTopic(null, sn, method, data, bid); + } + + @Override + public ServiceReply publishServicesTopic(TypeReference clazz, String sn, String method, Object data) { + return this.publishServicesTopic(clazz, sn, method, data, null); + } + + @Override + public ServiceReply publishServicesTopic(String sn, String method, Object data) { + return this.publishServicesTopic(null, sn, method, data, null); + } +} diff --git a/user-service/src/main/java/com/mh/user/service/mqtt/service/impl/MqttTopicServiceImpl.java b/user-service/src/main/java/com/mh/user/service/mqtt/service/impl/MqttTopicServiceImpl.java new file mode 100644 index 0000000..5eba529 --- /dev/null +++ b/user-service/src/main/java/com/mh/user/service/mqtt/service/impl/MqttTopicServiceImpl.java @@ -0,0 +1,40 @@ +package com.mh.user.service.mqtt.service.impl; + +import com.mh.framework.mqtt.service.IMqttTopicService; +import jakarta.annotation.Resource; +import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; +import org.springframework.stereotype.Service; + +/** + * @author LJF + * @version 1.0 + * @project springboot-mqtt-demo + * @description 订阅主题实现类 + * @date 2024-10-29 17:36:31 + */ +@Service +public class MqttTopicServiceImpl implements IMqttTopicService { + + @Resource + private MqttPahoMessageDrivenChannelAdapter adapter; + + @Override + public void subscribe(String topic) { + adapter.addTopic(topic); + } + + @Override + public void subscribe(String topic, int qos) { + adapter.addTopic(topic, qos); + } + + @Override + public void unsubscribe(String topic) { + adapter.removeTopic(topic); + } + + @Override + public String[] getSubscribedTopics() { + return adapter.getTopic(); + } +} diff --git a/user-service/src/main/java/com/mh/user/strategy/BackTempControlStrategy.java b/user-service/src/main/java/com/mh/user/strategy/BackTempControlStrategy.java index ad81db6..94df221 100644 --- a/user-service/src/main/java/com/mh/user/strategy/BackTempControlStrategy.java +++ b/user-service/src/main/java/com/mh/user/strategy/BackTempControlStrategy.java @@ -2,6 +2,7 @@ package com.mh.user.strategy; import com.mh.user.constants.Constant; import com.mh.user.entity.DeviceCodeParamEntity; +import com.mh.user.entity.DeviceInstallEntity; import com.mh.user.entity.NowPublicDataEntity; import com.mh.user.service.BuildingService; import com.mh.user.service.NowDataService; @@ -100,4 +101,36 @@ public class BackTempControlStrategy implements DeviceStrategy { } return result; } + + @Override + public String analysisMQTTReceiveData(String dateStr, + String registerAddr, + String dataStr, + String operateType, + DeviceInstallEntity deviceInstallEntity) { + String result = Constant.FAIL; + if (Integer.parseInt(dataStr) < 0) { + log.info("回水温控报文检验失败: " + dataStr); + return result; + } + String addr = deviceInstallEntity.getDeviceAddr();//地址 + String data = ""; + if (operateType.equalsIgnoreCase(Constant.READ)) {// 读 + Double fdata = Double.parseDouble(dataStr); + nowDataService.saveNowHistoryData2(addr, "回水温控", String.valueOf(fdata), "waterTemp", deviceInstallEntity.getBuildingId()); + nowDataService.proWaterTemp(dateStr, deviceInstallEntity.getBuildingId(), "");//保存时间点温度 + String avgTemp = nowDataService.selectAve(deviceInstallEntity.getBuildingId()); + NowPublicDataEntity publicData = new NowPublicDataEntity(); + publicData.setBuildingId(deviceInstallEntity.getBuildingId()); + publicData.setUseWaterTemp(avgTemp); + publicData.setBackWaterTemp(avgTemp); + publicData.setSingleTemp(String.valueOf(fdata));//单箱温度 + nowPublicDataService.saveNowHistoryPublicData(publicData); + log.info("回水温控号:" + addr + ",温度值:" + fdata + ",保存数据库成功!楼栋名称:" + deviceInstallEntity.getBuildingName()); + return String.valueOf(fdata); + } else if (operateType.equalsIgnoreCase(Constant.WRITE)) {// 写 + result = Constant.SUCCESS; + } + return result; + } } diff --git a/user-service/src/main/java/com/mh/user/strategy/DeviceStrategy.java b/user-service/src/main/java/com/mh/user/strategy/DeviceStrategy.java index b316ba0..1a848c2 100644 --- a/user-service/src/main/java/com/mh/user/strategy/DeviceStrategy.java +++ b/user-service/src/main/java/com/mh/user/strategy/DeviceStrategy.java @@ -1,6 +1,7 @@ package com.mh.user.strategy; import com.mh.user.entity.DeviceCodeParamEntity; +import com.mh.user.entity.DeviceInstallEntity; /** * @author LJF @@ -17,4 +18,19 @@ public interface DeviceStrategy { String createOrders(DeviceCodeParamEntity deviceCodeParamEntity); String analysisReceiveData(String dateStr, String deviceType, String registerAddr, String brand, String buildingId, String buildingName, String dataStr, DeviceCodeParamEntity deviceCodeParamEntity); + + /** + * 解析MQTT报文 + * @param dateStr + * @param registerAddr + * @param dataStr 已经是解析好的数据 + * @param operateType 操作类型,读取/设置 + * @param deviceInstallEntity + * @return + */ + String analysisMQTTReceiveData(String dateStr, + String registerAddr, + String dataStr, + String operateType, + DeviceInstallEntity deviceInstallEntity); } diff --git a/user-service/src/main/java/com/mh/user/strategy/EleMeterStrategy.java b/user-service/src/main/java/com/mh/user/strategy/EleMeterStrategy.java index 4ee8f26..d3feb43 100644 --- a/user-service/src/main/java/com/mh/user/strategy/EleMeterStrategy.java +++ b/user-service/src/main/java/com/mh/user/strategy/EleMeterStrategy.java @@ -1,8 +1,10 @@ package com.mh.user.strategy; import com.mh.common.utils.StringUtils; +import com.mh.user.constants.Constant; import com.mh.user.entity.DataResultEntity; import com.mh.user.entity.DeviceCodeParamEntity; +import com.mh.user.entity.DeviceInstallEntity; import com.mh.user.service.DataResultService; import com.mh.user.utils.ExchangeStringUtil; import com.mh.user.utils.SpringBeanUtil; @@ -156,4 +158,34 @@ public class EleMeterStrategy implements DeviceStrategy { } return data; } + + @Override + public String analysisMQTTReceiveData(String dateStr, String registerAddr, String dataStr, String operateType, DeviceInstallEntity deviceInstallEntity) { + String data = Constant.FAIL; + if (Integer.parseInt(dataStr) < 0) { + return data; + } + log.info("电表表号:{},电表读数:{}", deviceInstallEntity.getDeviceAddr(), dataStr); + // 考虑dataStr是否走大数或者走小数 + if (Double.parseDouble(dataStr)-deviceInstallEntity.getLastValue()>1000 || Double.parseDouble(dataStr)-deviceInstallEntity.getLastValue()<0) { + dataStr = String.valueOf(deviceInstallEntity.getLastValue()); + } + try { + DataResultEntity dataResultEntity = new DataResultEntity(); + dataResultEntity.setDeviceAddr(deviceInstallEntity.getDeviceAddr());//通讯编号 + dataResultEntity.setDeviceType("电表"); + dataResultEntity.setBuildingId(deviceInstallEntity.getBuildingId()); + dataResultEntity.setCurValue(Double.parseDouble(dataStr)); //当前读数 + Date date = new Date(); + dataResultEntity.setCurDate(date); //当前日期 + dataResultService.saveDataResult(dataResultEntity); + log.info("电表数据保存数据库成功! 楼栋名称:{}", deviceInstallEntity.getBuildingName()); + } catch (Exception e) { + log.error("电表数据保存数据库失败!楼栋名称:{}", deviceInstallEntity.getBuildingName(), e); + } + if (!StringUtils.isBlank(dataStr)) { + data = String.valueOf(Double.valueOf(dataStr)); //00010.76,去除读数前面带0的情况 + } + return data; + } } diff --git a/user-service/src/main/java/com/mh/user/strategy/HeatPumpStatusStrategy.java b/user-service/src/main/java/com/mh/user/strategy/HeatPumpStatusStrategy.java index fc776c7..e3a35d8 100644 --- a/user-service/src/main/java/com/mh/user/strategy/HeatPumpStatusStrategy.java +++ b/user-service/src/main/java/com/mh/user/strategy/HeatPumpStatusStrategy.java @@ -4,6 +4,7 @@ import com.alibaba.fastjson2.JSON; import com.mh.common.utils.StringUtils; import com.mh.user.constants.Constant; import com.mh.user.entity.DeviceCodeParamEntity; +import com.mh.user.entity.DeviceInstallEntity; import com.mh.user.service.BuildingService; import com.mh.user.service.NowDataService; import com.mh.user.service.NowPublicDataService; @@ -238,4 +239,9 @@ public class HeatPumpStatusStrategy implements DeviceStrategy { } return result; } + + @Override + public String analysisMQTTReceiveData(String dateStr, String registerAddr, String dataStr, String operateType, DeviceInstallEntity deviceInstallEntity) { + return ""; + } } diff --git a/user-service/src/main/java/com/mh/user/strategy/HeatPumpStrategy.java b/user-service/src/main/java/com/mh/user/strategy/HeatPumpStrategy.java index 718e87d..e5afbc3 100644 --- a/user-service/src/main/java/com/mh/user/strategy/HeatPumpStrategy.java +++ b/user-service/src/main/java/com/mh/user/strategy/HeatPumpStrategy.java @@ -3,6 +3,7 @@ package com.mh.user.strategy; import com.mh.common.utils.StringUtils; import com.mh.user.constants.Constant; import com.mh.user.entity.DeviceCodeParamEntity; +import com.mh.user.entity.DeviceInstallEntity; import com.mh.user.entity.NowPublicDataEntity; import com.mh.user.service.*; import com.mh.user.utils.ExchangeStringUtil; @@ -670,4 +671,9 @@ public class HeatPumpStrategy implements DeviceStrategy { stringBuffer.append(dataType); return sValue; } + + @Override + public String analysisMQTTReceiveData(String dateStr, String registerAddr, String dataStr, String operateType, DeviceInstallEntity deviceInstallEntity) { + return ""; + } } diff --git a/user-service/src/main/java/com/mh/user/strategy/MultiControlStrategy.java b/user-service/src/main/java/com/mh/user/strategy/MultiControlStrategy.java index 8208c3b..b7f90b8 100644 --- a/user-service/src/main/java/com/mh/user/strategy/MultiControlStrategy.java +++ b/user-service/src/main/java/com/mh/user/strategy/MultiControlStrategy.java @@ -390,4 +390,9 @@ public class MultiControlStrategy implements DeviceStrategy { } return l1; } + + @Override + public String analysisMQTTReceiveData(String dateStr, String registerAddr, String dataStr, String operateType, DeviceInstallEntity deviceInstallEntity) { + return ""; + } } diff --git a/user-service/src/main/java/com/mh/user/strategy/PressureTransStrategy.java b/user-service/src/main/java/com/mh/user/strategy/PressureTransStrategy.java index fdffa4b..0a9a1d4 100644 --- a/user-service/src/main/java/com/mh/user/strategy/PressureTransStrategy.java +++ b/user-service/src/main/java/com/mh/user/strategy/PressureTransStrategy.java @@ -38,7 +38,7 @@ public class PressureTransStrategy implements DeviceStrategy { } public static PressureTransStrategy getInstance() { - return PressureTransStrategy.SingletonHolder.INSTANCE; + return SingletonHolder.INSTANCE; } @Override @@ -128,4 +128,42 @@ public class PressureTransStrategy implements DeviceStrategy { } return result; } + + @Override + public String analysisMQTTReceiveData(String dateStr, String registerAddr, String dataStr, String operateType, DeviceInstallEntity deviceInstallEntity) { + String result = "fail"; + if (Integer.parseInt(dataStr) < 0) { + return result; + } + if (operateType.equalsIgnoreCase(Constant.READ)) {// 读 + Double tankHeight = 0.0; + // 查询当前压变有低区 + if (null != deviceInstallEntity + && !StringUtils.isBlank(deviceInstallEntity.getDeviceName())) { + if (deviceInstallEntity.getDeviceName().contains("低")) { + tankHeight = buildingService.queryLowTankHeight(deviceInstallEntity.getBuildingId());//水箱高,从数据库获取 + if (tankHeight == null) { + tankHeight = 2.0; + } + } + } + Double wtLevel = Double.parseDouble(dataStr)/tankHeight * 100; //水箱水位 + log.info("------水箱水高:" + wtLevel + "------"); + if (wtLevel <= 0) { + wtLevel = 0.0; + } else if (wtLevel >= 100) { + wtLevel = 100.0; + } + DecimalFormat df = new DecimalFormat("0.0"); + String strWtLevel = df.format(wtLevel); + // 更新device_install数据 + assert deviceInstallEntity != null; + deviceInstallService.updateLastValueByOther(deviceInstallEntity.getDeviceAddr(), strWtLevel, "压变", deviceInstallEntity.getBuildingId()); + nowDataService.saveNowHistoryData2(deviceInstallEntity.getDeviceAddr(), "压变", strWtLevel, "waterLevel", deviceInstallEntity.getBuildingId()); + nowDataService.proWaterLevel(dateStr, deviceInstallEntity.getBuildingId(), deviceInstallEntity.getDeviceAddr()); //楼栋水位 + log.info("时间:"+ dateStr +"压变号:" + deviceInstallEntity.getDeviceAddr() + ",保存数据库成功!楼栋名称:" + deviceInstallEntity.getBuildingName()); + result = strWtLevel; + } + return result; + } } diff --git a/user-service/src/main/java/com/mh/user/strategy/StatusCheckStrategy.java b/user-service/src/main/java/com/mh/user/strategy/StatusCheckStrategy.java index 7ed1e00..7632bfd 100644 --- a/user-service/src/main/java/com/mh/user/strategy/StatusCheckStrategy.java +++ b/user-service/src/main/java/com/mh/user/strategy/StatusCheckStrategy.java @@ -2,6 +2,7 @@ package com.mh.user.strategy; import com.mh.user.constants.Constant; import com.mh.user.entity.DeviceCodeParamEntity; +import com.mh.user.entity.DeviceInstallEntity; import com.mh.user.entity.NowPublicDataEntity; import com.mh.user.service.*; import com.mh.user.utils.ExchangeStringUtil; @@ -124,4 +125,9 @@ public class StatusCheckStrategy implements DeviceStrategy { } return Constant.FAIL; } + + @Override + public String analysisMQTTReceiveData(String dateStr, String registerAddr, String dataStr, String operateType, DeviceInstallEntity deviceInstallEntity) { + return ""; + } } diff --git a/user-service/src/main/java/com/mh/user/strategy/TempControlStrategy.java b/user-service/src/main/java/com/mh/user/strategy/TempControlStrategy.java index a8dbeed..5572c2f 100644 --- a/user-service/src/main/java/com/mh/user/strategy/TempControlStrategy.java +++ b/user-service/src/main/java/com/mh/user/strategy/TempControlStrategy.java @@ -2,6 +2,7 @@ package com.mh.user.strategy; import com.mh.user.constants.Constant; import com.mh.user.entity.DeviceCodeParamEntity; +import com.mh.user.entity.DeviceInstallEntity; import com.mh.user.entity.NowPublicDataEntity; import com.mh.user.service.BuildingService; import com.mh.user.service.NowDataService; @@ -113,4 +114,30 @@ public class TempControlStrategy implements DeviceStrategy { } return result; } + + @Override + public String analysisMQTTReceiveData(String dateStr, String registerAddr, String dataStr, String operateType, DeviceInstallEntity deviceInstallEntity) { + String result = Constant.FAIL; + if (Integer.parseInt(dataStr) < 0) { + log.info("温控报文检验失败: " + dataStr); + return result; + } + String data = ""; + if (operateType.equalsIgnoreCase(Constant.READ)) {// 读 + nowDataService.saveNowHistoryData2(deviceInstallEntity.getDeviceAddr(), "温控", dataStr, "waterTemp", deviceInstallEntity.getBuildingId()); + nowDataService.proWaterTemp(dateStr, deviceInstallEntity.getBuildingId(), "");//保存时间点温度 + String avgTemp = nowDataService.selectAve(deviceInstallEntity.getBuildingId()); + NowPublicDataEntity publicData = new NowPublicDataEntity(); + publicData.setBuildingId(deviceInstallEntity.getBuildingId()); + publicData.setUseWaterTemp(avgTemp); + publicData.setBackWaterTemp(avgTemp); + publicData.setSingleTemp(dataStr);//单箱温度 + nowPublicDataService.saveNowHistoryPublicData(publicData); + log.info("温控号:" + deviceInstallEntity.getDeviceAddr() + ",温度值:" + dataStr + ",保存数据库成功!楼栋名称:" + deviceInstallEntity.getBuildingName()); + return dataStr; + } else {// 写 + result = Constant.SUCCESS; + } + return result; + } } diff --git a/user-service/src/main/java/com/mh/user/strategy/TempTransStrategy.java b/user-service/src/main/java/com/mh/user/strategy/TempTransStrategy.java index 3d77cfe..e860274 100644 --- a/user-service/src/main/java/com/mh/user/strategy/TempTransStrategy.java +++ b/user-service/src/main/java/com/mh/user/strategy/TempTransStrategy.java @@ -3,6 +3,7 @@ package com.mh.user.strategy; import com.alibaba.fastjson2.JSON; import com.mh.user.constants.Constant; import com.mh.user.entity.DeviceCodeParamEntity; +import com.mh.user.entity.DeviceInstallEntity; import com.mh.user.entity.NowPublicDataEntity; import com.mh.user.service.BuildingService; import com.mh.user.service.NowDataService; @@ -19,7 +20,7 @@ import java.util.Map; * @author LJF * @version 1.0 * @project CHWS - * @description 压力变送器策略 + * @description 温度变送器策略 * @date 2024-03-18 09:51:17 */ @Slf4j @@ -117,4 +118,9 @@ public class TempTransStrategy implements DeviceStrategy { } return result; } + + @Override + public String analysisMQTTReceiveData(String dateStr, String registerAddr, String dataStr, String operateType, DeviceInstallEntity deviceInstallEntity) { + return ""; + } } diff --git a/user-service/src/main/java/com/mh/user/strategy/TimeControlStrategy.java b/user-service/src/main/java/com/mh/user/strategy/TimeControlStrategy.java index 4d4d398..61dcb05 100644 --- a/user-service/src/main/java/com/mh/user/strategy/TimeControlStrategy.java +++ b/user-service/src/main/java/com/mh/user/strategy/TimeControlStrategy.java @@ -3,6 +3,7 @@ package com.mh.user.strategy; import com.mh.common.utils.StringUtils; import com.mh.user.constants.Constant; import com.mh.user.entity.DeviceCodeParamEntity; +import com.mh.user.entity.DeviceInstallEntity; import com.mh.user.service.NowDataService; import com.mh.user.service.NowPublicDataService; import com.mh.user.utils.ExchangeStringUtil; @@ -181,14 +182,22 @@ public class TimeControlStrategy implements DeviceStrategy { // 开关时间 // 发送:0603001E0001E5BB // 返回:0603020041CDB4 - if (rec == 14 && isExactlyDivisible("001E", registerAddr)) { + if (rec == 14 && (isExactlyDivisible("001E", registerAddr) + || isExactlyDivisible("001F", registerAddr) + || isExactlyDivisible("0020", registerAddr) + || isExactlyDivisible("0021", registerAddr) + || isExactlyDivisible("0022", registerAddr) + || isExactlyDivisible("0023", registerAddr) + || isExactlyDivisible("0024", registerAddr) + || isExactlyDivisible("0025", registerAddr) + )) { // 开关时间 - data = ExchangeStringUtil.hexToDec(checkStr.substring(8, 10)); + data = ExchangeStringUtil.hexToDec(checkStr.substring(6, 10)); // 得出整数,然后拆分时分,比如data="65",换算成时分就是01:05 int totalMinutes = Integer.parseInt(data); int hours = totalMinutes / 60; int minutes = totalMinutes % 60; - data = String.format("%02d:%02d", hours, minutes); + data = String.format("%02d%02d", hours, minutes); } else if (rec == 14 && isExactlyDivisible("0018", registerAddr)) { // 星期掩码 // 截取时间 @@ -284,4 +293,8 @@ public class TimeControlStrategy implements DeviceStrategy { return false; } + @Override + public String analysisMQTTReceiveData(String dateStr, String registerAddr, String dataStr, String operateType, DeviceInstallEntity deviceInstallEntity) { + return ""; + } } diff --git a/user-service/src/main/java/com/mh/user/strategy/WaterLevelSwitchStrategy.java b/user-service/src/main/java/com/mh/user/strategy/WaterLevelSwitchStrategy.java index f36a384..1320dd4 100644 --- a/user-service/src/main/java/com/mh/user/strategy/WaterLevelSwitchStrategy.java +++ b/user-service/src/main/java/com/mh/user/strategy/WaterLevelSwitchStrategy.java @@ -576,4 +576,9 @@ public class WaterLevelSwitchStrategy implements DeviceStrategy { } return Constant.FAIL; } + + @Override + public String analysisMQTTReceiveData(String dateStr, String registerAddr, String dataStr, String operateType, DeviceInstallEntity deviceInstallEntity) { + return ""; + } } diff --git a/user-service/src/main/java/com/mh/user/strategy/WtMeterStrategy.java b/user-service/src/main/java/com/mh/user/strategy/WtMeterStrategy.java index d4c2c20..5f7d50b 100644 --- a/user-service/src/main/java/com/mh/user/strategy/WtMeterStrategy.java +++ b/user-service/src/main/java/com/mh/user/strategy/WtMeterStrategy.java @@ -1,8 +1,10 @@ package com.mh.user.strategy; import com.mh.common.utils.StringUtils; +import com.mh.user.constants.Constant; import com.mh.user.entity.DataResultEntity; import com.mh.user.entity.DeviceCodeParamEntity; +import com.mh.user.entity.DeviceInstallEntity; import com.mh.user.service.DataResultService; import com.mh.user.utils.CRC16; import com.mh.user.utils.ExchangeStringUtil; @@ -45,7 +47,7 @@ public class WtMeterStrategy implements DeviceStrategy { String str = ""; if (deviceAddr != null && deviceAddr.length() > 0) { try { - if (StringUtils.isBlank(brand) || brand.equals("埃美柯")) { + if (StringUtils.isBlank(brand) || brand.equals("埃美柯") || brand.equals("艾美柯")) { // 0 代表前面补充0,14 代表长度为14,d 代表参数为正数型 str = String.format("%014d", Long.parseLong(deviceAddr));//基表通讯号 // 转换位置 @@ -132,4 +134,36 @@ public class WtMeterStrategy implements DeviceStrategy { } return data; } + + @Override + public String analysisMQTTReceiveData(String dateStr, String registerAddr, String dataStr, String operateType, DeviceInstallEntity deviceInstallEntity) { + String data = Constant.FAIL; + if (Integer.parseInt(dataStr) < 0) { + return data; + } + log.info("水表表号: " + deviceInstallEntity.getDeviceAddr() + ",水表读数:" + dataStr); + // 考虑dataStr是否走大数或者走小数 + if (Double.parseDouble(dataStr)-deviceInstallEntity.getLastValue()>100 || Double.parseDouble(dataStr)-deviceInstallEntity.getLastValue()<0) { + dataStr = String.valueOf(deviceInstallEntity.getLastValue()); + } + try { + if (!StringUtils.isBlank(dataStr)) { + DataResultEntity dataResultEntity = new DataResultEntity(); + dataResultEntity.setDeviceAddr(deviceInstallEntity.getDeviceAddr());//通讯编号 + dataResultEntity.setDeviceType("水表"); + dataResultEntity.setCurValue(Double.parseDouble(dataStr)); //当前读数 + Date date = new Date(); + dataResultEntity.setCurDate(date); //当前日期 + dataResultEntity.setBuildingId(deviceInstallEntity.getBuildingId()); + dataResultService.saveDataResult(dataResultEntity); + log.info("水表数据保存数据库成功!楼栋名称:" + deviceInstallEntity.getBuildingName()); + } + } catch (Exception e) { + log.error("水表数据保存数据库失败!楼栋名称:{}", deviceInstallEntity.getBuildingName(), e); + } + if (!StringUtils.isBlank(data)) { + data = String.valueOf(Double.valueOf(data)); + } + return data; + } } diff --git a/user-service/src/main/java/com/mh/user/utils/GetReadOrder485.java b/user-service/src/main/java/com/mh/user/utils/GetReadOrder485.java index d95d3e9..7b24b07 100644 --- a/user-service/src/main/java/com/mh/user/utils/GetReadOrder485.java +++ b/user-service/src/main/java/com/mh/user/utils/GetReadOrder485.java @@ -60,7 +60,7 @@ public class GetReadOrder485 { String str = ""; if (deviceAddr != null && deviceAddr.length() > 0) { try { - if (StringUtils.isBlank(brand) || brand.equals("艾美柯")) { + if (StringUtils.isBlank(brand) || brand.equals("艾美柯") || brand.equals("埃美柯")) { // 0 代表前面补充0,14 代表长度为14,d 代表参数为正数型 str = String.format("%014d", Long.parseLong(deviceAddr));//基表通讯号 // 转换位置 diff --git a/user-service/src/main/resources/application-dev.yml b/user-service/src/main/resources/application-dev.yml index 3e9f819..90fa9f5 100644 --- a/user-service/src/main/resources/application-dev.yml +++ b/user-service/src/main/resources/application-dev.yml @@ -96,5 +96,28 @@ logging: amap: key: 984603bf28ef94ac78765a3ea27a6c26 +mqttSpring: + # BASIC parameters are required. + BASIC: + protocol: MQTT + host: 192.168.1.79 + port: 1883 + username: test + password: test123456 + client-id: chws_nfxy_mqtt_dev + # If the protocol is ws/wss, this value is required. + path: + # Topics that need to be subscribed when initially connecting to mqtt, multiple topics are divided by ",". + inbound-topic: chws_nfxy_mqtt_dev/read/events_upload/devices + # 此部分是提供给后端生成token返回给前端,让前端使用websocket方式去和MQTT实现交互的,笔者此文的案例中并没有去实现 + # 无人机远程控制模式(drone remote control) + DRC: + protocol: WS + host: 192.168.1.79 + port: 8083 + path: /mqtt +control: + topic: chws_nfxy_mqtt_dev/control/events_upload/devices + diff --git a/user-service/src/main/resources/application-prod.yml b/user-service/src/main/resources/application-prod.yml index a2a4ab1..ff10893 100644 --- a/user-service/src/main/resources/application-prod.yml +++ b/user-service/src/main/resources/application-prod.yml @@ -14,10 +14,10 @@ spring: # password: mh@803 ## url: jdbc:sqlserver://120.25.220.177:32012;DatabaseName=M_CHWS;allowMultiQueries=true #阿里云服务器-广州理工 -# url: jdbc:sqlserver://111.230.50.186:32012;DatabaseName=CHWS;allowMultiQueries=true -# driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver -# username: test -# password: minghan123456@ + url: jdbc:sqlserver://111.230.50.186:32012;DatabaseName=CHWS;allowMultiQueries=true + driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver + username: test + password: minghan123456@ # #华厦云服务器 # url: jdbc:sqlserver://127.0.0.1:1433;DatabaseName=CHWS;allowMultiQueries=true # driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver @@ -29,10 +29,10 @@ spring: # username: chws_gsh # password: Mhtech@803 #广商服务器 - url: jdbc:sqlserver://175.178.153.91:8033;DatabaseName=chws_gsh;allowMultiQueries=true - driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver - username: chws_gsh - password: Mhtech@803gsh +# url: jdbc:sqlserver://175.178.153.91:8033;DatabaseName=chws_gsh;allowMultiQueries=true +# driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver +# username: chws_gsh +# password: Mhtech@803gsh #本机 # url: jdbc:sqlserver://127.0.0.1:9956;DatabaseName=CHWS;allowMultiQueries=true # driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver @@ -64,7 +64,7 @@ spring: # username: chws_jm # password: Mhtech@803 -# # 华软江门 +# # 珠海北师大 # url: jdbc:sqlserver://127.0.0.1:8033;DatabaseName=chws_bsdz;allowMultiQueries=true # driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver # username: chws_bsdz