Compare commits

..

No commits in common. '62c8a6fc87a9ece2a4b7553bc166d989889c931a' and 'ada1d2ec2fd70f153cb49074c73542f4f8068931' have entirely different histories.

  1. 19
      pom.xml
  2. 122
      user-service/src/main/java/com/mh/user/config/MHConfig.java
  3. 62
      user-service/src/main/java/com/mh/user/config/mqtt/InboundMessageRouter.java
  4. 99
      user-service/src/main/java/com/mh/user/config/mqtt/MqttConfig.java
  5. 91
      user-service/src/main/java/com/mh/user/config/mqtt/MqttInboundConfig.java
  6. 55
      user-service/src/main/java/com/mh/user/config/mqtt/MqttMessageChannel.java
  7. 51
      user-service/src/main/java/com/mh/user/config/mqtt/MqttOutboundConfig.java
  8. 59
      user-service/src/main/java/com/mh/user/constants/ChannelName.java
  9. 36
      user-service/src/main/java/com/mh/user/constants/CommonTopicResponse.java
  10. 108
      user-service/src/main/java/com/mh/user/constants/MqttClientOptions.java
  11. 33
      user-service/src/main/java/com/mh/user/constants/MqttProtocolEnum.java
  12. 16
      user-service/src/main/java/com/mh/user/constants/MqttUseEnum.java
  13. 20
      user-service/src/main/java/com/mh/user/constants/ServiceReply.java
  14. 31
      user-service/src/main/java/com/mh/user/constants/TopicConst.java
  15. 60
      user-service/src/main/java/com/mh/user/constants/TopicEnum.java
  16. 48
      user-service/src/main/java/com/mh/user/entity/MqttSubscriptionEntity.java
  17. 35
      user-service/src/main/java/com/mh/user/job/CollectionLoopRunner.java
  18. 32
      user-service/src/main/java/com/mh/user/mapper/MqttSubscriptionMapper.java
  19. 30
      user-service/src/main/java/com/mh/user/model/SanShiFengDatas.java
  20. 29
      user-service/src/main/java/com/mh/user/model/SanShiFengReceiver.java
  21. 26
      user-service/src/main/java/com/mh/user/service/MqttSubscriptionService.java
  22. 24
      user-service/src/main/java/com/mh/user/service/impl/DeviceControlServiceImpl.java
  23. 61
      user-service/src/main/java/com/mh/user/service/impl/MqttSubscriptionServiceImpl.java
  24. 99
      user-service/src/main/java/com/mh/user/service/mqtt/config/MqttConfig.java
  25. 91
      user-service/src/main/java/com/mh/user/service/mqtt/config/MqttInboundConfig.java
  26. 55
      user-service/src/main/java/com/mh/user/service/mqtt/config/MqttMessageChannel.java
  27. 51
      user-service/src/main/java/com/mh/user/service/mqtt/config/MqttOutboundConfig.java
  28. 62
      user-service/src/main/java/com/mh/user/service/mqtt/handler/InboundMessageRouter.java
  29. 49
      user-service/src/main/java/com/mh/user/service/mqtt/service/IEventsService.java
  30. 36
      user-service/src/main/java/com/mh/user/service/mqtt/service/IMqttGatewayService.java
  31. 96
      user-service/src/main/java/com/mh/user/service/mqtt/service/IMqttMsgSenderService.java
  32. 40
      user-service/src/main/java/com/mh/user/service/mqtt/service/IMqttTopicService.java
  33. 75
      user-service/src/main/java/com/mh/user/service/mqtt/service/impl/EventsServiceImpl.java
  34. 54
      user-service/src/main/java/com/mh/user/service/mqtt/service/impl/MqttGatewayServiceImpl.java
  35. 142
      user-service/src/main/java/com/mh/user/service/mqtt/service/impl/MqttMsgSenderServiceImpl.java
  36. 40
      user-service/src/main/java/com/mh/user/service/mqtt/service/impl/MqttTopicServiceImpl.java
  37. 33
      user-service/src/main/java/com/mh/user/strategy/BackTempControlStrategy.java
  38. 16
      user-service/src/main/java/com/mh/user/strategy/DeviceStrategy.java
  39. 32
      user-service/src/main/java/com/mh/user/strategy/EleMeterStrategy.java
  40. 6
      user-service/src/main/java/com/mh/user/strategy/HeatPumpStatusStrategy.java
  41. 6
      user-service/src/main/java/com/mh/user/strategy/HeatPumpStrategy.java
  42. 5
      user-service/src/main/java/com/mh/user/strategy/MultiControlStrategy.java
  43. 40
      user-service/src/main/java/com/mh/user/strategy/PressureTransStrategy.java
  44. 6
      user-service/src/main/java/com/mh/user/strategy/StatusCheckStrategy.java
  45. 27
      user-service/src/main/java/com/mh/user/strategy/TempControlStrategy.java
  46. 8
      user-service/src/main/java/com/mh/user/strategy/TempTransStrategy.java
  47. 19
      user-service/src/main/java/com/mh/user/strategy/TimeControlStrategy.java
  48. 5
      user-service/src/main/java/com/mh/user/strategy/WaterLevelSwitchStrategy.java
  49. 36
      user-service/src/main/java/com/mh/user/strategy/WtMeterStrategy.java
  50. 2
      user-service/src/main/java/com/mh/user/utils/GetReadOrder485.java
  51. 23
      user-service/src/main/resources/application-dev.yml
  52. 18
      user-service/src/main/resources/application-prod.yml

19
pom.xml

@ -46,25 +46,6 @@
<version>1.0.1.RELEASE</version>
</dependency>
<!-- spring-messaging -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-messaging</artifactId>
<version>6.2.1</version>
</dependency>
<!-- spring-integration-mqtt -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>6.3.4</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
<version>3.4.2</version>
</dependency>
</dependencies>
<dependencyManagement>

122
user-service/src/main/java/com/mh/user/config/MHConfig.java

@ -1,122 +0,0 @@
package com.mh.user.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
* 读取项目相关配置
*
* @author mh
*/
@Component
@ConfigurationProperties(prefix = "mh")
public class MHConfig
{
/** 项目名称 */
private String name;
/** 版本 */
private String version;
/** 版权年份 */
private String copyrightYear;
/** 上传路径 */
private static String profile;
/** 获取地址开关 */
private static boolean addressEnabled;
/** 验证码类型 */
private static String captchaType;
public String getName()
{
return name;
}
public void setName(String name)
{
this.name = name;
}
public String getVersion()
{
return version;
}
public void setVersion(String version)
{
this.version = version;
}
public String getCopyrightYear()
{
return copyrightYear;
}
public void setCopyrightYear(String copyrightYear)
{
this.copyrightYear = copyrightYear;
}
public static String getProfile()
{
return profile;
}
public void setProfile(String profile)
{
MHConfig.profile = profile;
}
public static boolean isAddressEnabled()
{
return addressEnabled;
}
public void setAddressEnabled(boolean addressEnabled)
{
MHConfig.addressEnabled = addressEnabled;
}
public static String getCaptchaType() {
return captchaType;
}
public void setCaptchaType(String captchaType) {
MHConfig.captchaType = captchaType;
}
/**
* 获取导入上传路径
*/
public static String getImportPath()
{
return getProfile() + "/import";
}
/**
* 获取头像上传路径
*/
public static String getAvatarPath()
{
return getProfile() + "/avatar";
}
/**
* 获取下载路径
*/
public static String getDownloadPath()
{
return getProfile() + "/download/";
}
/**
* 获取上传路径
*/
public static String getUploadPath()
{
return getProfile() + "/upload";
}
}

62
user-service/src/main/java/com/mh/user/config/mqtt/InboundMessageRouter.java

@ -1,62 +0,0 @@
package com.mh.user.config.mqtt;
import com.mh.user.config.MHConfig;
import com.mh.user.constants.ChannelName;
import com.mh.user.constants.TopicEnum;
import com.mh.user.utils.SpringBeanUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.annotation.Router;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.integration.router.AbstractMessageRouter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Component;
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
/**
* @author LJF
* @version 1.0
* @project springboot-mqtt-demo
* @description 入站消息路由分发中心
* @date 2024-10-29 17:04:17
*/
@Slf4j
@Component
public class InboundMessageRouter extends AbstractMessageRouter {
/** 系统基础配置 */
@Autowired
private MHConfig mHConfig;
/**
* 目前只需要这个方式后期在拓展使用IntegrationFlow方式
* @param message
* @return
*/
@Router(inputChannel = ChannelName.INBOUND)
@Override
protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
MessageHeaders headers = message.getHeaders();
String topic = Objects.requireNonNull(headers.get(MqttHeaders.RECEIVED_TOPIC)).toString();
// byte[] payload = (byte[]) message.getPayload();
// log.info("从当前主题 topic: {}, 接收到的消息:{}", topic, new String(payload));
// 判断当前主题是否是当前项目的,温湿度目前写死的
if (!topic.startsWith(mHConfig.getName()) && !topic.contains("/temp")) {
log.info("当前主题 topic: {} 不是当前项目的,直接丢弃", topic);
return Collections.singleton((MessageChannel) SpringBeanUtil.getBean(ChannelName.DEFAULT_BOUND));
}
// 找到对应的主题消息通道
if (topic.contains("/temp")) {
return Collections.singleton((MessageChannel) SpringBeanUtil.getBean(ChannelName.EVENTS_UPLOAD_INBOUND));
} else {
TopicEnum topicEnum = TopicEnum.find(mHConfig.getName() + "/", topic);
MessageChannel bean = (MessageChannel) SpringBeanUtil.getBean(topicEnum.getBeanName());
return Collections.singleton(bean);
}
}
}

99
user-service/src/main/java/com/mh/user/config/mqtt/MqttConfig.java

