Compare commits
No commits in common. '62c8a6fc87a9ece2a4b7553bc166d989889c931a' and 'ada1d2ec2fd70f153cb49074c73542f4f8068931' have entirely different histories.
62c8a6fc87
...
ada1d2ec2f
52 changed files with 30 additions and 2289 deletions
@ -1,122 +0,0 @@ |
|||||||
package com.mh.user.config; |
|
||||||
|
|
||||||
import org.springframework.boot.context.properties.ConfigurationProperties; |
|
||||||
import org.springframework.stereotype.Component; |
|
||||||
|
|
||||||
/** |
|
||||||
* 读取项目相关配置 |
|
||||||
* |
|
||||||
* @author mh |
|
||||||
*/ |
|
||||||
@Component |
|
||||||
@ConfigurationProperties(prefix = "mh") |
|
||||||
public class MHConfig |
|
||||||
{ |
|
||||||
/** 项目名称 */ |
|
||||||
private String name; |
|
||||||
|
|
||||||
/** 版本 */ |
|
||||||
private String version; |
|
||||||
|
|
||||||
/** 版权年份 */ |
|
||||||
private String copyrightYear; |
|
||||||
|
|
||||||
/** 上传路径 */ |
|
||||||
private static String profile; |
|
||||||
|
|
||||||
/** 获取地址开关 */ |
|
||||||
private static boolean addressEnabled; |
|
||||||
|
|
||||||
/** 验证码类型 */ |
|
||||||
private static String captchaType; |
|
||||||
|
|
||||||
public String getName() |
|
||||||
{ |
|
||||||
return name; |
|
||||||
} |
|
||||||
|
|
||||||
public void setName(String name) |
|
||||||
{ |
|
||||||
this.name = name; |
|
||||||
} |
|
||||||
|
|
||||||
public String getVersion() |
|
||||||
{ |
|
||||||
return version; |
|
||||||
} |
|
||||||
|
|
||||||
public void setVersion(String version) |
|
||||||
{ |
|
||||||
this.version = version; |
|
||||||
} |
|
||||||
|
|
||||||
public String getCopyrightYear() |
|
||||||
{ |
|
||||||
return copyrightYear; |
|
||||||
} |
|
||||||
|
|
||||||
public void setCopyrightYear(String copyrightYear) |
|
||||||
{ |
|
||||||
this.copyrightYear = copyrightYear; |
|
||||||
} |
|
||||||
|
|
||||||
public static String getProfile() |
|
||||||
{ |
|
||||||
return profile; |
|
||||||
} |
|
||||||
|
|
||||||
public void setProfile(String profile) |
|
||||||
{ |
|
||||||
MHConfig.profile = profile; |
|
||||||
} |
|
||||||
|
|
||||||
public static boolean isAddressEnabled() |
|
||||||
{ |
|
||||||
return addressEnabled; |
|
||||||
} |
|
||||||
|
|
||||||
public void setAddressEnabled(boolean addressEnabled) |
|
||||||
{ |
|
||||||
MHConfig.addressEnabled = addressEnabled; |
|
||||||
} |
|
||||||
|
|
||||||
public static String getCaptchaType() { |
|
||||||
return captchaType; |
|
||||||
} |
|
||||||
|
|
||||||
public void setCaptchaType(String captchaType) { |
|
||||||
MHConfig.captchaType = captchaType; |
|
||||||
} |
|
||||||
|
|
||||||
/** |
|
||||||
* 获取导入上传路径 |
|
||||||
*/ |
|
||||||
public static String getImportPath() |
|
||||||
{ |
|
||||||
return getProfile() + "/import"; |
|
||||||
} |
|
||||||
|
|
||||||
/** |
|
||||||
* 获取头像上传路径 |
|
||||||
*/ |
|
||||||
public static String getAvatarPath() |
|
||||||
{ |
|
||||||
return getProfile() + "/avatar"; |
|
||||||
} |
|
||||||
|
|
||||||
/** |
|
||||||
* 获取下载路径 |
|
||||||
*/ |
|
||||||
public static String getDownloadPath() |
|
||||||
{ |
|
||||||
return getProfile() + "/download/"; |
|
||||||
} |
|
||||||
|
|
||||||
/** |
|
||||||
* 获取上传路径 |
|
||||||
*/ |
|
||||||
public static String getUploadPath() |
|
||||||
{ |
|
||||||
return getProfile() + "/upload"; |
|
||||||
} |
|
||||||
} |
|
||||||
@ -1,62 +0,0 @@ |
|||||||
package com.mh.user.config.mqtt; |
|
||||||
|
|
||||||
import com.mh.user.config.MHConfig; |
|
||||||
import com.mh.user.constants.ChannelName; |
|
||||||
import com.mh.user.constants.TopicEnum; |
|
||||||
import com.mh.user.utils.SpringBeanUtil; |
|
||||||
import lombok.extern.slf4j.Slf4j; |
|
||||||
import org.springframework.beans.factory.annotation.Autowired; |
|
||||||
import org.springframework.integration.annotation.Router; |
|
||||||
import org.springframework.integration.mqtt.support.MqttHeaders; |
|
||||||
import org.springframework.integration.router.AbstractMessageRouter; |
|
||||||
import org.springframework.messaging.Message; |
|
||||||
import org.springframework.messaging.MessageChannel; |
|
||||||
import org.springframework.messaging.MessageHeaders; |
|
||||||
import org.springframework.stereotype.Component; |
|
||||||
|
|
||||||
import java.util.Collection; |
|
||||||
import java.util.Collections; |
|
||||||
import java.util.Objects; |
|
||||||
|
|
||||||
/** |
|
||||||
* @author LJF |
|
||||||
* @version 1.0 |
|
||||||
* @project springboot-mqtt-demo |
|
||||||
* @description 入站消息路由分发中心 |
|
||||||
* @date 2024-10-29 17:04:17 |
|
||||||
*/ |
|
||||||
@Slf4j |
|
||||||
@Component |
|
||||||
public class InboundMessageRouter extends AbstractMessageRouter { |
|
||||||
|
|
||||||
/** 系统基础配置 */ |
|
||||||
@Autowired |
|
||||||
private MHConfig mHConfig; |
|
||||||
|
|
||||||
/** |
|
||||||
* 目前只需要这个方式,后期在拓展使用IntegrationFlow方式 |
|
||||||
* @param message |
|
||||||
* @return |
|
||||||
*/ |
|
||||||
@Router(inputChannel = ChannelName.INBOUND) |
|
||||||
@Override |
|
||||||
protected Collection<MessageChannel> determineTargetChannels(Message<?> message) { |
|
||||||
MessageHeaders headers = message.getHeaders(); |
|
||||||
String topic = Objects.requireNonNull(headers.get(MqttHeaders.RECEIVED_TOPIC)).toString(); |
|
||||||
// byte[] payload = (byte[]) message.getPayload();
|
|
||||||
// log.info("从当前主题 topic: {}, 接收到的消息:{}", topic, new String(payload));
|
|
||||||
// 判断当前主题是否是当前项目的,温湿度目前写死的
|
|
||||||
if (!topic.startsWith(mHConfig.getName()) && !topic.contains("/temp")) { |
|
||||||
log.info("当前主题 topic: {} 不是当前项目的,直接丢弃", topic); |
|
||||||
return Collections.singleton((MessageChannel) SpringBeanUtil.getBean(ChannelName.DEFAULT_BOUND)); |
|
||||||
} |
|
||||||
// 找到对应的主题消息通道
|
|
||||||
if (topic.contains("/temp")) { |
|
||||||
return Collections.singleton((MessageChannel) SpringBeanUtil.getBean(ChannelName.EVENTS_UPLOAD_INBOUND)); |
|
||||||
} else { |
|
||||||
TopicEnum topicEnum = TopicEnum.find(mHConfig.getName() + "/", topic); |
|
||||||
MessageChannel bean = (MessageChannel) SpringBeanUtil.getBean(topicEnum.getBeanName()); |
|
||||||
return Collections.singleton(bean); |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
@ -1,99 +0,0 @@ |
|||||||
package com.mh.user.config.mqtt; |
|
||||||
|
|
||||||
import com.mh.user.constants.MqttClientOptions; |
|
||||||
import com.mh.user.constants.MqttProtocolEnum; |
|
||||||
import com.mh.user.constants.MqttUseEnum; |
|
||||||
import lombok.Data; |
|
||||||
import org.eclipse.paho.client.mqttv3.MqttConnectOptions; |
|
||||||
import org.springframework.boot.context.properties.ConfigurationProperties; |
|
||||||
import org.springframework.context.annotation.Bean; |
|
||||||
import org.springframework.context.annotation.Configuration; |
|
||||||
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; |
|
||||||
import org.springframework.integration.mqtt.core.MqttPahoClientFactory; |
|
||||||
import org.springframework.util.StringUtils; |
|
||||||
|
|
||||||
import java.util.Map; |
|
||||||
|
|
||||||
/** |
|
||||||
* @author LJF |
|
||||||
* @version 1.0 |
|
||||||
* @project springboot-mqtt-demo |
|
||||||
* @description mqtt连接配置 |
|
||||||
* @date 2024-10-29 14:44:51 |
|
||||||
*/ |
|
||||||
@Configuration |
|
||||||
@Data |
|
||||||
@ConfigurationProperties |
|
||||||
public class MqttConfig { |
|
||||||
|
|
||||||
private static Map<MqttUseEnum, MqttClientOptions> mqttSpring; |
|
||||||
|
|
||||||
public void setMqttSpring(Map<MqttUseEnum, MqttClientOptions> mqtt) { |
|
||||||
MqttConfig.mqttSpring = mqtt; |
|
||||||
} |
|
||||||
|
|
||||||
/** |
|
||||||
* 获取mqtt基本配置 |
|
||||||
* @return |
|
||||||
*/ |
|
||||||
static MqttClientOptions getBasicMqttClientOptions() { |
|
||||||
if (!mqttSpring.containsKey(MqttUseEnum.BASIC)) { |
|
||||||
throw new Error("请先配置MQTT的基本连接参数,否则无法启动项目"); |
|
||||||
} |
|
||||||
return mqttSpring.get(MqttUseEnum.BASIC); |
|
||||||
} |
|
||||||
|
|
||||||
/** |
|
||||||
* 拼接获取对应mqtt的连接地址 |
|
||||||
* @param options |
|
||||||
* @return |
|
||||||
*/ |
|
||||||
public static String getMqttAddress(MqttClientOptions options) { |
|
||||||
StringBuilder addr = new StringBuilder(); |
|
||||||
addr.append(options.getProtocol().getProtocolAddr()) |
|
||||||
.append(options.getHost().trim()) |
|
||||||
.append(":") |
|
||||||
.append(options.getPort()); |
|
||||||
if ((options.getProtocol() == MqttProtocolEnum.WS || options.getProtocol() == MqttProtocolEnum.WSS) |
|
||||||
&& StringUtils.hasText(options.getPath())) { |
|
||||||
addr.append(options.getPath()); |
|
||||||
} |
|
||||||
return addr.toString(); |
|
||||||
} |
|
||||||
|
|
||||||
public static String getBasicMqttAddress() { |
|
||||||
return getMqttAddress(getBasicMqttClientOptions()); |
|
||||||
} |
|
||||||
|
|
||||||
/** |
|
||||||
* 获取连接参数,注入到spring中 |
|
||||||
* @return |
|
||||||
*/ |
|
||||||
@Bean |
|
||||||
public MqttConnectOptions mqttConnectionOptions() { |
|
||||||
|
|
||||||
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); |
|
||||||
|
|
||||||
MqttClientOptions customizeOptions = getBasicMqttClientOptions(); |
|
||||||
String basicMqttAddress = getBasicMqttAddress(); |
|
||||||
mqttConnectOptions.setServerURIs(new String[]{basicMqttAddress}); |
|
||||||
mqttConnectOptions.setUserName(StringUtils.hasText(customizeOptions.getUsername()) ? |
|
||||||
customizeOptions.getUsername() : ""); |
|
||||||
mqttConnectOptions.setPassword(StringUtils.hasText(customizeOptions.getPassword()) ? |
|
||||||
customizeOptions.getPassword().toCharArray() : new char[0]); |
|
||||||
// 直接进行自动连接
|
|
||||||
mqttConnectOptions.setAutomaticReconnect(true); |
|
||||||
// 时间间隔时间10s
|
|
||||||
mqttConnectOptions.setKeepAliveInterval(10); |
|
||||||
|
|
||||||
return mqttConnectOptions; |
|
||||||
} |
|
||||||
|
|
||||||
@Bean |
|
||||||
public MqttPahoClientFactory mqttClientFactory() { |
|
||||||
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); |
|
||||||
factory.setConnectionOptions(mqttConnectionOptions()); |
|
||||||
return factory; |
|
||||||
} |
|
||||||
|
|
||||||
} |
|
||||||
@ -1,91 +0,0 @@ |
|||||||
package com.mh.user.config.mqtt; |
|
||||||
|
|
||||||
import com.mh.user.constants.ChannelName; |
|
||||||
import com.mh.user.constants.MqttClientOptions; |
|
||||||
import lombok.extern.slf4j.Slf4j; |
|
||||||
import org.springframework.beans.factory.annotation.Autowired; |
|
||||||
import org.springframework.context.annotation.Bean; |
|
||||||
import org.springframework.context.annotation.Configuration; |
|
||||||
import org.springframework.integration.annotation.IntegrationComponentScan; |
|
||||||
import org.springframework.integration.annotation.ServiceActivator; |
|
||||||
import org.springframework.integration.endpoint.MessageProducerSupport; |
|
||||||
import org.springframework.integration.mqtt.core.MqttPahoClientFactory; |
|
||||||
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; |
|
||||||
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; |
|
||||||
import org.springframework.integration.mqtt.support.MqttHeaders; |
|
||||||
import org.springframework.messaging.MessageChannel; |
|
||||||
import org.springframework.messaging.MessageHandler; |
|
||||||
|
|
||||||
import javax.annotation.Resource; |
|
||||||
|
|
||||||
/** |
|
||||||
* @author LJF |
|
||||||
* @version 1.0 |
|
||||||
* @project springboot-mqtt-demo |
|
||||||
* @description 入站配置 |
|
||||||
* @date 2024-10-29 16:22:10 |
|
||||||
*/ |
|
||||||
@Slf4j |
|
||||||
@Configuration |
|
||||||
@IntegrationComponentScan |
|
||||||
public class MqttInboundConfig { |
|
||||||
|
|
||||||
@Autowired |
|
||||||
private MqttPahoClientFactory mqttClientFactory; |
|
||||||
|
|
||||||
@Resource(name = ChannelName.INBOUND) |
|
||||||
private MessageChannel inboundChannel; |
|
||||||
|
|
||||||
private String clientId; |
|
||||||
|
|
||||||
/** |
|
||||||
* 入站适配器 |
|
||||||
* @return |
|
||||||
*/ |
|
||||||
@Bean(name = "adapter") |
|
||||||
public MessageProducerSupport mqttInbound() { |
|
||||||
MqttClientOptions options = MqttConfig.getBasicMqttClientOptions(); |
|
||||||
// 此处初始化的时候,默认订阅了配置文件中已经写好的topic
|
|
||||||
// 如果需要订阅多个,可以自己手动订阅,会写一个addTopic()进行添加订阅
|
|
||||||
clientId = options.getClientId() + "_consumer_" + System.currentTimeMillis(); |
|
||||||
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter( |
|
||||||
clientId, |
|
||||||
mqttClientFactory, |
|
||||||
options.getInboundTopic().split(",")); |
|
||||||
// System.out.println("每一次都会入站适配器吗?"+clientId);
|
|
||||||
DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter(); |
|
||||||
// 统一是字节处理
|
|
||||||
converter.setPayloadAsBytes(true); |
|
||||||
// 设置消息转换器
|
|
||||||
adapter.setConverter(converter); |
|
||||||
// 设置qos(quality of service)
|
|
||||||
// 0:最多一次传输(消息会丢失),
|
|
||||||
// 1:至少一次传输(消息会重复),
|
|
||||||
// 2:只有当消息发送成功时才确认(消息不回丢,但延迟高)。
|
|
||||||
adapter.setQos(0); |
|
||||||
// 设置在接收已经订阅的主题信息后,发送给哪个通道,具体的发送方法需要翻上层的抽象类
|
|
||||||
adapter.setOutputChannel(inboundChannel); |
|
||||||
return adapter; |
|
||||||
} |
|
||||||
|
|
||||||
/** |
|
||||||
* 默认声明一个消息处理器,用于处理无效的消息 |
|
||||||
* @return |
|
||||||
*/ |
|
||||||
@Bean |
|
||||||
@ServiceActivator(inputChannel = ChannelName.DEFAULT_BOUND) |
|
||||||
public MessageHandler handler() { |
|
||||||
return message -> { |
|
||||||
log.info("The default channel does not handle messages." + |
|
||||||
"\nTopic: {}" + |
|
||||||
"\nPayload: {}", |
|
||||||
message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC), |
|
||||||
message.getPayload()); |
|
||||||
}; |
|
||||||
} |
|
||||||
|
|
||||||
public String getClientId() { |
|
||||||
return clientId; |
|
||||||
} |
|
||||||
|
|
||||||
} |
|
||||||
@ -1,55 +0,0 @@ |
|||||||
package com.mh.user.config.mqtt; |
|
||||||
|
|
||||||
import com.mh.user.constants.ChannelName; |
|
||||||
import lombok.extern.slf4j.Slf4j; |
|
||||||
import org.springframework.context.annotation.Bean; |
|
||||||
import org.springframework.context.annotation.Configuration; |
|
||||||
import org.springframework.integration.channel.DirectChannel; |
|
||||||
import org.springframework.messaging.MessageChannel; |
|
||||||
|
|
||||||
/** |
|
||||||
* @author LJF |
|
||||||
* @version 1.0 |
|
||||||
* @project springboot-mqtt-demo |
|
||||||
* @description 声明所有通道的定义类 |
|
||||||
* @date 2024-10-29 16:23:32 |
|
||||||
*/ |
|
||||||
@Slf4j |
|
||||||
@Configuration |
|
||||||
public class MqttMessageChannel { |
|
||||||
|
|
||||||
@Bean(name = ChannelName.OUTBOUND) |
|
||||||
public MessageChannel outboundChannel() { |
|
||||||
return new DirectChannel(); |
|
||||||
} |
|
||||||
|
|
||||||
@Bean(name = ChannelName.INBOUND) |
|
||||||
public MessageChannel inboundChannel() { |
|
||||||
return new DirectChannel(); |
|
||||||
} |
|
||||||
|
|
||||||
/** |
|
||||||
* 事件主动上报通道 |
|
||||||
* @return |
|
||||||
*/ |
|
||||||
@Bean(name = ChannelName.EVENTS_UPLOAD_INBOUND) |
|
||||||
public MessageChannel eventsUploadInbound() { |
|
||||||
return new DirectChannel(); |
|
||||||
} |
|
||||||
|
|
||||||
@Bean(name = ChannelName.EVENTS_COLLECTION_INBOUND) |
|
||||||
public MessageChannel eventsCollectionInbound() { |
|
||||||
return new DirectChannel(); |
|
||||||
} |
|
||||||
|
|
||||||
@Bean(name = ChannelName.EVENTS_CONTROL_INBOUND) |
|
||||||
public MessageChannel eventsControlInbound() { |
|
||||||
return new DirectChannel(); |
|
||||||
} |
|
||||||
|
|
||||||
@Bean(name = ChannelName.EVENTS_SEND_INBOUND) |
|
||||||
public MessageChannel eventsSendInbound() { |
|
||||||
return new DirectChannel(); |
|
||||||
} |
|
||||||
|
|
||||||
} |
|
||||||
@ -1,51 +0,0 @@ |
|||||||
package com.mh.user.config.mqtt; |
|
||||||
|
|
||||||
import com.mh.user.constants.ChannelName; |
|
||||||
import lombok.extern.slf4j.Slf4j; |
|
||||||
import org.springframework.beans.factory.annotation.Autowired; |
|
||||||
import org.springframework.context.annotation.Bean; |
|
||||||
import org.springframework.context.annotation.Configuration; |
|
||||||
import org.springframework.integration.annotation.IntegrationComponentScan; |
|
||||||
import org.springframework.integration.annotation.ServiceActivator; |
|
||||||
import org.springframework.integration.mqtt.core.MqttPahoClientFactory; |
|
||||||
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; |
|
||||||
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; |
|
||||||
import org.springframework.messaging.MessageHandler; |
|
||||||
|
|
||||||
/** |
|
||||||
* @author LJF |
|
||||||
* @version 1.0 |
|
||||||
* @project springboot-mqtt-demo |
|
||||||
* @description 入站配置 |
|
||||||
* @date 2024-10-29 16:22:10 |
|
||||||
*/ |
|
||||||
@Slf4j |
|
||||||
@Configuration |
|
||||||
@IntegrationComponentScan |
|
||||||
public class MqttOutboundConfig { |
|
||||||
|
|
||||||
@Autowired |
|
||||||
private MqttPahoClientFactory mqttClientFactory; |
|
||||||
|
|
||||||
/** |
|
||||||
* 默认声明一个出站处理器,用于处理无效的消息 |
|
||||||
* @return |
|
||||||
*/ |
|
||||||
@Bean |
|
||||||
@ServiceActivator(inputChannel = ChannelName.OUTBOUND) |
|
||||||
public MessageHandler mqttOutbound() { |
|
||||||
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler( |
|
||||||
MqttConfig.getBasicMqttClientOptions().getClientId() + "_producer_" + System.currentTimeMillis(), |
|
||||||
mqttClientFactory); |
|
||||||
DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter(); |
|
||||||
// use byte types uniformly
|
|
||||||
converter.setPayloadAsBytes(true); |
|
||||||
|
|
||||||
messageHandler.setAsync(true); |
|
||||||
messageHandler.setDefaultQos(0); |
|
||||||
messageHandler.setConverter(converter); |
|
||||||
return messageHandler; |
|
||||||
} |
|
||||||
|
|
||||||
|
|
||||||
} |
|
||||||
@ -1,59 +0,0 @@ |
|||||||
package com.mh.user.constants; |
|
||||||
|
|
||||||
/** |
|
||||||
* @author LJF |
|
||||||
* @version 1.0 |
|
||||||
* @project springboot-mqtt-demo |
|
||||||
* @description 声明所有通道 |
|
||||||
* @date 2024-10-29 16:04:19 |
|
||||||
*/ |
|
||||||
public class ChannelName { |
|
||||||
|
|
||||||
/** |
|
||||||
* 默认通道名称(防止出错) |
|
||||||
*/ |
|
||||||
public static final String DEFAULT_BOUND = "default_bound"; |
|
||||||
|
|
||||||
/** |
|
||||||
* 主动上报入站 |
|
||||||
*/ |
|
||||||
public static final String INBOUND = "inbound"; |
|
||||||
|
|
||||||
/** |
|
||||||
* 出站 |
|
||||||
*/ |
|
||||||
public static final String OUTBOUND = "outbound"; |
|
||||||
|
|
||||||
|
|
||||||
/** |
|
||||||
* 入站主动上报 |
|
||||||
*/ |
|
||||||
public static final String EVENTS_UPLOAD_INBOUND = "events_upload_inbound"; |
|
||||||
|
|
||||||
/** |
|
||||||
* 入站主动采集 |
|
||||||
*/ |
|
||||||
public static final String EVENTS_COLLECTION_INBOUND = "events_collection_inbound"; |
|
||||||
|
|
||||||
/** |
|
||||||
* 入站主动控制 |
|
||||||
*/ |
|
||||||
public static final String EVENTS_CONTROL_INBOUND = "events_control_inbound"; |
|
||||||
|
|
||||||
/** |
|
||||||
* 默认进站处理 |
|
||||||
*/ |
|
||||||
public static final String EVENTS_DEFAULT_INBOUND = "events_default_inbound"; |
|
||||||
|
|
||||||
public static final String REPLY_EVENTS_OUTBOUND = "reply_events_outbound"; |
|
||||||
|
|
||||||
/** |
|
||||||
* 新珠江收到的信息 |
|
||||||
*/ |
|
||||||
public static final String EVENTS_RECEIVE_INBOUND = "events_receive_inbound"; |
|
||||||
|
|
||||||
/** |
|
||||||
* 接收服务端的数据报文 |
|
||||||
*/ |
|
||||||
public static final String EVENTS_SEND_INBOUND = "events_send_inbound"; |
|
||||||
} |
|
||||||
@ -1,36 +0,0 @@ |
|||||||
package com.mh.user.constants; |
|
||||||
|
|
||||||
import com.fasterxml.jackson.annotation.JsonIgnoreProperties; |
|
||||||
import lombok.AllArgsConstructor; |
|
||||||
import lombok.Builder; |
|
||||||
import lombok.Data; |
|
||||||
import lombok.NoArgsConstructor; |
|
||||||
|
|
||||||
/** |
|
||||||
* Unified Topic response format |
|
||||||
* |
|
||||||
* @author sean.zhou |
|
||||||
* @version 0.1 |
|
||||||
* @date 2021/11/15 |
|
||||||
*/ |
|
||||||
@Data |
|
||||||
@AllArgsConstructor |
|
||||||
@NoArgsConstructor |
|
||||||
@Builder |
|
||||||
@JsonIgnoreProperties(ignoreUnknown = true) |
|
||||||
public class CommonTopicResponse<T> { |
|
||||||
|
|
||||||
/** |
|
||||||
* The command is sent and the response is matched by the tid and bid fields in the message, |
|
||||||
* and the reply should keep the tid and bid the same. |
|
||||||
*/ |
|
||||||
private String tid; |
|
||||||
|
|
||||||
private String bid; |
|
||||||
|
|
||||||
private String method; |
|
||||||
|
|
||||||
private T data; |
|
||||||
|
|
||||||
private Long timestamp; |
|
||||||
} |
|
||||||
@ -1,108 +0,0 @@ |
|||||||
package com.mh.user.constants; |
|
||||||
|
|
||||||
/** |
|
||||||
* @author LJF |
|
||||||
* @version 1.0 |
|
||||||
* @project springboot-mqtt-demo |
|
||||||
* @description mqtt连接的参数 |
|
||||||
* @date 2024-10-29 14:46:24 |
|
||||||
*/ |
|
||||||
public class MqttClientOptions { |
|
||||||
|
|
||||||
private MqttProtocolEnum protocol; |
|
||||||
|
|
||||||
private String host; |
|
||||||
|
|
||||||
private Integer port; |
|
||||||
|
|
||||||
private String username; |
|
||||||
|
|
||||||
private String password; |
|
||||||
|
|
||||||
private String clientId; |
|
||||||
|
|
||||||
private String path; |
|
||||||
|
|
||||||
/** |
|
||||||
* 客户端连接的时候,订阅的主题 |
|
||||||
*/ |
|
||||||
private String inboundTopic; |
|
||||||
|
|
||||||
public MqttProtocolEnum getProtocol() { |
|
||||||
return protocol; |
|
||||||
} |
|
||||||
|
|
||||||
public void setProtocol(MqttProtocolEnum protocol) { |
|
||||||
this.protocol = protocol; |
|
||||||
} |
|
||||||
|
|
||||||
public String getHost() { |
|
||||||
return host; |
|
||||||
} |
|
||||||
|
|
||||||
public void setHost(String host) { |
|
||||||
this.host = host; |
|
||||||
} |
|
||||||
|
|
||||||
public Integer getPort() { |
|
||||||
return port; |
|
||||||
} |
|
||||||
|
|
||||||
public void setPort(Integer port) { |
|
||||||
this.port = port; |
|
||||||
} |
|
||||||
|
|
||||||
public String getUsername() { |
|
||||||
return username; |
|
||||||
} |
|
||||||
|
|
||||||
public void setUsername(String username) { |
|
||||||
this.username = username; |
|
||||||
} |
|
||||||
|
|
||||||
public String getPassword() { |
|
||||||
return password; |
|
||||||
} |
|
||||||
|
|
||||||
public void setPassword(String password) { |
|
||||||
this.password = password; |
|
||||||
} |
|
||||||
|
|
||||||
public String getClientId() { |
|
||||||
return clientId; |
|
||||||
} |
|
||||||
|
|
||||||
public void setClientId(String clientId) { |
|
||||||
this.clientId = clientId; |
|
||||||
} |
|
||||||
|
|
||||||
public String getPath() { |
|
||||||
return path; |
|
||||||
} |
|
||||||
|
|
||||||
public void setPath(String path) { |
|
||||||
this.path = path; |
|
||||||
} |
|
||||||
|
|
||||||
public String getInboundTopic() { |
|
||||||
return inboundTopic; |
|
||||||
} |
|
||||||
|
|
||||||
public void setInboundTopic(String inboundTopic) { |
|
||||||
this.inboundTopic = inboundTopic; |
|
||||||
} |
|
||||||
|
|
||||||
@Override |
|
||||||
public String toString() { |
|
||||||
return "MqttClientOptions{" + |
|
||||||
"protocol=" + protocol + |
|
||||||
", host='" + host + '\'' + |
|
||||||
", port=" + port + |
|
||||||
", username='" + username + '\'' + |
|
||||||
", password='" + password + '\'' + |
|
||||||
", clientId='" + clientId + '\'' + |
|
||||||
", path='" + path + '\'' + |
|
||||||
", inboundTopic='" + inboundTopic + '\'' + |
|
||||||
'}'; |
|
||||||
} |
|
||||||
} |
|
||||||
@ -1,33 +0,0 @@ |
|||||||
package com.mh.user.constants; |
|
||||||
|
|
||||||
/** |
|
||||||
* @author LJF |
|
||||||
* @version 1.0 |
|
||||||
* @project springboot-mqtt-demo |
|
||||||
* @description 采用哪种协议进行数据交互 |
|
||||||
* @date 2024-10-29 15:21:07 |
|
||||||
*/ |
|
||||||
public enum MqttProtocolEnum { |
|
||||||
|
|
||||||
MQTT("tcp"), |
|
||||||
|
|
||||||
MQTTS("tcp"), |
|
||||||
|
|
||||||
WS("ws"), |
|
||||||
|
|
||||||
WSS("wss"); |
|
||||||
|
|
||||||
final String protocol; |
|
||||||
|
|
||||||
MqttProtocolEnum(String protocol) { |
|
||||||
this.protocol = protocol; |
|
||||||
} |
|
||||||
|
|
||||||
public String getProtocolAddr() { |
|
||||||
return protocol + "://"; |
|
||||||
} |
|
||||||
|
|
||||||
public String getProtocol() { |
|
||||||
return protocol; |
|
||||||
} |
|
||||||
} |
|
||||||
@ -1,16 +0,0 @@ |
|||||||
package com.mh.user.constants; |
|
||||||
|
|
||||||
/** |
|
||||||
* @author LJF |
|
||||||
* @version 1.0 |
|
||||||
* @project springboot-mqtt-demo |
|
||||||
* @description mqtt选择模式 |
|
||||||
* @date 2024-10-29 15:19:21 |
|
||||||
*/ |
|
||||||
public enum MqttUseEnum { |
|
||||||
|
|
||||||
BASIC, |
|
||||||
|
|
||||||
DRC |
|
||||||
|
|
||||||
} |
|
||||||
@ -1,20 +0,0 @@ |
|||||||
package com.mh.user.constants; |
|
||||||
|
|
||||||
import com.fasterxml.jackson.annotation.JsonIgnoreProperties; |
|
||||||
import lombok.Data; |
|
||||||
|
|
||||||
/** |
|
||||||
* @author sean.zhou |
|
||||||
* @version 0.1 |
|
||||||
* @date 2021/11/22 |
|
||||||
*/ |
|
||||||
@Data |
|
||||||
@JsonIgnoreProperties(ignoreUnknown = true) |
|
||||||
public class ServiceReply<T> { |
|
||||||
|
|
||||||
private Integer result; |
|
||||||
|
|
||||||
private T info; |
|
||||||
|
|
||||||
private T output; |
|
||||||
} |
|
||||||
@ -1,31 +0,0 @@ |
|||||||
package com.mh.user.constants; |
|
||||||
|
|
||||||
/** |
|
||||||
* All the topics that need to be used in the project. |
|
||||||
* |
|
||||||
* @author ljf |
|
||||||
* @version 0.1 |
|
||||||
* @date 2025-01-22 |
|
||||||
*/ |
|
||||||
public class TopicConst { |
|
||||||
|
|
||||||
public static final String MH_UPLOAD = "mh_upload/"; |
|
||||||
|
|
||||||
public static final String EVENTS_UPLOAD = "events_upload/"; |
|
||||||
|
|
||||||
public static final String MH_COLLECTION = "mh_collection/"; |
|
||||||
|
|
||||||
public static final String EVENTS_COLLECTION = "events_collection/"; |
|
||||||
|
|
||||||
public static final String MH_CONTROL = "mh_control/"; |
|
||||||
|
|
||||||
public static final String EVENTS_CONTROL = "events_control/"; |
|
||||||
|
|
||||||
public static final String REGEX_SN = "[A-Za-z0-9]+"; |
|
||||||
|
|
||||||
public static final String THING_MODEL_PRE = "thing/"; |
|
||||||
|
|
||||||
public static final String PRODUCT = "product/"; |
|
||||||
|
|
||||||
public static final String SERVICES_SUF = "/services"; |
|
||||||
} |
|
||||||
@ -1,60 +0,0 @@ |
|||||||
package com.mh.user.constants; |
|
||||||
|
|
||||||
import java.util.Arrays; |
|
||||||
import java.util.regex.Pattern; |
|
||||||
|
|
||||||
import static com.mh.user.constants.TopicConst.*; |
|
||||||
|
|
||||||
/** |
|
||||||
* @author ljf |
|
||||||
* @version 1.0 |
|
||||||
* @description: TODO |
|
||||||
* @date 2024/11/06 14:28 |
|
||||||
*/ |
|
||||||
public enum TopicEnum { |
|
||||||
|
|
||||||
/** |
|
||||||
* 冷水机组客户端主动上报数据 |
|
||||||
*/ |
|
||||||
CLIENT_UPLOAD_DATA(Pattern.compile("^" + MH_UPLOAD + EVENTS_UPLOAD + REGEX_SN + "$"), ChannelName.EVENTS_UPLOAD_INBOUND), |
|
||||||
|
|
||||||
/** |
|
||||||
* 服务端采集数据 |
|
||||||
*/ |
|
||||||
SERVER_COLLECTION_DATA(Pattern.compile("^" + MH_COLLECTION + EVENTS_COLLECTION + REGEX_SN + "$"), ChannelName.EVENTS_COLLECTION_INBOUND), |
|
||||||
|
|
||||||
/** |
|
||||||
* 服务端控制指令 |
|
||||||
*/ |
|
||||||
SERVER_CONTROL_DATA(Pattern.compile("^" + MH_CONTROL + EVENTS_CONTROL + REGEX_SN + "$"), ChannelName.EVENTS_CONTROL_INBOUND), |
|
||||||
|
|
||||||
/** |
|
||||||
* 订阅服务端发送的主题命令 |
|
||||||
*/ |
|
||||||
SERVER_SEND_DATA(Pattern.compile("^A/cmd/ctl/send" + "$"), ChannelName.EVENTS_SEND_INBOUND), |
|
||||||
|
|
||||||
UNKNOWN(Pattern.compile("^.*$"), ChannelName.DEFAULT_BOUND); |
|
||||||
|
|
||||||
final Pattern pattern; |
|
||||||
|
|
||||||
final String beanName; |
|
||||||
|
|
||||||
TopicEnum(Pattern pattern, String beanName) { |
|
||||||
this.pattern = pattern; |
|
||||||
this.beanName = beanName; |
|
||||||
} |
|
||||||
|
|
||||||
public Pattern getPattern() { |
|
||||||
return pattern; |
|
||||||
} |
|
||||||
|
|
||||||
public String getBeanName() { |
|
||||||
return beanName; |
|
||||||
} |
|
||||||
|
|
||||||
public static TopicEnum find(String proName, String topic) { |
|
||||||
// 去掉第一个"/"以及之前数据
|
|
||||||
String finalTopic = topic.replaceFirst("^"+proName, "");; |
|
||||||
return Arrays.stream(TopicEnum.values()).filter(topicEnum -> topicEnum.pattern.matcher(finalTopic).matches()).findAny().orElse(UNKNOWN); |
|
||||||
} |
|
||||||
} |
|
||||||
@ -1,48 +0,0 @@ |
|||||||
package com.mh.user.entity; |
|
||||||
|
|
||||||
import com.fasterxml.jackson.annotation.JsonFormat; |
|
||||||
import com.fasterxml.jackson.annotation.JsonIgnore; |
|
||||||
import com.fasterxml.jackson.annotation.JsonInclude; |
|
||||||
import lombok.Data; |
|
||||||
|
|
||||||
import java.util.Date; |
|
||||||
import java.util.Map; |
|
||||||
|
|
||||||
/** |
|
||||||
* @author LJF |
|
||||||
* @version 1.0 |
|
||||||
* @project EEMCS |
|
||||||
* @description mqtt订阅管理 |
|
||||||
* @date 2025-02-14 13:47:07 |
|
||||||
*/ |
|
||||||
@Data |
|
||||||
public class MqttSubscriptionEntity { |
|
||||||
|
|
||||||
private Long id; |
|
||||||
|
|
||||||
private String topic; |
|
||||||
|
|
||||||
private Short qos; |
|
||||||
|
|
||||||
private String clientId; |
|
||||||
|
|
||||||
private String status; |
|
||||||
|
|
||||||
/** 创建者 */ |
|
||||||
private String createBy; |
|
||||||
|
|
||||||
/** 创建时间 */ |
|
||||||
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") |
|
||||||
private Date createTime; |
|
||||||
|
|
||||||
/** 更新者 */ |
|
||||||
private String updateBy; |
|
||||||
|
|
||||||
/** 更新时间 */ |
|
||||||
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") |
|
||||||
private Date updateTime; |
|
||||||
|
|
||||||
/** 备注 */ |
|
||||||
private String remark; |
|
||||||
|
|
||||||
} |
|
||||||
@ -1,32 +0,0 @@ |
|||||||
package com.mh.user.mapper; |
|
||||||
|
|
||||||
import com.mh.user.entity.MqttSubscriptionEntity; |
|
||||||
import org.apache.ibatis.annotations.Mapper; |
|
||||||
import org.apache.ibatis.annotations.Param; |
|
||||||
import org.apache.ibatis.annotations.Select; |
|
||||||
import tk.mybatis.mapper.common.BaseMapper; |
|
||||||
|
|
||||||
import java.util.List; |
|
||||||
|
|
||||||
/** |
|
||||||
* @author LJF |
|
||||||
* @version 1.0 |
|
||||||
* @project EEMCS |
|
||||||
* @description mqtt订阅mapper类 |
|
||||||
* @date 2025-02-14 14:00:58 |
|
||||||
*/ |
|
||||||
@Mapper |
|
||||||
public interface MqttSubscriptionMapper extends BaseMapper<MqttSubscriptionEntity> { |
|
||||||
|
|
||||||
@Select("<script>" + |
|
||||||
"SELECT * FROM mqtt_subscription WHERE 1=1 " + |
|
||||||
"<if test='topic != null and topic != \"\"'>AND topic = #{topic}</if>" + |
|
||||||
"<if test='status != null and status != \"\"'>AND status = #{status}</if>" + |
|
||||||
"ORDER BY crate_time DESC " + |
|
||||||
"</script>") |
|
||||||
List<MqttSubscriptionEntity> selectListByTopic(@Param("topic") String topic, |
|
||||||
@Param("status") String status); |
|
||||||
|
|
||||||
@Select("SELECT top 1 * FROM mqtt_subscription WHERE id = #{id}") |
|
||||||
MqttSubscriptionEntity selectById(@Param("id") String id); |
|
||||||
} |
|
||||||
@ -1,30 +0,0 @@ |
|||||||
package com.mh.user.model; |
|
||||||
|
|
||||||
import lombok.Data; |
|
||||||
|
|
||||||
/** |
|
||||||
* @author LJF |
|
||||||
* @version 1.0 |
|
||||||
* @project EEMCS |
|
||||||
* @description 研华数据体 |
|
||||||
* @date 2025-01-22 14:47:25 |
|
||||||
*/ |
|
||||||
@Data |
|
||||||
public class SanShiFengDatas<T extends Number> { |
|
||||||
|
|
||||||
/** |
|
||||||
* 对应研华的标签值 |
|
||||||
*/ |
|
||||||
private String tag; |
|
||||||
|
|
||||||
/** |
|
||||||
* 上报值 |
|
||||||
*/ |
|
||||||
private T value; |
|
||||||
|
|
||||||
/** |
|
||||||
* 质量值 |
|
||||||
*/ |
|
||||||
private T quality; |
|
||||||
|
|
||||||
} |
|
||||||
@ -1,29 +0,0 @@ |
|||||||
package com.mh.user.model; |
|
||||||
|
|
||||||
import com.fasterxml.jackson.annotation.JsonIgnoreProperties; |
|
||||||
import lombok.Data; |
|
||||||
|
|
||||||
import java.util.List; |
|
||||||
|
|
||||||
/** |
|
||||||
* @author LJF |
|
||||||
* @version 1.0 |
|
||||||
* @project EEMCS |
|
||||||
* @description 研华网关发送接收数据 |
|
||||||
* @date 2025-01-22 14:43:15 |
|
||||||
*/ |
|
||||||
@Data |
|
||||||
@JsonIgnoreProperties(ignoreUnknown = true) |
|
||||||
public class SanShiFengReceiver<T> { |
|
||||||
|
|
||||||
/** |
|
||||||
* 数据集合 |
|
||||||
*/ |
|
||||||
private List<T> d; |
|
||||||
|
|
||||||
/** |
|
||||||
* 主动上报数据时间(带T类型) |
|
||||||
*/ |
|
||||||
private String ts; |
|
||||||
|
|
||||||
} |
|
||||||
@ -1,26 +0,0 @@ |
|||||||
package com.mh.user.service; |
|
||||||
|
|
||||||
import com.mh.user.entity.MqttSubscriptionEntity; |
|
||||||
|
|
||||||
import java.util.List; |
|
||||||
|
|
||||||
/** |
|
||||||
* @author LJF |
|
||||||
* @version 1.0 |
|
||||||
* @project EEMCS |
|
||||||
* @description mqtt订阅管理 |
|
||||||
* @date 2025-02-14 13:58:37 |
|
||||||
*/ |
|
||||||
public interface MqttSubscriptionService { |
|
||||||
|
|
||||||
List<MqttSubscriptionEntity> selectMqttSubList(MqttSubscriptionEntity mqttSubscription); |
|
||||||
|
|
||||||
MqttSubscriptionEntity selectMqttSubById(String msId); |
|
||||||
|
|
||||||
int insertMqttSub(MqttSubscriptionEntity mqttSubscription); |
|
||||||
|
|
||||||
int updateMqttSub(MqttSubscriptionEntity mqttSubscription); |
|
||||||
|
|
||||||
int deleteMqttSubByIds(String[] msIds); |
|
||||||
|
|
||||||
} |
|
||||||
@ -1,61 +0,0 @@ |
|||||||
package com.mh.user.service.impl; |
|
||||||
|
|
||||||
import com.mh.user.entity.MqttSubscriptionEntity; |
|
||||||
import com.mh.user.mapper.MqttSubscriptionMapper; |
|
||||||
import com.mh.user.service.MqttSubscriptionService; |
|
||||||
import org.springframework.beans.factory.annotation.Autowired; |
|
||||||
import org.springframework.stereotype.Service; |
|
||||||
|
|
||||||
import java.util.List; |
|
||||||
|
|
||||||
/** |
|
||||||
* @author LJF |
|
||||||
* @version 1.0 |
|
||||||
* @project EEMCS |
|
||||||
* @description mqtt订阅实现类 |
|
||||||
* @date 2025-02-14 13:59:27 |
|
||||||
*/ |
|
||||||
@Service |
|
||||||
public class MqttSubscriptionServiceImpl implements MqttSubscriptionService { |
|
||||||
|
|
||||||
private final MqttSubscriptionMapper mqttSubscriptionMapper; |
|
||||||
|
|
||||||
@Autowired |
|
||||||
public MqttSubscriptionServiceImpl(MqttSubscriptionMapper mqttSubscriptionMapper) { |
|
||||||
this.mqttSubscriptionMapper = mqttSubscriptionMapper; |
|
||||||
} |
|
||||||
|
|
||||||
@Override |
|
||||||
public List<MqttSubscriptionEntity> selectMqttSubList(MqttSubscriptionEntity mqttSubscription) { |
|
||||||
if (mqttSubscription == null) { |
|
||||||
return null; |
|
||||||
} |
|
||||||
return mqttSubscriptionMapper.selectListByTopic(mqttSubscription.getTopic(), mqttSubscription.getStatus()); |
|
||||||
} |
|
||||||
|
|
||||||
@Override |
|
||||||
public MqttSubscriptionEntity selectMqttSubById(String msId) { |
|
||||||
return mqttSubscriptionMapper.selectById(msId); |
|
||||||
} |
|
||||||
|
|
||||||
@Override |
|
||||||
public int insertMqttSub(MqttSubscriptionEntity mqttSubscription) { |
|
||||||
return mqttSubscriptionMapper.insert(mqttSubscription); |
|
||||||
} |
|
||||||
|
|
||||||
@Override |
|
||||||
public int updateMqttSub(MqttSubscriptionEntity mqttSubscription) { |
|
||||||
return mqttSubscriptionMapper.updateByPrimaryKey(mqttSubscription); |
|
||||||
} |
|
||||||
|
|
||||||
@Override |
|
||||||
public int deleteMqttSubByIds(String[] msIds) { |
|
||||||
if (msIds != null && msIds.length > 0) { |
|
||||||
for (String msId : msIds) { |
|
||||||
mqttSubscriptionMapper.deleteByPrimaryKey(msId); |
|
||||||
} |
|
||||||
return msIds.length; |
|
||||||
} |
|
||||||
return 0; |
|
||||||
} |
|
||||||
} |
|
||||||
@ -1,99 +0,0 @@ |
|||||||
package com.mh.user.service.mqtt.config; |
|
||||||
|
|
||||||
import com.mh.user.constants.MqttClientOptions; |
|
||||||
import com.mh.user.constants.MqttProtocolEnum; |
|
||||||
import com.mh.user.constants.MqttUseEnum; |
|
||||||
import lombok.Data; |
|
||||||
import org.eclipse.paho.client.mqttv3.MqttConnectOptions; |
|
||||||
import org.springframework.boot.context.properties.ConfigurationProperties; |
|
||||||
import org.springframework.context.annotation.Bean; |
|
||||||
import org.springframework.context.annotation.Configuration; |
|
||||||
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; |
|
||||||
import org.springframework.integration.mqtt.core.MqttPahoClientFactory; |
|
||||||
import org.springframework.util.StringUtils; |
|
||||||
|
|
||||||
import java.util.Map; |
|
||||||
|
|
||||||
/** |
|
||||||
* @author LJF |
|
||||||
* @version 1.0 |
|
||||||
* @project springboot-mqtt-demo |
|
||||||
* @description mqtt连接配置 |
|
||||||
* @date 2024-10-29 14:44:51 |
|
||||||
*/ |
|
||||||
@Configuration |
|
||||||
@Data |
|
||||||
@ConfigurationProperties |
|
||||||
public class MqttConfig { |
|
||||||
|
|
||||||
private static Map<MqttUseEnum, MqttClientOptions> mqttSpring; |
|
||||||
|
|
||||||
public void setMqttSpring(Map<MqttUseEnum, MqttClientOptions> mqtt) { |
|
||||||
MqttConfig.mqttSpring = mqtt; |
|
||||||
} |
|
||||||
|
|
||||||
/** |
|
||||||
* 获取mqtt基本配置 |
|
||||||
* @return |
|
||||||
*/ |
|
||||||
static MqttClientOptions getBasicMqttClientOptions() { |
|
||||||
if (!mqttSpring.containsKey(MqttUseEnum.BASIC)) { |
|
||||||
throw new Error("请先配置MQTT的基本连接参数,否则无法启动项目"); |
|
||||||
} |
|
||||||
return mqttSpring.get(MqttUseEnum.BASIC); |
|
||||||
} |
|
||||||
|
|
||||||
/** |
|
||||||
* 拼接获取对应mqtt的连接地址 |
|
||||||
* @param options |
|
||||||
* @return |
|
||||||
*/ |
|
||||||
public static String getMqttAddress(MqttClientOptions options) { |
|
||||||
StringBuilder addr = new StringBuilder(); |
|
||||||
addr.append(options.getProtocol().getProtocolAddr()) |
|
||||||
.append(options.getHost().trim()) |
|
||||||
.append(":") |
|
||||||
.append(options.getPort()); |
|
||||||
if ((options.getProtocol() == MqttProtocolEnum.WS || options.getProtocol() == MqttProtocolEnum.WSS) |
|
||||||
&& StringUtils.hasText(options.getPath())) { |
|
||||||
addr.append(options.getPath()); |
|
||||||
} |
|
||||||
return addr.toString(); |
|
||||||
} |
|
||||||
|
|
||||||
public static String getBasicMqttAddress() { |
|
||||||
return getMqttAddress(getBasicMqttClientOptions()); |
|
||||||
} |
|
||||||
|
|
||||||
/** |
|
||||||
* 获取连接参数,注入到spring中 |
|
||||||
* @return |
|
||||||
*/ |
|
||||||
@Bean |
|
||||||
public MqttConnectOptions mqttConnectionOptions() { |
|
||||||
|
|
||||||
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); |
|
||||||
|
|
||||||
MqttClientOptions customizeOptions = getBasicMqttClientOptions(); |
|
||||||
String basicMqttAddress = getBasicMqttAddress(); |
|
||||||
mqttConnectOptions.setServerURIs(new String[]{basicMqttAddress}); |
|
||||||
mqttConnectOptions.setUserName(StringUtils.hasText(customizeOptions.getUsername()) ? |
|
||||||
customizeOptions.getUsername() : ""); |
|
||||||
mqttConnectOptions.setPassword(StringUtils.hasText(customizeOptions.getPassword()) ? |
|
||||||
customizeOptions.getPassword().toCharArray() : new char[0]); |
|
||||||
// 直接进行自动连接
|
|
||||||
mqttConnectOptions.setAutomaticReconnect(true); |
|
||||||
// 时间间隔时间10s
|
|
||||||
mqttConnectOptions.setKeepAliveInterval(10); |
|
||||||
|
|
||||||
return mqttConnectOptions; |
|
||||||
} |
|
||||||
|
|
||||||
@Bean |
|
||||||
public MqttPahoClientFactory mqttClientFactory() { |
|
||||||
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); |
|
||||||
factory.setConnectionOptions(mqttConnectionOptions()); |
|
||||||
return factory; |
|
||||||
} |
|
||||||
|
|
||||||
} |
|
||||||
@ -1,91 +0,0 @@ |
|||||||
package com.mh.user.service.mqtt.config; |
|
||||||
|
|
||||||
import com.mh.user.constants.ChannelName; |
|
||||||
import com.mh.user.constants.MqttClientOptions; |
|
||||||
import lombok.extern.slf4j.Slf4j; |
|
||||||
import org.springframework.beans.factory.annotation.Autowired; |
|
||||||
import org.springframework.context.annotation.Bean; |
|
||||||
import org.springframework.context.annotation.Configuration; |
|
||||||
import org.springframework.integration.annotation.IntegrationComponentScan; |
|
||||||
import org.springframework.integration.annotation.ServiceActivator; |
|
||||||
import org.springframework.integration.endpoint.MessageProducerSupport; |
|
||||||
import org.springframework.integration.mqtt.core.MqttPahoClientFactory; |
|
||||||
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; |
|
||||||
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; |
|
||||||
import org.springframework.integration.mqtt.support.MqttHeaders; |
|
||||||
import org.springframework.messaging.MessageChannel; |
|
||||||
import org.springframework.messaging.MessageHandler; |
|
||||||
|
|
||||||
import javax.annotation.Resource; |
|
||||||
|
|
||||||
/** |
|
||||||
* @author LJF |
|
||||||
* @version 1.0 |
|
||||||
* @project springboot-mqtt-demo |
|
||||||
* @description 入站配置 |
|
||||||
* @date 2024-10-29 16:22:10 |
|
||||||
*/ |
|
||||||
@Slf4j |
|
||||||
@Configuration |
|
||||||
@IntegrationComponentScan |
|
||||||
public class MqttInboundConfig { |
|
||||||
|
|
||||||
@Autowired |
|
||||||
private MqttPahoClientFactory mqttClientFactory; |
|
||||||
|
|
||||||
@Resource(name = ChannelName.INBOUND) |
|
||||||
private MessageChannel inboundChannel; |
|
||||||
|
|
||||||
private String clientId; |
|
||||||
|
|
||||||
/** |
|
||||||
* 入站适配器 |
|
||||||
* @return |
|
||||||
*/ |
|
||||||
@Bean(name = "adapter") |
|
||||||
public MessageProducerSupport mqttInbound() { |
|
||||||
MqttClientOptions options = MqttConfig.getBasicMqttClientOptions(); |
|
||||||
// 此处初始化的时候,默认订阅了配置文件中已经写好的topic
|
|
||||||
// 如果需要订阅多个,可以自己手动订阅,会写一个addTopic()进行添加订阅
|
|
||||||
clientId = options.getClientId() + "_consumer_" + System.currentTimeMillis(); |
|
||||||
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter( |
|
||||||
clientId, |
|
||||||
mqttClientFactory, |
|
||||||
options.getInboundTopic().split(",")); |
|
||||||
// System.out.println("每一次都会入站适配器吗?"+clientId);
|
|
||||||
DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter(); |
|
||||||
// 统一是字节处理
|
|
||||||
converter.setPayloadAsBytes(true); |
|
||||||
// 设置消息转换器
|
|
||||||
adapter.setConverter(converter); |
|
||||||
// 设置qos(quality of service)
|
|
||||||
// 0:最多一次传输(消息会丢失),
|
|
||||||
// 1:至少一次传输(消息会重复),
|
|
||||||
// 2:只有当消息发送成功时才确认(消息不回丢,但延迟高)。
|
|
||||||
adapter.setQos(0); |
|
||||||
// 设置在接收已经订阅的主题信息后,发送给哪个通道,具体的发送方法需要翻上层的抽象类
|
|
||||||
adapter.setOutputChannel(inboundChannel); |
|
||||||
return adapter; |
|
||||||
} |
|
||||||
|
|
||||||
/** |
|
||||||
* 默认声明一个消息处理器,用于处理无效的消息 |
|
||||||
* @return |
|
||||||
*/ |
|
||||||
@Bean |
|
||||||
@ServiceActivator(inputChannel = ChannelName.DEFAULT_BOUND) |
|
||||||
public MessageHandler handler() { |
|
||||||
return message -> { |
|
||||||
log.info("The default channel does not handle messages." + |
|
||||||
"\nTopic: {}" + |
|
||||||
"\nPayload: {}", |
|
||||||
message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC), |
|
||||||
message.getPayload()); |
|
||||||
}; |
|
||||||
} |
|
||||||
|
|
||||||
public String getClientId() { |
|
||||||
return clientId; |
|
||||||
} |
|
||||||
|
|
||||||
} |
|
||||||
@ -1,55 +0,0 @@ |
|||||||
package com.mh.user.service.mqtt.config; |
|
||||||
|
|
||||||
import com.mh.user.constants.ChannelName; |
|
||||||
import lombok.extern.slf4j.Slf4j; |
|
||||||
import org.springframework.context.annotation.Bean; |
|
||||||
import org.springframework.context.annotation.Configuration; |
|
||||||
import org.springframework.integration.channel.DirectChannel; |
|
||||||
import org.springframework.messaging.MessageChannel; |
|
||||||
|
|
||||||
/** |
|
||||||
* @author LJF |
|
||||||
* @version 1.0 |
|
||||||
* @project springboot-mqtt-demo |
|
||||||
* @description 声明所有通道的定义类 |
|
||||||
* @date 2024-10-29 16:23:32 |
|
||||||
*/ |
|
||||||
@Slf4j |
|
||||||
@Configuration |
|
||||||
public class MqttMessageChannel { |
|
||||||
|
|
||||||
@Bean(name = ChannelName.OUTBOUND) |
|
||||||
public MessageChannel outboundChannel() { |
|
||||||
return new DirectChannel(); |
|
||||||
} |
|
||||||
|
|
||||||
@Bean(name = ChannelName.INBOUND) |
|
||||||
public MessageChannel inboundChannel() { |
|
||||||
return new DirectChannel(); |
|
||||||
} |
|
||||||
|
|
||||||
/** |
|
||||||
* 事件主动上报通道 |
|
||||||
* @return |
|
||||||
*/ |
|
||||||
@Bean(name = ChannelName.EVENTS_UPLOAD_INBOUND) |
|
||||||
public MessageChannel eventsUploadInbound() { |
|
||||||
return new DirectChannel(); |
|
||||||
} |
|
||||||
|
|
||||||
@Bean(name = ChannelName.EVENTS_COLLECTION_INBOUND) |
|
||||||
public MessageChannel eventsCollectionInbound() { |
|
||||||
return new DirectChannel(); |
|
||||||
} |
|
||||||
|
|
||||||
@Bean(name = ChannelName.EVENTS_CONTROL_INBOUND) |
|
||||||
public MessageChannel eventsControlInbound() { |
|
||||||
return new DirectChannel(); |
|
||||||
} |
|
||||||
|
|
||||||
@Bean(name = ChannelName.EVENTS_SEND_INBOUND) |
|
||||||
public MessageChannel eventsSendInbound() { |
|
||||||
return new DirectChannel(); |
|
||||||
} |
|
||||||
|
|
||||||
} |
|
||||||
@ -1,51 +0,0 @@ |
|||||||
package com.mh.user.service.mqtt.config; |
|
||||||
|
|
||||||
import com.mh.user.constants.ChannelName; |
|
||||||
import lombok.extern.slf4j.Slf4j; |
|
||||||
import org.springframework.beans.factory.annotation.Autowired; |
|
||||||
import org.springframework.context.annotation.Bean; |
|
||||||
import org.springframework.context.annotation.Configuration; |
|
||||||
import org.springframework.integration.annotation.IntegrationComponentScan; |
|
||||||
import org.springframework.integration.annotation.ServiceActivator; |
|
||||||
import org.springframework.integration.mqtt.core.MqttPahoClientFactory; |
|
||||||
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; |
|
||||||
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; |
|
||||||
import org.springframework.messaging.MessageHandler; |
|
||||||
|
|
||||||
/** |
|
||||||
* @author LJF |
|
||||||
* @version 1.0 |
|
||||||
* @project springboot-mqtt-demo |
|
||||||
* @description 入站配置 |
|
||||||
* @date 2024-10-29 16:22:10 |
|
||||||
*/ |
|
||||||
@Slf4j |
|
||||||
@Configuration |
|
||||||
@IntegrationComponentScan |
|
||||||
public class MqttOutboundConfig { |
|
||||||
|
|
||||||
@Autowired |
|
||||||
private MqttPahoClientFactory mqttClientFactory; |
|
||||||
|
|
||||||
/** |
|
||||||
* 默认声明一个出站处理器,用于处理无效的消息 |
|
||||||
* @return |
|
||||||
*/ |
|
||||||
@Bean |
|
||||||
@ServiceActivator(inputChannel = ChannelName.OUTBOUND) |
|
||||||
public MessageHandler mqttOutbound() { |
|
||||||
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler( |
|
||||||
MqttConfig.getBasicMqttClientOptions().getClientId() + "_producer_" + System.currentTimeMillis(), |
|
||||||
mqttClientFactory); |
|
||||||
DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter(); |
|
||||||
// use byte types uniformly
|
|
||||||
converter.setPayloadAsBytes(true); |
|
||||||
|
|
||||||
messageHandler.setAsync(true); |
|
||||||
messageHandler.setDefaultQos(0); |
|
||||||
messageHandler.setConverter(converter); |
|
||||||
return messageHandler; |
|
||||||
} |
|
||||||
|
|
||||||
|
|
||||||
} |
|
||||||
@ -1,62 +0,0 @@ |
|||||||
package com.mh.user.service.mqtt.handler; |
|
||||||
|
|
||||||
import com.mh.user.config.MHConfig; |
|
||||||
import com.mh.user.constants.ChannelName; |
|
||||||
import com.mh.user.constants.TopicEnum; |
|
||||||
import com.mh.user.utils.SpringContextUtils; |
|
||||||
import lombok.extern.slf4j.Slf4j; |
|
||||||
import org.springframework.beans.factory.annotation.Autowired; |
|
||||||
import org.springframework.integration.annotation.Router; |
|
||||||
import org.springframework.integration.mqtt.support.MqttHeaders; |
|
||||||
import org.springframework.integration.router.AbstractMessageRouter; |
|
||||||
import org.springframework.messaging.Message; |
|
||||||
import org.springframework.messaging.MessageChannel; |
|
||||||
import org.springframework.messaging.MessageHeaders; |
|
||||||
import org.springframework.stereotype.Component; |
|
||||||
|
|
||||||
import java.util.Collection; |
|
||||||
import java.util.Collections; |
|
||||||
import java.util.Objects; |
|
||||||
|
|
||||||
/** |
|
||||||
* @author LJF |
|
||||||
* @version 1.0 |
|
||||||
* @project springboot-mqtt-demo |
|
||||||
* @description 入站消息路由分发中心 |
|
||||||
* @date 2024-10-29 17:04:17 |
|
||||||
*/ |
|
||||||
@Slf4j |
|
||||||
@Component |
|
||||||
public class InboundMessageRouter extends AbstractMessageRouter { |
|
||||||
|
|
||||||
/** 系统基础配置 */ |
|
||||||
@Autowired |
|
||||||
private MHConfig mHConfig; |
|
||||||
|
|
||||||
/** |
|
||||||
* 目前只需要这个方式,后期在拓展使用IntegrationFlow方式 |
|
||||||
* @param message |
|
||||||
* @return |
|
||||||
*/ |
|
||||||
@Router(inputChannel = ChannelName.INBOUND) |
|
||||||
@Override |
|
||||||
protected Collection<MessageChannel> determineTargetChannels(Message<?> message) { |
|
||||||
MessageHeaders headers = message.getHeaders(); |
|
||||||
String topic = Objects.requireNonNull(headers.get(MqttHeaders.RECEIVED_TOPIC)).toString(); |
|
||||||
// byte[] payload = (byte[]) message.getPayload();
|
|
||||||
// log.info("从当前主题 topic: {}, 接收到的消息:{}", topic, new String(payload));
|
|
||||||
// 判断当前主题是否是当前项目的,温湿度目前写死的
|
|
||||||
// if (!topic.startsWith(mHConfig.getName()) && !topic.contains("/temp")) {
|
|
||||||
// log.info("当前主题 topic: {} 不是当前项目的,直接丢弃", topic);
|
|
||||||
// return Collections.singleton((MessageChannel) SpringContextUtils.getBean(ChannelName.DEFAULT_BOUND));
|
|
||||||
// }
|
|
||||||
// 找到对应的主题消息通道
|
|
||||||
if (topic.contains("/temp")) { |
|
||||||
return Collections.singleton((MessageChannel) SpringContextUtils.getBean(ChannelName.EVENTS_UPLOAD_INBOUND)); |
|
||||||
} else { |
|
||||||
TopicEnum topicEnum = TopicEnum.find(mHConfig.getName() + "/", topic); |
|
||||||
MessageChannel bean = (MessageChannel) SpringContextUtils.getBean(topicEnum.getBeanName()); |
|
||||||
return Collections.singleton(bean); |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
@ -1,49 +0,0 @@ |
|||||||
package com.mh.user.service.mqtt.service; |
|
||||||
|
|
||||||
import org.springframework.messaging.MessageHeaders; |
|
||||||
|
|
||||||
import java.io.IOException; |
|
||||||
|
|
||||||
/** |
|
||||||
* @author LJF |
|
||||||
* @version 1.0 |
|
||||||
* @project springboot-mqtt-demo |
|
||||||
* @description 通道处理类 |
|
||||||
* @date 2024-11-05 11:30:26 |
|
||||||
*/ |
|
||||||
public interface IEventsService { |
|
||||||
|
|
||||||
/** |
|
||||||
* 处理主动上报 |
|
||||||
* @param receiver |
|
||||||
* @param headers |
|
||||||
* @return |
|
||||||
*/ |
|
||||||
void handleInboundUpload(byte[] receiver, MessageHeaders headers) throws IOException; |
|
||||||
|
|
||||||
|
|
||||||
/** |
|
||||||
* 处理服务器主动采集上报 |
|
||||||
* @param receiver |
|
||||||
* @param headers |
|
||||||
* @return |
|
||||||
*/ |
|
||||||
void handleInboundCollection(byte[] receiver, MessageHeaders headers) throws IOException; |
|
||||||
|
|
||||||
/** |
|
||||||
* 处理控制上报 |
|
||||||
* @param receiver |
|
||||||
* @param headers |
|
||||||
* @return |
|
||||||
*/ |
|
||||||
void handleInboundControl(byte[] receiver, MessageHeaders headers) throws IOException; |
|
||||||
|
|
||||||
/** |
|
||||||
* 处理发送 |
|
||||||
* @param receiver |
|
||||||
* @param headers |
|
||||||
* @throws IOException |
|
||||||
*/ |
|
||||||
void handleInboundSend(byte[] receiver, MessageHeaders headers) throws IOException; |
|
||||||
|
|
||||||
} |
|
||||||
@ -1,36 +0,0 @@ |
|||||||
package com.mh.user.service.mqtt.service; |
|
||||||
|
|
||||||
/** |
|
||||||
* @author LJF |
|
||||||
* @version 1.0 |
|
||||||
* @project springboot-mqtt |
|
||||||
* @description 消息网关 |
|
||||||
* @date 2024-10-30 14:43:55 |
|
||||||
*/ |
|
||||||
//@Component
|
|
||||||
//@Configuration
|
|
||||||
//@MessagingGateway(defaultRequestChannel = ChannelName.OUTBOUND)
|
|
||||||
public interface IMqttGatewayService { |
|
||||||
|
|
||||||
/** |
|
||||||
* 发送消息 |
|
||||||
* @param topic |
|
||||||
* @param payload |
|
||||||
*/ |
|
||||||
void publish(String topic, String payload, int qos); |
|
||||||
|
|
||||||
/** |
|
||||||
* 发送消息 |
|
||||||
* @param topic |
|
||||||
* @param payload |
|
||||||
*/ |
|
||||||
void publish(String topic, byte[] payload); |
|
||||||
|
|
||||||
/** |
|
||||||
* 发送消息并带上qos |
|
||||||
* @param topic |
|
||||||
* @param payload |
|
||||||
* @param qos |
|
||||||
*/ |
|
||||||
void publish(String topic, byte[] payload, int qos); |
|
||||||
} |
|
||||||
@ -1,96 +0,0 @@ |
|||||||
package com.mh.user.service.mqtt.service; |
|
||||||
|
|
||||||
import com.fasterxml.jackson.core.type.TypeReference; |
|
||||||
import com.mh.user.constants.CommonTopicResponse; |
|
||||||
import com.mh.user.constants.ServiceReply; |
|
||||||
|
|
||||||
/** |
|
||||||
* @author LJF |
|
||||||
* @version 1.0 |
|
||||||
* @project springboot-mqtt-demo |
|
||||||
* @description Mqtt发送消息 |
|
||||||
* @date 2024-10-29 17:31:40 |
|
||||||
*/ |
|
||||||
public interface IMqttMsgSenderService { |
|
||||||
|
|
||||||
void publish(String topic, String pushMessage); |
|
||||||
|
|
||||||
/** |
|
||||||
* 发布消息 |
|
||||||
* @param topic target |
|
||||||
* @param response message |
|
||||||
*/ |
|
||||||
void publish(String topic, CommonTopicResponse response); |
|
||||||
|
|
||||||
/** |
|
||||||
* 使用qos发布消息 |
|
||||||
* |
|
||||||
* @param topic target |
|
||||||
* @param qos qos |
|
||||||
* @param response message |
|
||||||
*/ |
|
||||||
void publish(String topic, int qos, CommonTopicResponse response); |
|
||||||
|
|
||||||
/** |
|
||||||
* 发送消息并同时接收响应 |
|
||||||
* @param clazz |
|
||||||
* @param topic |
|
||||||
* @param response 通知启动是否成功 |
|
||||||
* @return |
|
||||||
*/ |
|
||||||
<T> T publishWithReply(Class<T> clazz, String topic, CommonTopicResponse response); |
|
||||||
|
|
||||||
/** |
|
||||||
* 发送消息并同时接收响应 |
|
||||||
* @param clazz |
|
||||||
* @param topic |
|
||||||
* @param response |
|
||||||
* @param retryTime |
|
||||||
* @param <T> |
|
||||||
* @return |
|
||||||
*/ |
|
||||||
<T> T publishWithReply(Class<T> clazz, String topic, CommonTopicResponse response, int retryTime); |
|
||||||
|
|
||||||
/** |
|
||||||
* 专门用于发送服务消息 |
|
||||||
* @param clazz The generic class for ServiceReply. |
|
||||||
* @param sn |
|
||||||
* @param method |
|
||||||
* @param data |
|
||||||
* @param bid |
|
||||||
* @param <T> |
|
||||||
* @return |
|
||||||
*/ |
|
||||||
<T> ServiceReply<T> publishServicesTopic(TypeReference<T> clazz, String sn, String method, Object data, String bid); |
|
||||||
|
|
||||||
/** |
|
||||||
* 仅用于为服务发送消息,不设置接收到的子类型 |
|
||||||
* @param sn |
|
||||||
* @param method |
|
||||||
* @param data |
|
||||||
* @param bid |
|
||||||
* @return |
|
||||||
*/ |
|
||||||
ServiceReply publishServicesTopic(String sn, String method, Object data, String bid); |
|
||||||
|
|
||||||
/** |
|
||||||
* 专门用于发送服务消息 |
|
||||||
* @param clazz The generic class for ServiceReply. |
|
||||||
* @param sn |
|
||||||
* @param method |
|
||||||
* @param data |
|
||||||
* @param <T> |
|
||||||
* @return |
|
||||||
*/ |
|
||||||
<T> ServiceReply<T> publishServicesTopic(TypeReference<T> clazz, String sn, String method, Object data); |
|
||||||
|
|
||||||
/** |
|
||||||
* 仅用于为服务发送消息,不设置接收到的子类型 |
|
||||||
* @param sn |
|
||||||
* @param method |
|
||||||
* @param data |
|
||||||
* @return |
|
||||||
*/ |
|
||||||
ServiceReply publishServicesTopic(String sn, String method, Object data); |
|
||||||
|
|
||||||
} |
|
||||||
@ -1,40 +0,0 @@ |
|||||||
package com.mh.user.service.mqtt.service; |
|
||||||
|
|
||||||
import org.springframework.integration.mqtt.support.MqttHeaders; |
|
||||||
import org.springframework.messaging.handler.annotation.Header; |
|
||||||
|
|
||||||
/** |
|
||||||
* @author LJF |
|
||||||
* @version 1.0 |
|
||||||
* @project springboot-mqtt-demo |
|
||||||
* @description mqtt订阅主题 |
|
||||||
* @date 2024-10-29 17:29:52 |
|
||||||
*/ |
|
||||||
public interface IMqttTopicService { |
|
||||||
|
|
||||||
/** |
|
||||||
* 订阅主题 |
|
||||||
* @param topic |
|
||||||
*/ |
|
||||||
void subscribe(@Header(MqttHeaders.TOPIC) String topic); |
|
||||||
|
|
||||||
/** |
|
||||||
* 订阅主题并设置qos |
|
||||||
* @param topic |
|
||||||
* @param qos |
|
||||||
*/ |
|
||||||
void subscribe(@Header(MqttHeaders.TOPIC) String topic, int qos); |
|
||||||
|
|
||||||
/** |
|
||||||
* 解绑主题 |
|
||||||
* @param topic |
|
||||||
*/ |
|
||||||
void unsubscribe(@Header(MqttHeaders.TOPIC) String topic); |
|
||||||
|
|
||||||
/** |
|
||||||
* 获取已订阅的主题 |
|
||||||
* @return |
|
||||||
*/ |
|
||||||
String[] getSubscribedTopics(); |
|
||||||
|
|
||||||
} |
|
||||||
@ -1,75 +0,0 @@ |
|||||||
package com.mh.user.service.mqtt.service.impl; |
|
||||||
|
|
||||||
import com.alibaba.fastjson2.JSONObject; |
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper; |
|
||||||
import com.mh.user.constants.ChannelName; |
|
||||||
import com.mh.user.model.SanShiFengReceiver; |
|
||||||
import com.mh.user.service.mqtt.service.IEventsService; |
|
||||||
import io.netty.util.CharsetUtil; |
|
||||||
import lombok.extern.slf4j.Slf4j; |
|
||||||
import org.springframework.beans.factory.annotation.Autowired; |
|
||||||
import org.springframework.integration.annotation.ServiceActivator; |
|
||||||
import org.springframework.messaging.MessageHeaders; |
|
||||||
import org.springframework.stereotype.Service; |
|
||||||
|
|
||||||
import java.io.IOException; |
|
||||||
import java.util.Objects; |
|
||||||
|
|
||||||
/** |
|
||||||
* @author LJF |
|
||||||
* @version 1.0 |
|
||||||
* @project springboot-mqtt-demo |
|
||||||
* @description 通道消息处理 |
|
||||||
* @date 2024-11-05 11:42:30 |
|
||||||
*/ |
|
||||||
@Slf4j |
|
||||||
@Service |
|
||||||
public class EventsServiceImpl implements IEventsService { |
|
||||||
|
|
||||||
@Autowired |
|
||||||
private ObjectMapper mapper; |
|
||||||
|
|
||||||
@ServiceActivator(inputChannel = ChannelName.EVENTS_UPLOAD_INBOUND) |
|
||||||
@Override |
|
||||||
public void handleInboundUpload(byte[] receiver, MessageHeaders headers) { |
|
||||||
// 获取当前的主题
|
|
||||||
String topic = Objects.requireNonNull(headers.get("mqtt_receivedTopic")).toString(); |
|
||||||
handleInboundData(receiver, topic, "主动上报数据"); |
|
||||||
} |
|
||||||
|
|
||||||
@ServiceActivator(inputChannel = ChannelName.EVENTS_COLLECTION_INBOUND) |
|
||||||
@Override |
|
||||||
public void handleInboundCollection(byte[] receiver, MessageHeaders headers) { |
|
||||||
// 获取当前的主题
|
|
||||||
String topic = Objects.requireNonNull(headers.get("mqtt_receivedTopic")).toString(); |
|
||||||
handleInboundData(receiver, topic, "主动下发采集数据"); |
|
||||||
} |
|
||||||
|
|
||||||
@ServiceActivator(inputChannel = ChannelName.EVENTS_CONTROL_INBOUND) |
|
||||||
@Override |
|
||||||
public void handleInboundControl(byte[] receiver, MessageHeaders headers) { |
|
||||||
// 获取当前的主题
|
|
||||||
String topic = Objects.requireNonNull(headers.get("mqtt_receivedTopic")).toString(); |
|
||||||
handleInboundData(receiver, topic, "控制指令下发"); |
|
||||||
} |
|
||||||
|
|
||||||
@ServiceActivator(inputChannel = ChannelName.EVENTS_SEND_INBOUND) |
|
||||||
@Override |
|
||||||
public void handleInboundSend(byte[] receiver, MessageHeaders headers) throws IOException { |
|
||||||
String sendStr = new String(receiver, CharsetUtil.UTF_8); |
|
||||||
log.info("接收到控制指令下发=>{}", sendStr); |
|
||||||
} |
|
||||||
|
|
||||||
private void handleInboundData(byte[] receiver,String topic, String logMessage) { |
|
||||||
try { |
|
||||||
SanShiFengReceiver commonTopicReceiver = new SanShiFengReceiver(); |
|
||||||
commonTopicReceiver = mapper.readValue(receiver, SanShiFengReceiver.class); |
|
||||||
log.info("主题:{},类型:{}: ,数据:{}", topic, logMessage, commonTopicReceiver.toString()); |
|
||||||
// 不使用消息队列,查询属于哪种设备类型,然后通过策略进行数据解析
|
|
||||||
} catch (IOException e) { |
|
||||||
log.error("处理数据时发生错误: ", e); |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
|
|
||||||
} |
|
||||||
@ -1,54 +0,0 @@ |
|||||||
package com.mh.user.service.mqtt.service.impl; |
|
||||||
|
|
||||||
import com.mh.user.service.mqtt.service.IMqttGatewayService; |
|
||||||
import org.springframework.beans.factory.annotation.Qualifier; |
|
||||||
import org.springframework.integration.mqtt.support.MqttHeaders; |
|
||||||
import org.springframework.messaging.MessageChannel; |
|
||||||
import org.springframework.messaging.support.MessageBuilder; |
|
||||||
import org.springframework.stereotype.Service; |
|
||||||
|
|
||||||
/** |
|
||||||
* @author LJF |
|
||||||
* @version 1.0 |
|
||||||
* @project EEMCS |
|
||||||
* @description 网关实现类 |
|
||||||
* @date 2025-02-07 08:44:55 |
|
||||||
*/ |
|
||||||
@Service |
|
||||||
public class MqttGatewayServiceImpl implements IMqttGatewayService { |
|
||||||
|
|
||||||
private final MessageChannel outboundChannel; |
|
||||||
|
|
||||||
public MqttGatewayServiceImpl(@Qualifier("outbound") MessageChannel outboundChannel) { |
|
||||||
this.outboundChannel = outboundChannel; |
|
||||||
} |
|
||||||
|
|
||||||
@Override |
|
||||||
public synchronized void publish(String topic, String payload, int qos) { |
|
||||||
outboundChannel.send( |
|
||||||
MessageBuilder |
|
||||||
.withPayload(payload) |
|
||||||
.setHeader(MqttHeaders.TOPIC, topic) |
|
||||||
.setHeader(MqttHeaders.QOS, qos) |
|
||||||
.build()); |
|
||||||
} |
|
||||||
|
|
||||||
@Override |
|
||||||
public void publish(String topic, byte[] payload) { |
|
||||||
outboundChannel.send( |
|
||||||
MessageBuilder |
|
||||||
.withPayload(payload) |
|
||||||
.setHeader(MqttHeaders.TOPIC, topic) |
|
||||||
.build()); |
|
||||||
} |
|
||||||
|
|
||||||
@Override |
|
||||||
public void publish(String topic, byte[] payload, int qos) { |
|
||||||
outboundChannel.send( |
|
||||||
MessageBuilder |
|
||||||
.withPayload(payload) |
|
||||||
.setHeader(MqttHeaders.TOPIC, topic) |
|
||||||
.setHeader(MqttHeaders.QOS, qos) |
|
||||||
.build()); |
|
||||||
} |
|
||||||
} |
|
||||||
@ -1,142 +0,0 @@ |
|||||||
package com.mh.user.service.mqtt.service.impl; |
|
||||||
|
|
||||||
import com.fasterxml.jackson.core.JsonProcessingException; |
|
||||||
import com.fasterxml.jackson.core.type.TypeReference; |
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper; |
|
||||||
import com.mh.user.constants.CommonTopicResponse; |
|
||||||
import com.mh.user.constants.ServiceReply; |
|
||||||
import com.mh.user.constants.TopicConst; |
|
||||||
import com.mh.user.service.mqtt.service.IMqttGatewayService; |
|
||||||
import com.mh.user.service.mqtt.service.IMqttMsgSenderService; |
|
||||||
import lombok.extern.slf4j.Slf4j; |
|
||||||
import org.springframework.beans.factory.annotation.Autowired; |
|
||||||
import org.springframework.stereotype.Service; |
|
||||||
import org.springframework.util.StringUtils; |
|
||||||
|
|
||||||
import java.util.Objects; |
|
||||||
import java.util.UUID; |
|
||||||
|
|
||||||
/** |
|
||||||
* @author LJF |
|
||||||
* @version 1.0 |
|
||||||
* @project springboot-mqtt-demo |
|
||||||
* @description mqtt不同类型发送消息 |
|
||||||
* @date 2024-10-30 14:30:03 |
|
||||||
*/ |
|
||||||
@Slf4j |
|
||||||
@Service |
|
||||||
public class MqttMsgSenderServiceImpl implements IMqttMsgSenderService { |
|
||||||
|
|
||||||
@Autowired |
|
||||||
private IMqttGatewayService mqttGatewayService; |
|
||||||
|
|
||||||
@Autowired |
|
||||||
private ObjectMapper mapper; |
|
||||||
|
|
||||||
|
|
||||||
/** |
|
||||||
* 发布,默认qos为0,非持久化 |
|
||||||
* |
|
||||||
* @param pushMessage |
|
||||||
* @param topic |
|
||||||
*/ |
|
||||||
@Override |
|
||||||
public void publish(String topic, String pushMessage) { |
|
||||||
synchronized (this) {//注意:这里一定要同步,否则,在多线程publish的情况下,线程会发生死锁,分析见文章最后补充
|
|
||||||
try { |
|
||||||
mqttGatewayService.publish(topic, pushMessage, 0); |
|
||||||
log.info("发送主题:{},消息:{}", topic, pushMessage); |
|
||||||
} catch (Exception e) { |
|
||||||
log.error("发送主题异常:{},消息:{}", topic, pushMessage, e); |
|
||||||
throw new RuntimeException(e); |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
@Override |
|
||||||
public void publish(String topic, CommonTopicResponse response) { |
|
||||||
this.publish(topic, 1, response); |
|
||||||
} |
|
||||||
|
|
||||||
@Override |
|
||||||
public void publish(String topic, int qos, CommonTopicResponse response) { |
|
||||||
try { |
|
||||||
log.info("发送主题:{},消息:{}", topic, response.toString()); |
|
||||||
mqttGatewayService.publish(topic, mapper.writeValueAsBytes(response), qos); |
|
||||||
} catch (JsonProcessingException e) { |
|
||||||
log.error("发送主题:{},消息:{}", topic, response.toString(), e); |
|
||||||
throw new RuntimeException(e); |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
@Override |
|
||||||
public <T> T publishWithReply(Class<T> clazz, String topic, CommonTopicResponse response) { |
|
||||||
return this.publishWithReply(clazz, topic, response, 2); |
|
||||||
} |
|
||||||
|
|
||||||
@Override |
|
||||||
public <T> T publishWithReply(Class<T> clazz, String topic, CommonTopicResponse response, int retryTime) { |
|
||||||
// AtomicInteger time = new AtomicInteger(0);
|
|
||||||
// // Retry three times
|
|
||||||
// while (time.getAndIncrement() <= retryTime) {
|
|
||||||
// this.publish(topic, response);
|
|
||||||
//
|
|
||||||
// Chan<CommonTopicReceiver<T>> chan = Chan.getInstance();
|
|
||||||
// // If the message is not received in 0.5 seconds then resend it again.
|
|
||||||
// CommonTopicReceiver<T> receiver = chan.get(response.getTid());
|
|
||||||
//
|
|
||||||
// // Need to match tid and bid.
|
|
||||||
// if (Objects.nonNull(receiver) && receiver.getTid().equals(response.getTid()) &&
|
|
||||||
// receiver.getBid().equals(response.getBid())) {
|
|
||||||
// if (clazz.isAssignableFrom(receiver.getData().getClass())) {
|
|
||||||
// return receiver.getData();
|
|
||||||
// }
|
|
||||||
// throw new TypeMismatchException(receiver.getData(), clazz);
|
|
||||||
// }
|
|
||||||
// // It must be guaranteed that the tid and bid of each message are different.
|
|
||||||
// response.setBid(UUID.randomUUID().toString());
|
|
||||||
// response.setTid(UUID.randomUUID().toString());
|
|
||||||
// }
|
|
||||||
// throw new RuntimeException("没有消息接收到");
|
|
||||||
return null; |
|
||||||
} |
|
||||||
|
|
||||||
@Override |
|
||||||
public <T> ServiceReply<T> publishServicesTopic(TypeReference<T> clazz, String sn, String method, Object data, String bid) { |
|
||||||
String topic = TopicConst.THING_MODEL_PRE + TopicConst.PRODUCT + sn + TopicConst.SERVICES_SUF; |
|
||||||
ServiceReply reply = this.publishWithReply(ServiceReply.class, topic, |
|
||||||
CommonTopicResponse.builder() |
|
||||||
.tid(UUID.randomUUID().toString()) |
|
||||||
.bid(StringUtils.hasText(bid) ? bid : UUID.randomUUID().toString()) |
|
||||||
.timestamp(System.currentTimeMillis()) |
|
||||||
.method(method) |
|
||||||
.data(Objects.requireNonNull(data, "")) |
|
||||||
.build()); |
|
||||||
if (Objects.isNull(clazz)) { |
|
||||||
return reply; |
|
||||||
} |
|
||||||
// put together in "output"
|
|
||||||
if (Objects.nonNull(reply.getInfo())) { |
|
||||||
reply.setOutput(mapper.convertValue(reply.getInfo(), clazz)); |
|
||||||
} |
|
||||||
if (Objects.nonNull(reply.getOutput())) { |
|
||||||
reply.setOutput(mapper.convertValue(reply.getOutput(), clazz)); |
|
||||||
} |
|
||||||
return reply; |
|
||||||
} |
|
||||||
|
|
||||||
@Override |
|
||||||
public ServiceReply publishServicesTopic(String sn, String method, Object data, String bid) { |
|
||||||
return this.publishServicesTopic(null, sn, method, data, bid); |
|
||||||
} |
|
||||||
|
|
||||||
@Override |
|
||||||
public <T> ServiceReply<T> publishServicesTopic(TypeReference<T> clazz, String sn, String method, Object data) { |
|
||||||
return this.publishServicesTopic(clazz, sn, method, data, null); |
|
||||||
} |
|
||||||
|
|
||||||
@Override |
|
||||||
public ServiceReply publishServicesTopic(String sn, String method, Object data) { |
|
||||||
return this.publishServicesTopic(null, sn, method, data, null); |
|
||||||
} |
|
||||||
} |
|
||||||
@ -1,40 +0,0 @@ |
|||||||
package com.mh.user.service.mqtt.service.impl; |
|
||||||
|
|
||||||
import com.mh.framework.mqtt.service.IMqttTopicService; |
|
||||||
import jakarta.annotation.Resource; |
|
||||||
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; |
|
||||||
import org.springframework.stereotype.Service; |
|
||||||
|
|
||||||
/** |
|
||||||
* @author LJF |
|
||||||
* @version 1.0 |
|
||||||
* @project springboot-mqtt-demo |
|
||||||
* @description 订阅主题实现类 |
|
||||||
* @date 2024-10-29 17:36:31 |
|
||||||
*/ |
|
||||||
@Service |
|
||||||
public class MqttTopicServiceImpl implements IMqttTopicService { |
|
||||||
|
|
||||||
@Resource |
|
||||||
private MqttPahoMessageDrivenChannelAdapter adapter; |
|
||||||
|
|
||||||
@Override |
|
||||||
public void subscribe(String topic) { |
|
||||||
adapter.addTopic(topic); |
|
||||||
} |
|
||||||
|
|
||||||
@Override |
|
||||||
public void subscribe(String topic, int qos) { |
|
||||||
adapter.addTopic(topic, qos); |
|
||||||
} |
|
||||||
|
|
||||||
@Override |
|
||||||
public void unsubscribe(String topic) { |
|
||||||
adapter.removeTopic(topic); |
|
||||||
} |
|
||||||
|
|
||||||
@Override |
|
||||||
public String[] getSubscribedTopics() { |
|
||||||
return adapter.getTopic(); |
|
||||||
} |
|
||||||
} |
|
||||||
Loading…
Reference in new issue