@ -1,99 +0,0 @@
package com.mh.user.config.mqtt;
import com.mh.user.constants.MqttClientOptions;
import com.mh.user.constants.MqttProtocolEnum;
import com.mh.user.constants.MqttUseEnum;
import lombok.Data;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.util.StringUtils;
import java.util.Map;
/**
* @author LJF
* @version 1.0
* @project springboot-mqtt-demo
* @description mqtt连接配置
* @date 2024-10-29 14:44:51
*/
@Configuration
@Data
@ConfigurationProperties
public class MqttConfig {
private static Map<MqttUseEnum, MqttClientOptions> mqttSpring;
public void setMqttSpring(Map<MqttUseEnum, MqttClientOptions> mqtt) {
MqttConfig.mqttSpring = mqtt;
}
/**
* 获取mqtt基本配置
* @return
*/
static MqttClientOptions getBasicMqttClientOptions() {
if (!mqttSpring.containsKey(MqttUseEnum.BASIC)) {
throw new Error("请先配置MQTT的基本连接参数,否则无法启动项目");
}
return mqttSpring.get(MqttUseEnum.BASIC);
}
/**
* 拼接获取对应mqtt的连接地址
* @param options
* @return
*/
public static String getMqttAddress(MqttClientOptions options) {
StringBuilder addr = new StringBuilder();
addr.append(options.getProtocol().getProtocolAddr())
.append(options.getHost().trim())
.append(":")
.append(options.getPort());
if ((options.getProtocol() == MqttProtocolEnum.WS || options.getProtocol() == MqttProtocolEnum.WSS)
&& StringUtils.hasText(options.getPath())) {
addr.append(options.getPath());
}
return addr.toString();
}
public static String getBasicMqttAddress() {
return getMqttAddress(getBasicMqttClientOptions());
}
/**
* 获取连接参数注入到spring中
* @return
*/
@Bean
public MqttConnectOptions mqttConnectionOptions() {
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
MqttClientOptions customizeOptions = getBasicMqttClientOptions();
String basicMqttAddress = getBasicMqttAddress();
mqttConnectOptions.setServerURIs(new String[]{basicMqttAddress});
mqttConnectOptions.setUserName(StringUtils.hasText(customizeOptions.getUsername()) ?
customizeOptions.getUsername() : "");
mqttConnectOptions.setPassword(StringUtils.hasText(customizeOptions.getPassword()) ?
customizeOptions.getPassword().toCharArray() : new char[0]);
// 直接进行自动连接
mqttConnectOptions.setAutomaticReconnect(true);
// 时间间隔时间10s
mqttConnectOptions.setKeepAliveInterval(10);
return mqttConnectOptions;
}
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(mqttConnectionOptions());
return factory;
}
}

91
user-service/src/main/java/com/mh/user/config/mqtt/MqttInboundConfig.java

@ -1,91 +0,0 @@
package com.mh.user.config.mqtt;
import com.mh.user.constants.ChannelName;
import com.mh.user.constants.MqttClientOptions;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import javax.annotation.Resource;
/**
* @author LJF
* @version 1.0
* @project springboot-mqtt-demo
* @description 入站配置
* @date 2024-10-29 16:22:10
*/
@Slf4j
@Configuration
@IntegrationComponentScan
public class MqttInboundConfig {
@Autowired
private MqttPahoClientFactory mqttClientFactory;
@Resource(name = ChannelName.INBOUND)
private MessageChannel inboundChannel;
private String clientId;
/**
* 入站适配器
* @return
*/
@Bean(name = "adapter")
public MessageProducerSupport mqttInbound() {
MqttClientOptions options = MqttConfig.getBasicMqttClientOptions();
// 此处初始化的时候,默认订阅了配置文件中已经写好的topic
// 如果需要订阅多个,可以自己手动订阅,会写一个addTopic()进行添加订阅
clientId = options.getClientId() + "_consumer_" + System.currentTimeMillis();
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
clientId,
mqttClientFactory,
options.getInboundTopic().split(","));
// System.out.println("每一次都会入站适配器吗?"+clientId);
DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
// 统一是字节处理
converter.setPayloadAsBytes(true);
// 设置消息转换器
adapter.setConverter(converter);
// 设置qos(quality of service)
// 0:最多一次传输(消息会丢失),
// 1:至少一次传输(消息会重复),
// 2:只有当消息发送成功时才确认(消息不回丢,但延迟高)。
adapter.setQos(0);
// 设置在接收已经订阅的主题信息后,发送给哪个通道,具体的发送方法需要翻上层的抽象类
adapter.setOutputChannel(inboundChannel);
return adapter;
}
/**
* 默认声明一个消息处理器用于处理无效的消息
* @return
*/
@Bean
@ServiceActivator(inputChannel = ChannelName.DEFAULT_BOUND)
public MessageHandler handler() {
return message -> {
log.info("The default channel does not handle messages." +
"\nTopic: {}" +
"\nPayload: {}",
message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC),
message.getPayload());
};
}
public String getClientId() {
return clientId;
}
}

55
user-service/src/main/java/com/mh/user/config/mqtt/MqttMessageChannel.java

@ -1,55 +0,0 @@
package com.mh.user.config.mqtt;
import com.mh.user.constants.ChannelName;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.MessageChannel;
/**
* @author LJF
* @version 1.0
* @project springboot-mqtt-demo
* @description 声明所有通道的定义类
* @date 2024-10-29 16:23:32
*/
@Slf4j
@Configuration
public class MqttMessageChannel {
@Bean(name = ChannelName.OUTBOUND)
public MessageChannel outboundChannel() {
return new DirectChannel();
}
@Bean(name = ChannelName.INBOUND)
public MessageChannel inboundChannel() {
return new DirectChannel();
}
/**
* 事件主动上报通道
* @return
*/
@Bean(name = ChannelName.EVENTS_UPLOAD_INBOUND)
public MessageChannel eventsUploadInbound() {
return new DirectChannel();
}
@Bean(name = ChannelName.EVENTS_COLLECTION_INBOUND)
public MessageChannel eventsCollectionInbound() {
return new DirectChannel();
}
@Bean(name = ChannelName.EVENTS_CONTROL_INBOUND)
public MessageChannel eventsControlInbound() {
return new DirectChannel();
}
@Bean(name = ChannelName.EVENTS_SEND_INBOUND)
public MessageChannel eventsSendInbound() {
return new DirectChannel();
}
}

51
user-service/src/main/java/com/mh/user/config/mqtt/MqttOutboundConfig.java

@ -1,51 +0,0 @@
package com.mh.user.config.mqtt;
import com.mh.user.constants.ChannelName;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageHandler;
/**
* @author LJF
* @version 1.0
* @project springboot-mqtt-demo
* @description 入站配置
* @date 2024-10-29 16:22:10
*/
@Slf4j
@Configuration
@IntegrationComponentScan
public class MqttOutboundConfig {
@Autowired
private MqttPahoClientFactory mqttClientFactory;
/**
* 默认声明一个出站处理器用于处理无效的消息
* @return
*/
@Bean
@ServiceActivator(inputChannel = ChannelName.OUTBOUND)
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
MqttConfig.getBasicMqttClientOptions().getClientId() + "_producer_" + System.currentTimeMillis(),
mqttClientFactory);
DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
// use byte types uniformly
converter.setPayloadAsBytes(true);
messageHandler.setAsync(true);
messageHandler.setDefaultQos(0);
messageHandler.setConverter(converter);
return messageHandler;
}
}

59
user-service/src/main/java/com/mh/user/constants/ChannelName.java

@ -1,59 +0,0 @@
package com.mh.user.constants;
/**
* @author LJF
* @version 1.0
* @project springboot-mqtt-demo
* @description 声明所有通道
* @date 2024-10-29 16:04:19
*/
public class ChannelName {
/**
* 默认通道名称防止出错
*/
public static final String DEFAULT_BOUND = "default_bound";
/**
* 主动上报入站
*/
public static final String INBOUND = "inbound";
/**
* 出站
*/
public static final String OUTBOUND = "outbound";
/**
* 入站主动上报
*/
public static final String EVENTS_UPLOAD_INBOUND = "events_upload_inbound";
/**
* 入站主动采集
*/
public static final String EVENTS_COLLECTION_INBOUND = "events_collection_inbound";
/**
* 入站主动控制
*/
public static final String EVENTS_CONTROL_INBOUND = "events_control_inbound";
/**
* 默认进站处理
*/
public static final String EVENTS_DEFAULT_INBOUND = "events_default_inbound";
public static final String REPLY_EVENTS_OUTBOUND = "reply_events_outbound";
/**
* 新珠江收到的信息
*/
public static final String EVENTS_RECEIVE_INBOUND = "events_receive_inbound";
/**
* 接收服务端的数据报文
*/
public static final String EVENTS_SEND_INBOUND = "events_send_inbound";
}

36
user-service/src/main/java/com/mh/user/constants/CommonTopicResponse.java

@ -1,36 +0,0 @@
package com.mh.user.constants;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* Unified Topic response format
*
* @author sean.zhou
* @version 0.1
* @date 2021/11/15
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
@JsonIgnoreProperties(ignoreUnknown = true)
public class CommonTopicResponse<T> {
/**
* The command is sent and the response is matched by the tid and bid fields in the message,
* and the reply should keep the tid and bid the same.
*/
private String tid;
private String bid;
private String method;
private T data;
private Long timestamp;
}

108
user-service/src/main/java/com/mh/user/constants/MqttClientOptions.java

@ -1,108 +0,0 @@
package com.mh.user.constants;
/**
* @author LJF
* @version 1.0
* @project springboot-mqtt-demo
* @description mqtt连接的参数
* @date 2024-10-29 14:46:24
*/
public class MqttClientOptions {
private MqttProtocolEnum protocol;
private String host;
private Integer port;
private String username;
private String password;
private String clientId;
private String path;
/**
* 客户端连接的时候订阅的主题
*/
private String inboundTopic;
public MqttProtocolEnum getProtocol() {
return protocol;
}
public void setProtocol(MqttProtocolEnum protocol) {
this.protocol = protocol;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public Integer getPort() {
return port;
}
public void setPort(Integer port) {
this.port = port;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public String getClientId() {
return clientId;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
public String getPath() {
return path;
}
public void setPath(String path) {
this.path = path;
}
public String getInboundTopic() {
return inboundTopic;
}
public void setInboundTopic(String inboundTopic) {
this.inboundTopic = inboundTopic;
}
@Override
public String toString() {
return "MqttClientOptions{" +
"protocol=" + protocol +
", host='" + host + '\'' +
", port=" + port +
", username='" + username + '\'' +
", password='" + password + '\'' +
", clientId='" + clientId + '\'' +
", path='" + path + '\'' +
", inboundTopic='" + inboundTopic + '\'' +
'}';
}
}

33
user-service/src/main/java/com/mh/user/constants/MqttProtocolEnum.java

@ -1,33 +0,0 @@
package com.mh.user.constants;
/**
* @author LJF
* @version 1.0
* @project springboot-mqtt-demo
* @description 采用哪种协议进行数据交互
* @date 2024-10-29 15:21:07
*/
public enum MqttProtocolEnum {
MQTT("tcp"),
MQTTS("tcp"),
WS("ws"),
WSS("wss");
final String protocol;
MqttProtocolEnum(String protocol) {
this.protocol = protocol;
}
public String getProtocolAddr() {
return protocol + "://";
}
public String getProtocol() {
return protocol;
}
}

16
user-service/src/main/java/com/mh/user/constants/MqttUseEnum.java

@ -1,16 +0,0 @@
package com.mh.user.constants;
/**
* @author LJF
* @version 1.0
* @project springboot-mqtt-demo
* @description mqtt选择模式
* @date 2024-10-29 15:19:21
*/
public enum MqttUseEnum {
BASIC,
DRC
}

20
user-service/src/main/java/com/mh/user/constants/ServiceReply.java

@ -1,20 +0,0 @@
package com.mh.user.constants;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Data;
/**
* @author sean.zhou
* @version 0.1
* @date 2021/11/22
*/
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class ServiceReply<T> {
private Integer result;
private T info;
private T output;
}

31
user-service/src/main/java/com/mh/user/constants/TopicConst.java

@ -1,31 +0,0 @@
package com.mh.user.constants;
/**
* All the topics that need to be used in the project.
*
* @author ljf
* @version 0.1
* @date 2025-01-22
*/
public class TopicConst {
public static final String MH_UPLOAD = "mh_upload/";
public static final String EVENTS_UPLOAD = "events_upload/";
public static final String MH_COLLECTION = "mh_collection/";
public static final String EVENTS_COLLECTION = "events_collection/";
public static final String MH_CONTROL = "mh_control/";
public static final String EVENTS_CONTROL = "events_control/";
public static final String REGEX_SN = "[A-Za-z0-9]+";
public static final String THING_MODEL_PRE = "thing/";
public static final String PRODUCT = "product/";
public static final String SERVICES_SUF = "/services";
}

60
user-service/src/main/java/com/mh/user/constants/TopicEnum.java

@ -1,60 +0,0 @@
package com.mh.user.constants;
import java.util.Arrays;
import java.util.regex.Pattern;
import static com.mh.user.constants.TopicConst.*;
/**
* @author ljf
* @version 1.0
* @description: TODO
* @date 2024/11/06 14:28
*/
public enum TopicEnum {
/**
* 冷水机组客户端主动上报数据
*/
CLIENT_UPLOAD_DATA(Pattern.compile("^" + MH_UPLOAD + EVENTS_UPLOAD + REGEX_SN + "$"), ChannelName.EVENTS_UPLOAD_INBOUND),
/**
* 服务端采集数据
*/
SERVER_COLLECTION_DATA(Pattern.compile("^" + MH_COLLECTION + EVENTS_COLLECTION + REGEX_SN + "$"), ChannelName.EVENTS_COLLECTION_INBOUND),
/**
* 服务端控制指令
*/
SERVER_CONTROL_DATA(Pattern.compile("^" + MH_CONTROL + EVENTS_CONTROL + REGEX_SN + "$"), ChannelName.EVENTS_CONTROL_INBOUND),
/**
* 订阅服务端发送的主题命令
*/
SERVER_SEND_DATA(Pattern.compile("^A/cmd/ctl/send" + "$"), ChannelName.EVENTS_SEND_INBOUND),
UNKNOWN(Pattern.compile("^.*$"), ChannelName.DEFAULT_BOUND);
final Pattern pattern;
final String beanName;
TopicEnum(Pattern pattern, String beanName) {
this.pattern = pattern;
this.beanName = beanName;
}
public Pattern getPattern() {
return pattern;
}
public String getBeanName() {
return beanName;
}
public static TopicEnum find(String proName, String topic) {
// 去掉第一个"/"以及之前数据
String finalTopic = topic.replaceFirst("^"+proName, "");;
return Arrays.stream(TopicEnum.values()).filter(topicEnum -> topicEnum.pattern.matcher(finalTopic).matches()).findAny().orElse(UNKNOWN);
}
}

48
user-service/src/main/java/com/mh/user/entity/MqttSubscriptionEntity.java

@ -1,48 +0,0 @@
package com.mh.user.entity;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import lombok.Data;
import java.util.Date;
import java.util.Map;
/**
* @author LJF
* @version 1.0
* @project EEMCS
* @description mqtt订阅管理
* @date 2025-02-14 13:47:07
*/
@Data
public class MqttSubscriptionEntity {
private Long id;
private String topic;
private Short qos;
private String clientId;
private String status;
/** 创建者 */
private String createBy;
/** 创建时间 */
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private Date createTime;
/** 更新者 */
private String updateBy;
/** 更新时间 */
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private Date updateTime;
/** 备注 */
private String remark;
}

35
user-service/src/main/java/com/mh/user/job/CollectionLoopRunner.java

@ -1,17 +1,13 @@
package com.mh.user.job;
import com.mh.common.utils.StringUtils;
import com.mh.user.constants.Constant;
import com.mh.user.entity.AddCronJobReq;
import com.mh.user.entity.MqttSubscriptionEntity;
import com.mh.user.manage.QuartzManager;
import com.mh.user.netty.NettyEchoServer;
import com.mh.user.serialport.SerialPortListener;
import com.mh.user.serialport.SerialPortUtil;
import com.mh.user.serialport.SerialTool;
import com.mh.user.service.DeviceCodeParamService;
import com.mh.user.service.MqttSubscriptionService;
import com.mh.user.service.mqtt.service.IMqttTopicService;
import com.mh.user.utils.CacheUtil;
import com.mh.user.utils.ExchangeStringUtil;
import com.mh.user.utils.GetReadOrder485;
@ -23,7 +19,6 @@ import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
@ -47,12 +42,6 @@ public class CollectionLoopRunner implements ApplicationRunner {
@Resource
private GetWeatherInfoJob getWeatherInfoJob;
@Resource
private MqttSubscriptionService iMqttSubscriptionService;
@Resource
private IMqttTopicService iMqttTopicService;
@Override
public void run(ApplicationArguments args) throws Exception {
// collectionMeterAndCloud();//采集
@ -64,28 +53,8 @@ public class CollectionLoopRunner implements ApplicationRunner {
// 获取天气数据
getWeatherInfoJob.getWeatherInfo();
// 启动netty端口
// NettyEchoServer nettyEchoServer = new NettyEchoServer();
// nettyEchoServer.bind(8098);
// 初始化mqtt订阅记录
initializeMqttSubscription();
}
/**
* 初始化mqtt订阅记录
*/
private void initializeMqttSubscription() {
MqttSubscriptionEntity mqttSubscription = new MqttSubscriptionEntity();
mqttSubscription.setStatus("0");
List<MqttSubscriptionEntity> mqttSubscriptions = iMqttSubscriptionService.selectMqttSubList(mqttSubscription);
for (MqttSubscriptionEntity subscription : mqttSubscriptions) {
try {
if (!StringUtils.isBlank(subscription.getTopic())) {
iMqttTopicService.subscribe(subscription.getTopic(), subscription.getQos());
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
NettyEchoServer nettyEchoServer = new NettyEchoServer();
nettyEchoServer.bind(8098);
}
private void simulationCollection() throws Exception {

32
user-service/src/main/java/com/mh/user/mapper/MqttSubscriptionMapper.java

@ -1,32 +0,0 @@
package com.mh.user.mapper;
import com.mh.user.entity.MqttSubscriptionEntity;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import tk.mybatis.mapper.common.BaseMapper;
import java.util.List;
/**
* @author LJF
* @version 1.0
* @project EEMCS
* @description mqtt订阅mapper类
* @date 2025-02-14 14:00:58
*/
@Mapper
public interface MqttSubscriptionMapper extends BaseMapper<MqttSubscriptionEntity> {
@Select("<script>" +
"SELECT * FROM mqtt_subscription WHERE 1=1 " +
"<if test='topic != null and topic != \"\"'>AND topic = #{topic}</if>" +
"<if test='status != null and status != \"\"'>AND status = #{status}</if>" +
"ORDER BY crate_time DESC " +
"</script>")
List<MqttSubscriptionEntity> selectListByTopic(@Param("topic") String topic,
@Param("status") String status);
@Select("SELECT top 1 * FROM mqtt_subscription WHERE id = #{id}")
MqttSubscriptionEntity selectById(@Param("id") String id);
}

30
user-service/src/main/java/com/mh/user/model/SanShiFengDatas.java

@ -1,30 +0,0 @@
package com.mh.user.model;
import lombok.Data;
/**
* @author LJF
* @version 1.0
* @project EEMCS
* @description 研华数据体
* @date 2025-01-22 14:47:25
*/
@Data
public class SanShiFengDatas<T extends Number> {
/**
* 对应研华的标签值
*/
private String tag;
/**
* 上报值
*/
private T value;
/**
* 质量值
*/
private T quality;
}

29
user-service/src/main/java/com/mh/user/model/SanShiFengReceiver.java

@ -1,29 +0,0 @@
package com.mh.user.model;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Data;
import java.util.List;
/**
* @author LJF
* @version 1.0
* @project EEMCS
* @description 研华网关发送接收数据
* @date 2025-01-22 14:43:15
*/
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class SanShiFengReceiver<T> {
/**
* 数据集合
*/
private List<T> d;
/**
* 主动上报数据时间带T类型
*/
private String ts;
}

26
user-service/src/main/java/com/mh/user/service/MqttSubscriptionService.java

@ -1,26 +0,0 @@
package com.mh.user.service;
import com.mh.user.entity.MqttSubscriptionEntity;
import java.util.List;
/**
* @author LJF
* @version 1.0
* @project EEMCS
* @description mqtt订阅管理
* @date 2025-02-14 13:58:37
*/
public interface MqttSubscriptionService {
List<MqttSubscriptionEntity> selectMqttSubList(MqttSubscriptionEntity mqttSubscription);
MqttSubscriptionEntity selectMqttSubById(String msId);
int insertMqttSub(MqttSubscriptionEntity mqttSubscription);
int updateMqttSub(MqttSubscriptionEntity mqttSubscription);
int deleteMqttSubByIds(String[] msIds);
}

24
user-service/src/main/java/com/mh/user/service/impl/DeviceControlServiceImpl.java

@ -401,7 +401,7 @@ public class DeviceControlServiceImpl implements DeviceControlService {
if (Constant.WRITE.equals(type)) {
deviceCodeParam.setFunCode("06");
String time1 = split[1].substring(0, 2);
String time2 = split[1].substring(3, 5);
String time2 = split[1].substring(2, 4);
deviceCodeParam.setDataValue(String.valueOf(Integer.parseInt(time1)*60 + Integer.parseInt(time2)));
serialPortModel.setDataValue(String.valueOf(Integer.parseInt(time1)*60 + Integer.parseInt(time2)));
}
@ -409,11 +409,11 @@ public class DeviceControlServiceImpl implements DeviceControlService {
case "timeSetClose1":
// 关时间设置\读取1
registerStr = ExchangeStringUtil.addZeroForNum(ExchangeStringUtil.decToHex(String.valueOf(31 + (scene - 1) * 18)), 4);
deviceCodeParam.setRegisterSize(1);
deviceCodeParam.setRegisterSize(2);
if (Constant.WRITE.equals(type)) {
deviceCodeParam.setFunCode("06");
String time1 = split[1].substring(0, 2);
String time2 = split[1].substring(3, 5);
String time2 = split[1].substring(2, 4);
deviceCodeParam.setDataValue(String.valueOf(Integer.parseInt(time1)*60 + Integer.parseInt(time2)));
serialPortModel.setDataValue(String.valueOf(Integer.parseInt(time1)*60 + Integer.parseInt(time2)));
}
@ -425,7 +425,7 @@ public class DeviceControlServiceImpl implements DeviceControlService {
if (Constant.WRITE.equals(type)) {
deviceCodeParam.setFunCode("06");
String time1 = split[1].substring(0, 2);
String time2 = split[1].substring(3, 5);
String time2 = split[1].substring(2, 4);
deviceCodeParam.setDataValue(String.valueOf(Integer.parseInt(time1)*60 + Integer.parseInt(time2)));
serialPortModel.setDataValue(String.valueOf(Integer.parseInt(time1)*60 + Integer.parseInt(time2)));
}
@ -433,11 +433,11 @@ public class DeviceControlServiceImpl implements DeviceControlService {
case "timeSetClose2":
// 关时间设置\读取2
registerStr = ExchangeStringUtil.addZeroForNum(ExchangeStringUtil.decToHex(String.valueOf(33 + (scene - 1) * 18)), 4);
deviceCodeParam.setRegisterSize(1);
deviceCodeParam.setRegisterSize(2);
if (Constant.WRITE.equals(type)) {
deviceCodeParam.setFunCode("06");
String time1 = split[1].substring(0, 2);
String time2 = split[1].substring(3, 5);
String time2 = split[1].substring(2, 4);
deviceCodeParam.setDataValue(String.valueOf(Integer.parseInt(time1)*60 + Integer.parseInt(time2)));
serialPortModel.setDataValue(String.valueOf(Integer.parseInt(time1)*60 + Integer.parseInt(time2)));
}
@ -449,7 +449,7 @@ public class DeviceControlServiceImpl implements DeviceControlService {
if (Constant.WRITE.equals(type)) {
deviceCodeParam.setFunCode("06");
String time1 = split[1].substring(0, 2);
String time2 = split[1].substring(3, 5);
String time2 = split[1].substring(2, 4);
deviceCodeParam.setDataValue(String.valueOf(Integer.parseInt(time1)*60 + Integer.parseInt(time2)));
serialPortModel.setDataValue(String.valueOf(Integer.parseInt(time1)*60 + Integer.parseInt(time2)));
}
@ -457,11 +457,11 @@ public class DeviceControlServiceImpl implements DeviceControlService {
case "timeSetClose3":
// 关时间设置\读取3
registerStr = ExchangeStringUtil.addZeroForNum(ExchangeStringUtil.decToHex(String.valueOf(35 + (scene - 1) * 18)), 4);
deviceCodeParam.setRegisterSize(1);
deviceCodeParam.setRegisterSize(2);
if (Constant.WRITE.equals(type)) {
deviceCodeParam.setFunCode("06");
String time1 = split[1].substring(0, 2);
String time2 = split[1].substring(3, 5);
String time2 = split[1].substring(2, 4);
deviceCodeParam.setDataValue(String.valueOf(Integer.parseInt(time1)*60 + Integer.parseInt(time2)));
serialPortModel.setDataValue(String.valueOf(Integer.parseInt(time1)*60 + Integer.parseInt(time2)));
}
@ -473,7 +473,7 @@ public class DeviceControlServiceImpl implements DeviceControlService {
if (Constant.WRITE.equals(type)) {
deviceCodeParam.setFunCode("06");
String time1 = split[1].substring(0, 2);
String time2 = split[1].substring(3, 5);
String time2 = split[1].substring(2, 4);
deviceCodeParam.setDataValue(String.valueOf(Integer.parseInt(time1)*60 + Integer.parseInt(time2)));
serialPortModel.setDataValue(String.valueOf(Integer.parseInt(time1)*60 + Integer.parseInt(time2)));
}
@ -481,11 +481,11 @@ public class DeviceControlServiceImpl implements DeviceControlService {
case "timeSetClose4":
// 关时间设置\读取4
registerStr = ExchangeStringUtil.addZeroForNum(ExchangeStringUtil.decToHex(String.valueOf(37 + (scene - 1) * 18)), 4);
deviceCodeParam.setRegisterSize(1);
deviceCodeParam.setRegisterSize(2);
if (Constant.WRITE.equals(type)) {
deviceCodeParam.setFunCode("06");
String time1 = split[1].substring(0, 2);
String time2 = split[1].substring(3, 5);
String time2 = split[1].substring(2, 4);
deviceCodeParam.setDataValue(String.valueOf(Integer.parseInt(time1)*60 + Integer.parseInt(time2)));
serialPortModel.setDataValue(String.valueOf(Integer.parseInt(time1)*60 + Integer.parseInt(time2)));
}

61
user-service/src/main/java/com/mh/user/service/impl/MqttSubscriptionServiceImpl.java

@ -1,61 +0,0 @@
package com.mh.user.service.impl;
import com.mh.user.entity.MqttSubscriptionEntity;
import com.mh.user.mapper.MqttSubscriptionMapper;
import com.mh.user.service.MqttSubscriptionService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
/**
* @author LJF
* @version 1.0
* @project EEMCS
* @description mqtt订阅实现类
* @date 2025-02-14 13:59:27
*/
@Service
public class MqttSubscriptionServiceImpl implements MqttSubscriptionService {
private final MqttSubscriptionMapper mqttSubscriptionMapper;
@Autowired
public MqttSubscriptionServiceImpl(MqttSubscriptionMapper mqttSubscriptionMapper) {
this.mqttSubscriptionMapper = mqttSubscriptionMapper;
}
@Override
public List<MqttSubscriptionEntity> selectMqttSubList(MqttSubscriptionEntity mqttSubscription) {
if (mqttSubscription == null) {
return null;
}
return mqttSubscriptionMapper.selectListByTopic(mqttSubscription.getTopic(), mqttSubscription.getStatus());
}
@Override
public MqttSubscriptionEntity selectMqttSubById(String msId) {
return mqttSubscriptionMapper.selectById(msId);
}
@Override
public int insertMqttSub(MqttSubscriptionEntity mqttSubscription) {
return mqttSubscriptionMapper.insert(mqttSubscription);
}
@Override
public int updateMqttSub(MqttSubscriptionEntity mqttSubscription) {
return mqttSubscriptionMapper.updateByPrimaryKey(mqttSubscription);
}
@Override
public int deleteMqttSubByIds(String[] msIds) {
if (msIds != null && msIds.length > 0) {
for (String msId : msIds) {
mqttSubscriptionMapper.deleteByPrimaryKey(msId);
}
return msIds.length;
}
return 0;
}
}

99
user-service/src/main/java/com/mh/user/service/mqtt/config/MqttConfig.java

@ -1,99 +0,0 @@
package com.mh.user.service.mqtt.config;
import com.mh.user.constants.MqttClientOptions;
import com.mh.user.constants.MqttProtocolEnum;
import com.mh.user.constants.MqttUseEnum;
import lombok.Data;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.util.StringUtils;
import java.util.Map;
/**
* @author LJF
* @version 1.0
* @project springboot-mqtt-demo
* @description mqtt连接配置
* @date 2024-10-29 14:44:51
*/
@Configuration
@Data
@ConfigurationProperties
public class MqttConfig {
private static Map<MqttUseEnum, MqttClientOptions> mqttSpring;
public void setMqttSpring(Map<MqttUseEnum, MqttClientOptions> mqtt) {
MqttConfig.mqttSpring = mqtt;
}
/**
* 获取mqtt基本配置
* @return
*/
static MqttClientOptions getBasicMqttClientOptions() {
if (!mqttSpring.containsKey(MqttUseEnum.BASIC)) {
throw new Error("请先配置MQTT的基本连接参数,否则无法启动项目");
}
return mqttSpring.get(MqttUseEnum.BASIC);
}
/**
* 拼接获取对应mqtt的连接地址
* @param options
* @return
*/
public static String getMqttAddress(MqttClientOptions options) {
StringBuilder addr = new StringBuilder();
addr.append(options.getProtocol().getProtocolAddr())
.append(options.getHost().trim())
.append(":")
.append(options.getPort());
if ((options.getProtocol() == MqttProtocolEnum.WS || options.getProtocol() == MqttProtocolEnum.WSS)
&& StringUtils.hasText(options.getPath())) {
addr.append(options.getPath());
}
return addr.toString();
}
public static String getBasicMqttAddress() {
return getMqttAddress(getBasicMqttClientOptions());
}
/**
* 获取连接参数注入到spring中
* @return
*/
@Bean
public MqttConnectOptions mqttConnectionOptions() {
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
MqttClientOptions customizeOptions = getBasicMqttClientOptions();
String basicMqttAddress = getBasicMqttAddress();
mqttConnectOptions.setServerURIs(new String[]{basicMqttAddress});
mqttConnectOptions.setUserName(StringUtils.hasText(customizeOptions.getUsername()) ?
customizeOptions.getUsername() : "");
mqttConnectOptions.setPassword(StringUtils.hasText(customizeOptions.getPassword()) ?
customizeOptions.getPassword().toCharArray() : new char[0]);
// 直接进行自动连接
mqttConnectOptions.setAutomaticReconnect(true);
// 时间间隔时间10s
mqttConnectOptions.setKeepAliveInterval(10);
return mqttConnectOptions;
}
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(mqttConnectionOptions());
return factory;
}
}

91
user-service/src/main/java/com/mh/user/service/mqtt/config/MqttInboundConfig.java

@ -1,91 +0,0 @@
package com.mh.user.service.mqtt.config;
import com.mh.user.constants.ChannelName;
import com.mh.user.constants.MqttClientOptions;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import javax.annotation.Resource;
/**
* @author LJF
* @version 1.0
* @project springboot-mqtt-demo
* @description 入站配置
* @date 2024-10-29 16:22:10
*/
@Slf4j
@Configuration
@IntegrationComponentScan
public class MqttInboundConfig {
@Autowired
private MqttPahoClientFactory mqttClientFactory;
@Resource(name = ChannelName.INBOUND)
private MessageChannel inboundChannel;
private String clientId;
/**
* 入站适配器
* @return
*/
@Bean(name = "adapter")
public MessageProducerSupport mqttInbound() {
MqttClientOptions options = MqttConfig.getBasicMqttClientOptions();
// 此处初始化的时候,默认订阅了配置文件中已经写好的topic
// 如果需要订阅多个,可以自己手动订阅,会写一个addTopic()进行添加订阅
clientId = options.getClientId() + "_consumer_" + System.currentTimeMillis();
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
clientId,
mqttClientFactory,
options.getInboundTopic().split(","));
// System.out.println("每一次都会入站适配器吗?"+clientId);
DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
// 统一是字节处理
converter.setPayloadAsBytes(true);
// 设置消息转换器
adapter.setConverter(converter);
// 设置qos(quality of service)
// 0:最多一次传输(消息会丢失),
// 1:至少一次传输(消息会重复),
// 2:只有当消息发送成功时才确认(消息不回丢,但延迟高)。
adapter.setQos(0);
// 设置在接收已经订阅的主题信息后,发送给哪个通道,具体的发送方法需要翻上层的抽象类
adapter.setOutputChannel(inboundChannel);
return adapter;
}
/**
* 默认声明一个消息处理器用于处理无效的消息
* @return
*/
@Bean
@ServiceActivator(inputChannel = ChannelName.DEFAULT_BOUND)
public MessageHandler handler() {
return message -> {
log.info("The default channel does not handle messages." +
"\nTopic: {}" +
"\nPayload: {}",
message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC),
message.getPayload());
};
}
public String getClientId() {
return clientId;
}
}

55
user-service/src/main/java/com/mh/user/service/mqtt/config/MqttMessageChannel.java

@ -1,55 +0,0 @@
package com.mh.user.service.mqtt.config;
import com.mh.user.constants.ChannelName;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.MessageChannel;
/**
* @author LJF
* @version 1.0
* @project springboot-mqtt-demo
* @description 声明所有通道的定义类
* @date 2024-10-29 16:23:32
*/
@Slf4j
@Configuration
public class MqttMessageChannel {
@Bean(name = ChannelName.OUTBOUND)
public MessageChannel outboundChannel() {
return new DirectChannel();
}
@Bean(name = ChannelName.INBOUND)
public MessageChannel inboundChannel() {
return new DirectChannel();
}
/**
* 事件主动上报通道
* @return
*/
@Bean(name = ChannelName.EVENTS_UPLOAD_INBOUND)
public MessageChannel eventsUploadInbound() {
return new DirectChannel();
}
@Bean(name = ChannelName.EVENTS_COLLECTION_INBOUND)
public MessageChannel eventsCollectionInbound() {
return new DirectChannel();
}
@Bean(name = ChannelName.EVENTS_CONTROL_INBOUND)
public MessageChannel eventsControlInbound() {
return new DirectChannel();
}
@Bean(name = ChannelName.EVENTS_SEND_INBOUND)
public MessageChannel eventsSendInbound() {
return new DirectChannel();
}
}

51
user-service/src/main/java/com/mh/user/service/mqtt/config/MqttOutboundConfig.java

@ -1,51 +0,0 @@
package com.mh.user.service.mqtt.config;
import com.mh.user.constants.ChannelName;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageHandler;
/**
* @author LJF
* @version 1.0
* @project springboot-mqtt-demo
* @description 入站配置
* @date 2024-10-29 16:22:10
*/
@Slf4j
@Configuration
@IntegrationComponentScan
public class MqttOutboundConfig {
@Autowired
private MqttPahoClientFactory mqttClientFactory;
/**
* 默认声明一个出站处理器用于处理无效的消息
* @return
*/
@Bean
@ServiceActivator(inputChannel = ChannelName.OUTBOUND)
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
MqttConfig.getBasicMqttClientOptions().getClientId() + "_producer_" + System.currentTimeMillis(),
mqttClientFactory);
DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
// use byte types uniformly
converter.setPayloadAsBytes(true);
messageHandler.setAsync(true);
messageHandler.setDefaultQos(0);
messageHandler.setConverter(converter);
return messageHandler;
}
}

62
user-service/src/main/java/com/mh/user/service/mqtt/handler/InboundMessageRouter.java

@ -1,62 +0,0 @@
package com.mh.user.service.mqtt.handler;
import com.mh.user.config.MHConfig;
import com.mh.user.constants.ChannelName;
import com.mh.user.constants.TopicEnum;
import com.mh.user.utils.SpringContextUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.annotation.Router;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.integration.router.AbstractMessageRouter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Component;
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
/**
* @author LJF
* @version 1.0
* @project springboot-mqtt-demo
* @description 入站消息路由分发中心
* @date 2024-10-29 17:04:17
*/
@Slf4j
@Component
public class InboundMessageRouter extends AbstractMessageRouter {
/** 系统基础配置 */
@Autowired
private MHConfig mHConfig;
/**
* 目前只需要这个方式后期在拓展使用IntegrationFlow方式
* @param message
* @return
*/
@Router(inputChannel = ChannelName.INBOUND)
@Override
protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
MessageHeaders headers = message.getHeaders();
String topic = Objects.requireNonNull(headers.get(MqttHeaders.RECEIVED_TOPIC)).toString();
// byte[] payload = (byte[]) message.getPayload();
// log.info("从当前主题 topic: {}, 接收到的消息:{}", topic, new String(payload));
// 判断当前主题是否是当前项目的,温湿度目前写死的
// if (!topic.startsWith(mHConfig.getName()) && !topic.contains("/temp")) {
// log.info("当前主题 topic: {} 不是当前项目的,直接丢弃", topic);
// return Collections.singleton((MessageChannel) SpringContextUtils.getBean(ChannelName.DEFAULT_BOUND));
// }
// 找到对应的主题消息通道
if (topic.contains("/temp")) {
return Collections.singleton((MessageChannel) SpringContextUtils.getBean(ChannelName.EVENTS_UPLOAD_INBOUND));
} else {
TopicEnum topicEnum = TopicEnum.find(mHConfig.getName() + "/", topic);
MessageChannel bean = (MessageChannel) SpringContextUtils.getBean(topicEnum.getBeanName());
return Collections.singleton(bean);
}
}
}

49
user-service/src/main/java/com/mh/user/service/mqtt/service/IEventsService.java

@ -1,49 +0,0 @@
package com.mh.user.service.mqtt.service;
import org.springframework.messaging.MessageHeaders;
import java.io.IOException;
/**
* @author LJF
* @version 1.0
* @project springboot-mqtt-demo
* @description 通道处理类
* @date 2024-11-05 11:30:26
*/
public interface IEventsService {
/**
* 处理主动上报
* @param receiver
* @param headers
* @return
*/
void handleInboundUpload(byte[] receiver, MessageHeaders headers) throws IOException;
/**
* 处理服务器主动采集上报
* @param receiver
* @param headers
* @return
*/
void handleInboundCollection(byte[] receiver, MessageHeaders headers) throws IOException;
/**
* 处理控制上报
* @param receiver
* @param headers
* @return
*/
void handleInboundControl(byte[] receiver, MessageHeaders headers) throws IOException;
/**
* 处理发送
* @param receiver
* @param headers
* @throws IOException
*/
void handleInboundSend(byte[] receiver, MessageHeaders headers) throws IOException;
}

36
user-service/src/main/java/com/mh/user/service/mqtt/service/IMqttGatewayService.java

@ -1,36 +0,0 @@
package com.mh.user.service.mqtt.service;
/**
* @author LJF
* @version 1.0
* @project springboot-mqtt
* @description 消息网关
* @date 2024-10-30 14:43:55
*/
//@Component
//@Configuration
//@MessagingGateway(defaultRequestChannel = ChannelName.OUTBOUND)
public interface IMqttGatewayService {
/**
* 发送消息
* @param topic
* @param payload
*/
void publish(String topic, String payload, int qos);
/**
* 发送消息
* @param topic
* @param payload
*/
void publish(String topic, byte[] payload);
/**
* 发送消息并带上qos
* @param topic
* @param payload
* @param qos
*/
void publish(String topic, byte[] payload, int qos);
}

96
user-service/src/main/java/com/mh/user/service/mqtt/service/IMqttMsgSenderService.java

@ -1,96 +0,0 @@
package com.mh.user.service.mqtt.service;
import com.fasterxml.jackson.core.type.TypeReference;
import com.mh.user.constants.CommonTopicResponse;
import com.mh.user.constants.ServiceReply;
/**
* @author LJF
* @version 1.0
* @project springboot-mqtt-demo
* @description Mqtt发送消息
* @date 2024-10-29 17:31:40
*/
public interface IMqttMsgSenderService {
void publish(String topic, String pushMessage);
/**
* 发布消息
* @param topic target
* @param response message
*/
void publish(String topic, CommonTopicResponse response);
/**
* 使用qos发布消息
*
* @param topic target
* @param qos qos
* @param response message
*/
void publish(String topic, int qos, CommonTopicResponse response);
/**
* 发送消息并同时接收响应
* @param clazz
* @param topic
* @param response 通知启动是否成功
* @return
*/
<T> T publishWithReply(Class<T> clazz, String topic, CommonTopicResponse response);
/**
* 发送消息并同时接收响应
* @param clazz
* @param topic
* @param response
* @param retryTime
* @param <T>
* @return
*/
<T> T publishWithReply(Class<T> clazz, String topic, CommonTopicResponse response, int retryTime);
/**
* 专门用于发送服务消息
* @param clazz The generic class for ServiceReply.
* @param sn
* @param method
* @param data
* @param bid
* @param <T>
* @return
*/
<T> ServiceReply<T> publishServicesTopic(TypeReference<T> clazz, String sn, String method, Object data, String bid);
/**
* 仅用于为服务发送消息不设置接收到的子类型
* @param sn
* @param method
* @param data
* @param bid
* @return
*/
ServiceReply publishServicesTopic(String sn, String method, Object data, String bid);
/**
* 专门用于发送服务消息
* @param clazz The generic class for ServiceReply.
* @param sn
* @param method
* @param data
* @param <T>
* @return
*/
<T> ServiceReply<T> publishServicesTopic(TypeReference<T> clazz, String sn, String method, Object data);
/**
* 仅用于为服务发送消息不设置接收到的子类型
* @param sn
* @param method
* @param data
* @return
*/
ServiceReply publishServicesTopic(String sn, String method, Object data);
}

40
user-service/src/main/java/com/mh/user/service/mqtt/service/IMqttTopicService.java

@ -1,40 +0,0 @@
package com.mh.user.service.mqtt.service;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
/**
* @author LJF
* @version 1.0
* @project springboot-mqtt-demo
* @description mqtt订阅主题
* @date 2024-10-29 17:29:52
*/
public interface IMqttTopicService {
/**
* 订阅主题
* @param topic
*/
void subscribe(@Header(MqttHeaders.TOPIC) String topic);
/**
* 订阅主题并设置qos
* @param topic
* @param qos
*/
void subscribe(@Header(MqttHeaders.TOPIC) String topic, int qos);
/**
* 解绑主题
* @param topic
*/
void unsubscribe(@Header(MqttHeaders.TOPIC) String topic);
/**
* 获取已订阅的主题
* @return
*/
String[] getSubscribedTopics();
}

75
user-service/src/main/java/com/mh/user/service/mqtt/service/impl/EventsServiceImpl.java

@ -1,75 +0,0 @@
package com.mh.user.service.mqtt.service.impl;
import com.alibaba.fastjson2.JSONObject;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.mh.user.constants.ChannelName;
import com.mh.user.model.SanShiFengReceiver;
import com.mh.user.service.mqtt.service.IEventsService;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.Objects;
/**
* @author LJF
* @version 1.0
* @project springboot-mqtt-demo
* @description 通道消息处理
* @date 2024-11-05 11:42:30
*/
@Slf4j
@Service
public class EventsServiceImpl implements IEventsService {
@Autowired
private ObjectMapper mapper;
@ServiceActivator(inputChannel = ChannelName.EVENTS_UPLOAD_INBOUND)
@Override
public void handleInboundUpload(byte[] receiver, MessageHeaders headers) {
// 获取当前的主题
String topic = Objects.requireNonNull(headers.get("mqtt_receivedTopic")).toString();
handleInboundData(receiver, topic, "主动上报数据");
}
@ServiceActivator(inputChannel = ChannelName.EVENTS_COLLECTION_INBOUND)
@Override
public void handleInboundCollection(byte[] receiver, MessageHeaders headers) {
// 获取当前的主题
String topic = Objects.requireNonNull(headers.get("mqtt_receivedTopic")).toString();
handleInboundData(receiver, topic, "主动下发采集数据");
}
@ServiceActivator(inputChannel = ChannelName.EVENTS_CONTROL_INBOUND)
@Override
public void handleInboundControl(byte[] receiver, MessageHeaders headers) {
// 获取当前的主题
String topic = Objects.requireNonNull(headers.get("mqtt_receivedTopic")).toString();
handleInboundData(receiver, topic, "控制指令下发");
}
@ServiceActivator(inputChannel = ChannelName.EVENTS_SEND_INBOUND)
@Override
public void handleInboundSend(byte[] receiver, MessageHeaders headers) throws IOException {
String sendStr = new String(receiver, CharsetUtil.UTF_8);
log.info("接收到控制指令下发=>{}", sendStr);
}
private void handleInboundData(byte[] receiver,String topic, String logMessage) {
try {
SanShiFengReceiver commonTopicReceiver = new SanShiFengReceiver();
commonTopicReceiver = mapper.readValue(receiver, SanShiFengReceiver.class);
log.info("主题:{},类型:{}: ,数据:{}", topic, logMessage, commonTopicReceiver.toString());
// 不使用消息队列,查询属于哪种设备类型,然后通过策略进行数据解析
} catch (IOException e) {
log.error("处理数据时发生错误: ", e);
}
}
}

54
user-service/src/main/java/com/mh/user/service/mqtt/service/impl/MqttGatewayServiceImpl.java

@ -1,54 +0,0 @@
package com.mh.user.service.mqtt.service.impl;
import com.mh.user.service.mqtt.service.IMqttGatewayService;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
/**
* @author LJF
* @version 1.0
* @project EEMCS
* @description 网关实现类
* @date 2025-02-07 08:44:55
*/
@Service
public class MqttGatewayServiceImpl implements IMqttGatewayService {
private final MessageChannel outboundChannel;
public MqttGatewayServiceImpl(@Qualifier("outbound") MessageChannel outboundChannel) {
this.outboundChannel = outboundChannel;
}
@Override
public synchronized void publish(String topic, String payload, int qos) {
outboundChannel.send(
MessageBuilder
.withPayload(payload)
.setHeader(MqttHeaders.TOPIC, topic)
.setHeader(MqttHeaders.QOS, qos)
.build());
}
@Override
public void publish(String topic, byte[] payload) {
outboundChannel.send(
MessageBuilder
.withPayload(payload)
.setHeader(MqttHeaders.TOPIC, topic)
.build());
}
@Override
public void publish(String topic, byte[] payload, int qos) {
outboundChannel.send(
MessageBuilder
.withPayload(payload)
.setHeader(MqttHeaders.TOPIC, topic)
.setHeader(MqttHeaders.QOS, qos)
.build());
}
}

142
user-service/src/main/java/com/mh/user/service/mqtt/service/impl/MqttMsgSenderServiceImpl.java

@ -1,142 +0,0 @@
package com.mh.user.service.mqtt.service.impl;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.mh.user.constants.CommonTopicResponse;
import com.mh.user.constants.ServiceReply;
import com.mh.user.constants.TopicConst;
import com.mh.user.service.mqtt.service.IMqttGatewayService;
import com.mh.user.service.mqtt.service.IMqttMsgSenderService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import java.util.Objects;
import java.util.UUID;
/**
* @author LJF
* @version 1.0
* @project springboot-mqtt-demo
* @description mqtt不同类型发送消息
* @date 2024-10-30 14:30:03
*/
@Slf4j
@Service
public class MqttMsgSenderServiceImpl implements IMqttMsgSenderService {
@Autowired
private IMqttGatewayService mqttGatewayService;
@Autowired
private ObjectMapper mapper;
/**
* 发布默认qos为0非持久化
*
* @param pushMessage
* @param topic
*/
@Override
public void publish(String topic, String pushMessage) {
synchronized (this) {//注意:这里一定要同步,否则,在多线程publish的情况下,线程会发生死锁,分析见文章最后补充
try {
mqttGatewayService.publish(topic, pushMessage, 0);
log.info("发送主题:{},消息:{}", topic, pushMessage);
} catch (Exception e) {
log.error("发送主题异常:{},消息:{}", topic, pushMessage, e);
throw new RuntimeException(e);
}
}
}
@Override
public void publish(String topic, CommonTopicResponse response) {
this.publish(topic, 1, response);
}
@Override
public void publish(String topic, int qos, CommonTopicResponse response) {
try {
log.info("发送主题:{},消息:{}", topic, response.toString());
mqttGatewayService.publish(topic, mapper.writeValueAsBytes(response), qos);
} catch (JsonProcessingException e) {
log.error("发送主题:{},消息:{}", topic, response.toString(), e);
throw new RuntimeException(e);
}
}
@Override
public <T> T publishWithReply(Class<T> clazz, String topic, CommonTopicResponse response) {
return this.publishWithReply(clazz, topic, response, 2);
}
@Override
public <T> T publishWithReply(Class<T> clazz, String topic, CommonTopicResponse response, int retryTime) {
// AtomicInteger time = new AtomicInteger(0);
// // Retry three times
// while (time.getAndIncrement() <= retryTime) {
// this.publish(topic, response);
//
// Chan<CommonTopicReceiver<T>> chan = Chan.getInstance();
// // If the message is not received in 0.5 seconds then resend it again.
// CommonTopicReceiver<T> receiver = chan.get(response.getTid());
//
// // Need to match tid and bid.
// if (Objects.nonNull(receiver) && receiver.getTid().equals(response.getTid()) &&
// receiver.getBid().equals(response.getBid())) {
// if (clazz.isAssignableFrom(receiver.getData().getClass())) {
// return receiver.getData();
// }
// throw new TypeMismatchException(receiver.getData(), clazz);
// }
// // It must be guaranteed that the tid and bid of each message are different.
// response.setBid(UUID.randomUUID().toString());
// response.setTid(UUID.randomUUID().toString());
// }
// throw new RuntimeException("没有消息接收到");
return null;
}
@Override
public <T> ServiceReply<T> publishServicesTopic(TypeReference<T> clazz, String sn, String method, Object data, String bid) {
String topic = TopicConst.THING_MODEL_PRE + TopicConst.PRODUCT + sn + TopicConst.SERVICES_SUF;
ServiceReply reply = this.publishWithReply(ServiceReply.class, topic,
CommonTopicResponse.builder()
.tid(UUID.randomUUID().toString())
.bid(StringUtils.hasText(bid) ? bid : UUID.randomUUID().toString())
.timestamp(System.currentTimeMillis())
.method(method)
.data(Objects.requireNonNull(data, ""))
.build());
if (Objects.isNull(clazz)) {
return reply;
}
// put together in "output"
if (Objects.nonNull(reply.getInfo())) {
reply.setOutput(mapper.convertValue(reply.getInfo(), clazz));
}
if (Objects.nonNull(reply.getOutput())) {
reply.setOutput(mapper.convertValue(reply.getOutput(), clazz));
}
return reply;
}
@Override
public ServiceReply publishServicesTopic(String sn, String method, Object data, String bid) {
return this.publishServicesTopic(null, sn, method, data, bid);
}
@Override
public <T> ServiceReply<T> publishServicesTopic(TypeReference<T> clazz, String sn, String method, Object data) {
return this.publishServicesTopic(clazz, sn, method, data, null);
}
@Override
public ServiceReply publishServicesTopic(String sn, String method, Object data) {
return this.publishServicesTopic(null, sn, method, data, null);
}
}

40
user-service/src/main/java/com/mh/user/service/mqtt/service/impl/MqttTopicServiceImpl.java

@ -1,40 +0,0 @@
package com.mh.user.service.mqtt.service.impl;
import com.mh.framework.mqtt.service.IMqttTopicService;
import jakarta.annotation.Resource;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.stereotype.Service;
/**
* @author LJF
* @version 1.0
* @project springboot-mqtt-demo
* @description 订阅主题实现类
* @date 2024-10-29 17:36:31
*/
@Service
public class MqttTopicServiceImpl implements IMqttTopicService {
@Resource
private MqttPahoMessageDrivenChannelAdapter adapter;
@Override
public void subscribe(String topic) {
adapter.addTopic(topic);
}
@Override
public void subscribe(String topic, int qos) {
adapter.addTopic(topic, qos);
}
@Override
public void unsubscribe(String topic) {
adapter.removeTopic(topic);
}
@Override
public String[] getSubscribedTopics() {
return adapter.getTopic();
}
}

33
user-service/src/main/java/com/mh/user/strategy/BackTempControlStrategy.java

@ -2,7 +2,6 @@ package com.mh.user.strategy;
import com.mh.user.constants.Constant;
import com.mh.user.entity.DeviceCodeParamEntity;
import com.mh.user.entity.DeviceInstallEntity;
import com.mh.user.entity.NowPublicDataEntity;
import com.mh.user.service.BuildingService;
import com.mh.user.service.NowDataService;
@ -101,36 +100,4 @@ public class BackTempControlStrategy implements DeviceStrategy {
}
return result;
}
@Override
public String analysisMQTTReceiveData(String dateStr,
String registerAddr,
String dataStr,
String operateType,
DeviceInstallEntity deviceInstallEntity) {
String result = Constant.FAIL;
if (Integer.parseInt(dataStr) < 0) {
log.info("回水温控报文检验失败: " + dataStr);
return result;
}
String addr = deviceInstallEntity.getDeviceAddr();//地址
String data = "";
if (operateType.equalsIgnoreCase(Constant.READ)) {// 读
Double fdata = Double.parseDouble(dataStr);
nowDataService.saveNowHistoryData2(addr, "回水温控", String.valueOf(fdata), "waterTemp", deviceInstallEntity.getBuildingId());
nowDataService.proWaterTemp(dateStr, deviceInstallEntity.getBuildingId(), "");//保存时间点温度
String avgTemp = nowDataService.selectAve(deviceInstallEntity.getBuildingId());
NowPublicDataEntity publicData = new NowPublicDataEntity();
publicData.setBuildingId(deviceInstallEntity.getBuildingId());
publicData.setUseWaterTemp(avgTemp);
publicData.setBackWaterTemp(avgTemp);
publicData.setSingleTemp(String.valueOf(fdata));//单箱温度
nowPublicDataService.saveNowHistoryPublicData(publicData);
log.info("回水温控号:" + addr + ",温度值:" + fdata + ",保存数据库成功!楼栋名称:" + deviceInstallEntity.getBuildingName());
return String.valueOf(fdata);
} else if (operateType.equalsIgnoreCase(Constant.WRITE)) {// 写
result = Constant.SUCCESS;
}
return result;
}
}

16
user-service/src/main/java/com/mh/user/strategy/DeviceStrategy.java

@ -1,7 +1,6 @@
package com.mh.user.strategy;
import com.mh.user.entity.DeviceCodeParamEntity;
import com.mh.user.entity.DeviceInstallEntity;
/**
* @author LJF
@ -18,19 +17,4 @@ public interface DeviceStrategy {
String createOrders(DeviceCodeParamEntity deviceCodeParamEntity);
String analysisReceiveData(String dateStr, String deviceType, String registerAddr, String brand, String buildingId, String buildingName, String dataStr, DeviceCodeParamEntity deviceCodeParamEntity);
/**
* 解析MQTT报文
* @param dateStr
* @param registerAddr
* @param dataStr 已经是解析好的数据
* @param operateType 操作类型读取/设置
* @param deviceInstallEntity
* @return
*/
String analysisMQTTReceiveData(String dateStr,
String registerAddr,
String dataStr,
String operateType,
DeviceInstallEntity deviceInstallEntity);
}

32
user-service/src/main/java/com/mh/user/strategy/EleMeterStrategy.java

@ -1,10 +1,8 @@
package com.mh.user.strategy;
import com.mh.common.utils.StringUtils;
import com.mh.user.constants.Constant;
import com.mh.user.entity.DataResultEntity;
import com.mh.user.entity.DeviceCodeParamEntity;
import com.mh.user.entity.DeviceInstallEntity;
import com.mh.user.service.DataResultService;
import com.mh.user.utils.ExchangeStringUtil;
import com.mh.user.utils.SpringBeanUtil;
@ -158,34 +156,4 @@ public class EleMeterStrategy implements DeviceStrategy {
}
return data;
}
@Override
public String analysisMQTTReceiveData(String dateStr, String registerAddr, String dataStr, String operateType, DeviceInstallEntity deviceInstallEntity) {
String data = Constant.FAIL;
if (Integer.parseInt(dataStr) < 0) {
return data;
}
log.info("电表表号:{},电表读数:{}", deviceInstallEntity.getDeviceAddr(), dataStr);
// 考虑dataStr是否走大数或者走小数
if (Double.parseDouble(dataStr)-deviceInstallEntity.getLastValue()>1000 || Double.parseDouble(dataStr)-deviceInstallEntity.getLastValue()<0) {
dataStr = String.valueOf(deviceInstallEntity.getLastValue());
}
try {
DataResultEntity dataResultEntity = new DataResultEntity();
dataResultEntity.setDeviceAddr(deviceInstallEntity.getDeviceAddr());//通讯编号
dataResultEntity.setDeviceType("电表");
dataResultEntity.setBuildingId(deviceInstallEntity.getBuildingId());
dataResultEntity.setCurValue(Double.parseDouble(dataStr)); //当前读数
Date date = new Date();
dataResultEntity.setCurDate(date); //当前日期
dataResultService.saveDataResult(dataResultEntity);
log.info("电表数据保存数据库成功! 楼栋名称:{}", deviceInstallEntity.getBuildingName());
} catch (Exception e) {
log.error("电表数据保存数据库失败!楼栋名称:{}", deviceInstallEntity.getBuildingName(), e);
}
if (!StringUtils.isBlank(dataStr)) {
data = String.valueOf(Double.valueOf(dataStr)); //00010.76,去除读数前面带0的情况
}
return data;
}
}

6
user-service/src/main/java/com/mh/user/strategy/HeatPumpStatusStrategy.java

@ -4,7 +4,6 @@ import com.alibaba.fastjson2.JSON;
import com.mh.common.utils.StringUtils;
import com.mh.user.constants.Constant;
import com.mh.user.entity.DeviceCodeParamEntity;
import com.mh.user.entity.DeviceInstallEntity;
import com.mh.user.service.BuildingService;
import com.mh.user.service.NowDataService;
import com.mh.user.service.NowPublicDataService;
@ -239,9 +238,4 @@ public class HeatPumpStatusStrategy implements DeviceStrategy {
}
return result;
}
@Override
public String analysisMQTTReceiveData(String dateStr, String registerAddr, String dataStr, String operateType, DeviceInstallEntity deviceInstallEntity) {
return "";
}
}

6
user-service/src/main/java/com/mh/user/strategy/HeatPumpStrategy.java

@ -3,7 +3,6 @@ package com.mh.user.strategy;
import com.mh.common.utils.StringUtils;
import com.mh.user.constants.Constant;
import com.mh.user.entity.DeviceCodeParamEntity;
import com.mh.user.entity.DeviceInstallEntity;
import com.mh.user.entity.NowPublicDataEntity;
import com.mh.user.service.*;
import com.mh.user.utils.ExchangeStringUtil;
@ -671,9 +670,4 @@ public class HeatPumpStrategy implements DeviceStrategy {
stringBuffer.append(dataType);
return sValue;
}
@Override
public String analysisMQTTReceiveData(String dateStr, String registerAddr, String dataStr, String operateType, DeviceInstallEntity deviceInstallEntity) {
return "";
}
}

5
user-service/src/main/java/com/mh/user/strategy/MultiControlStrategy.java

@ -390,9 +390,4 @@ public class MultiControlStrategy implements DeviceStrategy {
}
return l1;
}
@Override
public String analysisMQTTReceiveData(String dateStr, String registerAddr, String dataStr, String operateType, DeviceInstallEntity deviceInstallEntity) {
return "";
}
}

40
user-service/src/main/java/com/mh/user/strategy/PressureTransStrategy.java

@ -38,7 +38,7 @@ public class PressureTransStrategy implements DeviceStrategy {
}
public static PressureTransStrategy getInstance() {
return SingletonHolder.INSTANCE;
return PressureTransStrategy.SingletonHolder.INSTANCE;
}
@Override
@ -128,42 +128,4 @@ public class PressureTransStrategy implements DeviceStrategy {
}
return result;
}
@Override
public String analysisMQTTReceiveData(String dateStr, String registerAddr, String dataStr, String operateType, DeviceInstallEntity deviceInstallEntity) {
String result = "fail";
if (Integer.parseInt(dataStr) < 0) {
return result;
}
if (operateType.equalsIgnoreCase(Constant.READ)) {// 读
Double tankHeight = 0.0;
// 查询当前压变有低区
if (null != deviceInstallEntity
&& !StringUtils.isBlank(deviceInstallEntity.getDeviceName())) {
if (deviceInstallEntity.getDeviceName().contains("低")) {
tankHeight = buildingService.queryLowTankHeight(deviceInstallEntity.getBuildingId());//水箱高,从数据库获取
if (tankHeight == null) {
tankHeight = 2.0;
}
}
}
Double wtLevel = Double.parseDouble(dataStr)/tankHeight * 100; //水箱水位
log.info("------水箱水高:" + wtLevel + "------");
if (wtLevel <= 0) {
wtLevel = 0.0;
} else if (wtLevel >= 100) {
wtLevel = 100.0;
}
DecimalFormat df = new DecimalFormat("0.0");
String strWtLevel = df.format(wtLevel);
// 更新device_install数据
assert deviceInstallEntity != null;
deviceInstallService.updateLastValueByOther(deviceInstallEntity.getDeviceAddr(), strWtLevel, "压变", deviceInstallEntity.getBuildingId());
nowDataService.saveNowHistoryData2(deviceInstallEntity.getDeviceAddr(), "压变", strWtLevel, "waterLevel", deviceInstallEntity.getBuildingId());
nowDataService.proWaterLevel(dateStr, deviceInstallEntity.getBuildingId(), deviceInstallEntity.getDeviceAddr()); //楼栋水位
log.info("时间:"+ dateStr +"压变号:" + deviceInstallEntity.getDeviceAddr() + ",保存数据库成功!楼栋名称:" + deviceInstallEntity.getBuildingName());
result = strWtLevel;
}
return result;
}
}

6
user-service/src/main/java/com/mh/user/strategy/StatusCheckStrategy.java

@ -2,7 +2,6 @@ package com.mh.user.strategy;
import com.mh.user.constants.Constant;
import com.mh.user.entity.DeviceCodeParamEntity;
import com.mh.user.entity.DeviceInstallEntity;
import com.mh.user.entity.NowPublicDataEntity;
import com.mh.user.service.*;
import com.mh.user.utils.ExchangeStringUtil;
@ -125,9 +124,4 @@ public class StatusCheckStrategy implements DeviceStrategy {
}
return Constant.FAIL;
}
@Override
public String analysisMQTTReceiveData(String dateStr, String registerAddr, String dataStr, String operateType, DeviceInstallEntity deviceInstallEntity) {
return "";
}
}

27
user-service/src/main/java/com/mh/user/strategy/TempControlStrategy.java

@ -2,7 +2,6 @@ package com.mh.user.strategy;
import com.mh.user.constants.Constant;
import com.mh.user.entity.DeviceCodeParamEntity;
import com.mh.user.entity.DeviceInstallEntity;
import com.mh.user.entity.NowPublicDataEntity;
import com.mh.user.service.BuildingService;
import com.mh.user.service.NowDataService;
@ -114,30 +113,4 @@ public class TempControlStrategy implements DeviceStrategy {
}
return result;
}
@Override
public String analysisMQTTReceiveData(String dateStr, String registerAddr, String dataStr, String operateType, DeviceInstallEntity deviceInstallEntity) {
String result = Constant.FAIL;
if (Integer.parseInt(dataStr) < 0) {
log.info("温控报文检验失败: " + dataStr);
return result;
}
String data = "";
if (operateType.equalsIgnoreCase(Constant.READ)) {// 读
nowDataService.saveNowHistoryData2(deviceInstallEntity.getDeviceAddr(), "温控", dataStr, "waterTemp", deviceInstallEntity.getBuildingId());
nowDataService.proWaterTemp(dateStr, deviceInstallEntity.getBuildingId(), "");//保存时间点温度
String avgTemp = nowDataService.selectAve(deviceInstallEntity.getBuildingId());
NowPublicDataEntity publicData = new NowPublicDataEntity();
publicData.setBuildingId(deviceInstallEntity.getBuildingId());
publicData.setUseWaterTemp(avgTemp);
publicData.setBackWaterTemp(avgTemp);
publicData.setSingleTemp(dataStr);//单箱温度
nowPublicDataService.saveNowHistoryPublicData(publicData);
log.info("温控号:" + deviceInstallEntity.getDeviceAddr() + ",温度值:" + dataStr + ",保存数据库成功!楼栋名称:" + deviceInstallEntity.getBuildingName());
return dataStr;
} else {// 写
result = Constant.SUCCESS;
}
return result;
}
}

8
user-service/src/main/java/com/mh/user/strategy/TempTransStrategy.java

@ -3,7 +3,6 @@ package com.mh.user.strategy;
import com.alibaba.fastjson2.JSON;
import com.mh.user.constants.Constant;
import com.mh.user.entity.DeviceCodeParamEntity;
import com.mh.user.entity.DeviceInstallEntity;
import com.mh.user.entity.NowPublicDataEntity;
import com.mh.user.service.BuildingService;
import com.mh.user.service.NowDataService;
@ -20,7 +19,7 @@ import java.util.Map;
* @author LJF
* @version 1.0
* @project CHWS
* @description 温度变送器策略
* @description 压力变送器策略
* @date 2024-03-18 09:51:17
*/
@Slf4j
@ -118,9 +117,4 @@ public class TempTransStrategy implements DeviceStrategy {
}
return result;
}
@Override
public String analysisMQTTReceiveData(String dateStr, String registerAddr, String dataStr, String operateType, DeviceInstallEntity deviceInstallEntity) {
return "";
}
}

19
user-service/src/main/java/com/mh/user/strategy/TimeControlStrategy.java

@ -3,7 +3,6 @@ package com.mh.user.strategy;
import com.mh.common.utils.StringUtils;
import com.mh.user.constants.Constant;
import com.mh.user.entity.DeviceCodeParamEntity;
import com.mh.user.entity.DeviceInstallEntity;
import com.mh.user.service.NowDataService;
import com.mh.user.service.NowPublicDataService;
import com.mh.user.utils.ExchangeStringUtil;
@ -182,22 +181,14 @@ public class TimeControlStrategy implements DeviceStrategy {
// 开关时间
// 发送:0603001E0001E5BB
// 返回:0603020041CDB4
if (rec == 14 && (isExactlyDivisible("001E", registerAddr)
|| isExactlyDivisible("001F", registerAddr)
|| isExactlyDivisible("0020", registerAddr)
|| isExactlyDivisible("0021", registerAddr)
|| isExactlyDivisible("0022", registerAddr)
|| isExactlyDivisible("0023", registerAddr)
|| isExactlyDivisible("0024", registerAddr)
|| isExactlyDivisible("0025", registerAddr)
)) {
if (rec == 14 && isExactlyDivisible("001E", registerAddr)) {
// 开关时间
data = ExchangeStringUtil.hexToDec(checkStr.substring(6, 10));
data = ExchangeStringUtil.hexToDec(checkStr.substring(8, 10));
// 得出整数,然后拆分时分,比如data="65",换算成时分就是01:05
int totalMinutes = Integer.parseInt(data);
int hours = totalMinutes / 60;
int minutes = totalMinutes % 60;
data = String.format("%02d%02d", hours, minutes);
data = String.format("%02d:%02d", hours, minutes);
} else if (rec == 14 && isExactlyDivisible("0018", registerAddr)) {
// 星期掩码
// 截取时间
@ -293,8 +284,4 @@ public class TimeControlStrategy implements DeviceStrategy {
return false;
}
@Override
public String analysisMQTTReceiveData(String dateStr, String registerAddr, String dataStr, String operateType, DeviceInstallEntity deviceInstallEntity) {
return "";
}
}

5
user-service/src/main/java/com/mh/user/strategy/WaterLevelSwitchStrategy.java

@ -576,9 +576,4 @@ public class WaterLevelSwitchStrategy implements DeviceStrategy {
}
return Constant.FAIL;
}
@Override
public String analysisMQTTReceiveData(String dateStr, String registerAddr, String dataStr, String operateType, DeviceInstallEntity deviceInstallEntity) {
return "";
}
}

36
user-service/src/main/java/com/mh/user/strategy/WtMeterStrategy.java

@ -1,10 +1,8 @@
package com.mh.user.strategy;
import com.mh.common.utils.StringUtils;
import com.mh.user.constants.Constant;
import com.mh.user.entity.DataResultEntity;
import com.mh.user.entity.DeviceCodeParamEntity;
import com.mh.user.entity.DeviceInstallEntity;
import com.mh.user.service.DataResultService;
import com.mh.user.utils.CRC16;
import com.mh.user.utils.ExchangeStringUtil;
@ -47,7 +45,7 @@ public class WtMeterStrategy implements DeviceStrategy {
String str = "";
if (deviceAddr != null && deviceAddr.length() > 0) {
try {
if (StringUtils.isBlank(brand) || brand.equals("埃美柯") || brand.equals("艾美柯")) {
if (StringUtils.isBlank(brand) || brand.equals("埃美柯")) {
// 0 代表前面补充0,14 代表长度为14,d 代表参数为正数型
str = String.format("%014d", Long.parseLong(deviceAddr));//基表通讯号
// 转换位置
@ -134,36 +132,4 @@ public class WtMeterStrategy implements DeviceStrategy {
}
return data;
}
@Override
public String analysisMQTTReceiveData(String dateStr, String registerAddr, String dataStr, String operateType, DeviceInstallEntity deviceInstallEntity) {
String data = Constant.FAIL;
if (Integer.parseInt(dataStr) < 0) {
return data;
}
log.info("水表表号: " + deviceInstallEntity.getDeviceAddr() + ",水表读数:" + dataStr);
// 考虑dataStr是否走大数或者走小数
if (Double.parseDouble(dataStr)-deviceInstallEntity.getLastValue()>100 || Double.parseDouble(dataStr)-deviceInstallEntity.getLastValue()<0) {
dataStr = String.valueOf(deviceInstallEntity.getLastValue());
}
try {
if (!StringUtils.isBlank(dataStr)) {
DataResultEntity dataResultEntity = new DataResultEntity();
dataResultEntity.setDeviceAddr(deviceInstallEntity.getDeviceAddr());//通讯编号
dataResultEntity.setDeviceType("水表");
dataResultEntity.setCurValue(Double.parseDouble(dataStr)); //当前读数
Date date = new Date();
dataResultEntity.setCurDate(date); //当前日期
dataResultEntity.setBuildingId(deviceInstallEntity.getBuildingId());
dataResultService.saveDataResult(dataResultEntity);
log.info("水表数据保存数据库成功!楼栋名称:" + deviceInstallEntity.getBuildingName());
}
} catch (Exception e) {
log.error("水表数据保存数据库失败!楼栋名称:{}", deviceInstallEntity.getBuildingName(), e);
}
if (!StringUtils.isBlank(data)) {
data = String.valueOf(Double.valueOf(data));
}
return data;
}
}

2
user-service/src/main/java/com/mh/user/utils/GetReadOrder485.java

@ -60,7 +60,7 @@ public class GetReadOrder485 {
String str = "";
if (deviceAddr != null && deviceAddr.length() > 0) {
try {
if (StringUtils.isBlank(brand) || brand.equals("艾美柯") || brand.equals("埃美柯")) {
if (StringUtils.isBlank(brand) || brand.equals("艾美柯")) {
// 0 代表前面补充0,14 代表长度为14,d 代表参数为正数型
str = String.format("%014d", Long.parseLong(deviceAddr));//基表通讯号
// 转换位置

23
user-service/src/main/resources/application-dev.yml

@ -96,28 +96,5 @@ logging:
amap:
key: 984603bf28ef94ac78765a3ea27a6c26
mqttSpring:
# BASIC parameters are required.
BASIC:
protocol: MQTT
host: 192.168.1.79
port: 1883
username: test
password: test123456
client-id: chws_nfxy_mqtt_dev
# If the protocol is ws/wss, this value is required.
path:
# Topics that need to be subscribed when initially connecting to mqtt, multiple topics are divided by ",".
inbound-topic: chws_nfxy_mqtt_dev/read/events_upload/devices
# 此部分是提供给后端生成token返回给前端,让前端使用websocket方式去和MQTT实现交互的,笔者此文的案例中并没有去实现
# 无人机远程控制模式(drone remote control)
DRC:
protocol: WS
host: 192.168.1.79
port: 8083
path: /mqtt
control:
topic: chws_nfxy_mqtt_dev/control/events_upload/devices

18
user-service/src/main/resources/application-prod.yml

@ -14,10 +14,10 @@ spring:
# password: mh@803
## url: jdbc:sqlserver://120.25.220.177:32012;DatabaseName=M_CHWS;allowMultiQueries=true
#阿里云服务器-广州理工
url: jdbc:sqlserver://111.230.50.186:32012;DatabaseName=CHWS;allowMultiQueries=true
driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver
username: test
password: minghan123456@
# url: jdbc:sqlserver://111.230.50.186:32012;DatabaseName=CHWS;allowMultiQueries=true
# driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver
# username: test
# password: minghan123456@
# #华厦云服务器
# url: jdbc:sqlserver://127.0.0.1:1433;DatabaseName=CHWS;allowMultiQueries=true
# driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver
@ -29,10 +29,10 @@ spring:
# username: chws_gsh
# password: Mhtech@803
#广商服务器
# url: jdbc:sqlserver://175.178.153.91:8033;DatabaseName=chws_gsh;allowMultiQueries=true
# driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver
# username: chws_gsh
# password: Mhtech@803gsh
url: jdbc:sqlserver://175.178.153.91:8033;DatabaseName=chws_gsh;allowMultiQueries=true
driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver
username: chws_gsh
password: Mhtech@803gsh
#本机
# url: jdbc:sqlserver://127.0.0.1:9956;DatabaseName=CHWS;allowMultiQueries=true
# driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver
@ -64,7 +64,7 @@ spring:
# username: chws_jm
# password: Mhtech@803
# # 珠海北师大
# # 华软江门
# url: jdbc:sqlserver://127.0.0.1:8033;DatabaseName=chws_bsdz;allowMultiQueries=true
# driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver
# username: chws_bsdz

Loading…
Cancel
Save