From 62c8a6fc87a9ece2a4b7553bc166d989889c931a Mon Sep 17 00:00:00 2001 From: 25604 Date: Tue, 9 Dec 2025 16:32:48 +0800 Subject: [PATCH] =?UTF-8?q?1=E3=80=81=E5=85=BC=E5=AE=B9MQTT=E5=8D=8F?= =?UTF-8?q?=E8=AE=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../config/mqtt/InboundMessageRouter.java | 62 ++++++++++++ .../com/mh/user/config/mqtt/MqttConfig.java | 99 +++++++++++++++++++ .../user/config/mqtt/MqttInboundConfig.java | 91 +++++++++++++++++ .../user/config/mqtt/MqttMessageChannel.java | 55 +++++++++++ .../user/config/mqtt/MqttOutboundConfig.java | 51 ++++++++++ 5 files changed, 358 insertions(+) create mode 100644 user-service/src/main/java/com/mh/user/config/mqtt/InboundMessageRouter.java create mode 100644 user-service/src/main/java/com/mh/user/config/mqtt/MqttConfig.java create mode 100644 user-service/src/main/java/com/mh/user/config/mqtt/MqttInboundConfig.java create mode 100644 user-service/src/main/java/com/mh/user/config/mqtt/MqttMessageChannel.java create mode 100644 user-service/src/main/java/com/mh/user/config/mqtt/MqttOutboundConfig.java diff --git a/user-service/src/main/java/com/mh/user/config/mqtt/InboundMessageRouter.java b/user-service/src/main/java/com/mh/user/config/mqtt/InboundMessageRouter.java new file mode 100644 index 0000000..ccc4804 --- /dev/null +++ b/user-service/src/main/java/com/mh/user/config/mqtt/InboundMessageRouter.java @@ -0,0 +1,62 @@ +package com.mh.user.config.mqtt; + +import com.mh.user.config.MHConfig; +import com.mh.user.constants.ChannelName; +import com.mh.user.constants.TopicEnum; +import com.mh.user.utils.SpringBeanUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.integration.annotation.Router; +import org.springframework.integration.mqtt.support.MqttHeaders; +import org.springframework.integration.router.AbstractMessageRouter; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHeaders; +import org.springframework.stereotype.Component; + +import java.util.Collection; +import java.util.Collections; +import java.util.Objects; + +/** + * @author LJF + * @version 1.0 + * @project springboot-mqtt-demo + * @description 入站消息路由分发中心 + * @date 2024-10-29 17:04:17 + */ +@Slf4j +@Component +public class InboundMessageRouter extends AbstractMessageRouter { + + /** 系统基础配置 */ + @Autowired + private MHConfig mHConfig; + + /** + * 目前只需要这个方式,后期在拓展使用IntegrationFlow方式 + * @param message + * @return + */ + @Router(inputChannel = ChannelName.INBOUND) + @Override + protected Collection determineTargetChannels(Message message) { + MessageHeaders headers = message.getHeaders(); + String topic = Objects.requireNonNull(headers.get(MqttHeaders.RECEIVED_TOPIC)).toString(); +// byte[] payload = (byte[]) message.getPayload(); +// log.info("从当前主题 topic: {}, 接收到的消息:{}", topic, new String(payload)); + // 判断当前主题是否是当前项目的,温湿度目前写死的 + if (!topic.startsWith(mHConfig.getName()) && !topic.contains("/temp")) { + log.info("当前主题 topic: {} 不是当前项目的,直接丢弃", topic); + return Collections.singleton((MessageChannel) SpringBeanUtil.getBean(ChannelName.DEFAULT_BOUND)); + } + // 找到对应的主题消息通道 + if (topic.contains("/temp")) { + return Collections.singleton((MessageChannel) SpringBeanUtil.getBean(ChannelName.EVENTS_UPLOAD_INBOUND)); + } else { + TopicEnum topicEnum = TopicEnum.find(mHConfig.getName() + "/", topic); + MessageChannel bean = (MessageChannel) SpringBeanUtil.getBean(topicEnum.getBeanName()); + return Collections.singleton(bean); + } + } +} diff --git a/user-service/src/main/java/com/mh/user/config/mqtt/MqttConfig.java b/user-service/src/main/java/com/mh/user/config/mqtt/MqttConfig.java new file mode 100644 index 0000000..1595c23 --- /dev/null +++ b/user-service/src/main/java/com/mh/user/config/mqtt/MqttConfig.java @@ -0,0 +1,99 @@ +package com.mh.user.config.mqtt; + +import com.mh.user.constants.MqttClientOptions; +import com.mh.user.constants.MqttProtocolEnum; +import com.mh.user.constants.MqttUseEnum; +import lombok.Data; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +import org.springframework.util.StringUtils; + +import java.util.Map; + +/** + * @author LJF + * @version 1.0 + * @project springboot-mqtt-demo + * @description mqtt连接配置 + * @date 2024-10-29 14:44:51 + */ +@Configuration +@Data +@ConfigurationProperties +public class MqttConfig { + + private static Map mqttSpring; + + public void setMqttSpring(Map mqtt) { + MqttConfig.mqttSpring = mqtt; + } + + /** + * 获取mqtt基本配置 + * @return + */ + static MqttClientOptions getBasicMqttClientOptions() { + if (!mqttSpring.containsKey(MqttUseEnum.BASIC)) { + throw new Error("请先配置MQTT的基本连接参数,否则无法启动项目"); + } + return mqttSpring.get(MqttUseEnum.BASIC); + } + + /** + * 拼接获取对应mqtt的连接地址 + * @param options + * @return + */ + public static String getMqttAddress(MqttClientOptions options) { + StringBuilder addr = new StringBuilder(); + addr.append(options.getProtocol().getProtocolAddr()) + .append(options.getHost().trim()) + .append(":") + .append(options.getPort()); + if ((options.getProtocol() == MqttProtocolEnum.WS || options.getProtocol() == MqttProtocolEnum.WSS) + && StringUtils.hasText(options.getPath())) { + addr.append(options.getPath()); + } + return addr.toString(); + } + + public static String getBasicMqttAddress() { + return getMqttAddress(getBasicMqttClientOptions()); + } + + /** + * 获取连接参数,注入到spring中 + * @return + */ + @Bean + public MqttConnectOptions mqttConnectionOptions() { + + MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); + + MqttClientOptions customizeOptions = getBasicMqttClientOptions(); + String basicMqttAddress = getBasicMqttAddress(); + mqttConnectOptions.setServerURIs(new String[]{basicMqttAddress}); + mqttConnectOptions.setUserName(StringUtils.hasText(customizeOptions.getUsername()) ? + customizeOptions.getUsername() : ""); + mqttConnectOptions.setPassword(StringUtils.hasText(customizeOptions.getPassword()) ? + customizeOptions.getPassword().toCharArray() : new char[0]); + // 直接进行自动连接 + mqttConnectOptions.setAutomaticReconnect(true); + // 时间间隔时间10s + mqttConnectOptions.setKeepAliveInterval(10); + + return mqttConnectOptions; + } + + @Bean + public MqttPahoClientFactory mqttClientFactory() { + DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); + factory.setConnectionOptions(mqttConnectionOptions()); + return factory; + } + +} diff --git a/user-service/src/main/java/com/mh/user/config/mqtt/MqttInboundConfig.java b/user-service/src/main/java/com/mh/user/config/mqtt/MqttInboundConfig.java new file mode 100644 index 0000000..27dcf3e --- /dev/null +++ b/user-service/src/main/java/com/mh/user/config/mqtt/MqttInboundConfig.java @@ -0,0 +1,91 @@ +package com.mh.user.config.mqtt; + +import com.mh.user.constants.ChannelName; +import com.mh.user.constants.MqttClientOptions; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.annotation.IntegrationComponentScan; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.endpoint.MessageProducerSupport; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; +import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; +import org.springframework.integration.mqtt.support.MqttHeaders; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; + +import javax.annotation.Resource; + +/** + * @author LJF + * @version 1.0 + * @project springboot-mqtt-demo + * @description 入站配置 + * @date 2024-10-29 16:22:10 + */ +@Slf4j +@Configuration +@IntegrationComponentScan +public class MqttInboundConfig { + + @Autowired + private MqttPahoClientFactory mqttClientFactory; + + @Resource(name = ChannelName.INBOUND) + private MessageChannel inboundChannel; + + private String clientId; + + /** + * 入站适配器 + * @return + */ + @Bean(name = "adapter") + public MessageProducerSupport mqttInbound() { + MqttClientOptions options = MqttConfig.getBasicMqttClientOptions(); + // 此处初始化的时候,默认订阅了配置文件中已经写好的topic + // 如果需要订阅多个,可以自己手动订阅,会写一个addTopic()进行添加订阅 + clientId = options.getClientId() + "_consumer_" + System.currentTimeMillis(); + MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter( + clientId, + mqttClientFactory, + options.getInboundTopic().split(",")); + // System.out.println("每一次都会入站适配器吗?"+clientId); + DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter(); + // 统一是字节处理 + converter.setPayloadAsBytes(true); + // 设置消息转换器 + adapter.setConverter(converter); + // 设置qos(quality of service) + // 0:最多一次传输(消息会丢失), + // 1:至少一次传输(消息会重复), + // 2:只有当消息发送成功时才确认(消息不回丢,但延迟高)。 + adapter.setQos(0); + // 设置在接收已经订阅的主题信息后,发送给哪个通道,具体的发送方法需要翻上层的抽象类 + adapter.setOutputChannel(inboundChannel); + return adapter; + } + + /** + * 默认声明一个消息处理器,用于处理无效的消息 + * @return + */ + @Bean + @ServiceActivator(inputChannel = ChannelName.DEFAULT_BOUND) + public MessageHandler handler() { + return message -> { + log.info("The default channel does not handle messages." + + "\nTopic: {}" + + "\nPayload: {}", + message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC), + message.getPayload()); + }; + } + + public String getClientId() { + return clientId; + } + +} diff --git a/user-service/src/main/java/com/mh/user/config/mqtt/MqttMessageChannel.java b/user-service/src/main/java/com/mh/user/config/mqtt/MqttMessageChannel.java new file mode 100644 index 0000000..3a3c382 --- /dev/null +++ b/user-service/src/main/java/com/mh/user/config/mqtt/MqttMessageChannel.java @@ -0,0 +1,55 @@ +package com.mh.user.config.mqtt; + +import com.mh.user.constants.ChannelName; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.messaging.MessageChannel; + +/** + * @author LJF + * @version 1.0 + * @project springboot-mqtt-demo + * @description 声明所有通道的定义类 + * @date 2024-10-29 16:23:32 + */ +@Slf4j +@Configuration +public class MqttMessageChannel { + + @Bean(name = ChannelName.OUTBOUND) + public MessageChannel outboundChannel() { + return new DirectChannel(); + } + + @Bean(name = ChannelName.INBOUND) + public MessageChannel inboundChannel() { + return new DirectChannel(); + } + + /** + * 事件主动上报通道 + * @return + */ + @Bean(name = ChannelName.EVENTS_UPLOAD_INBOUND) + public MessageChannel eventsUploadInbound() { + return new DirectChannel(); + } + + @Bean(name = ChannelName.EVENTS_COLLECTION_INBOUND) + public MessageChannel eventsCollectionInbound() { + return new DirectChannel(); + } + + @Bean(name = ChannelName.EVENTS_CONTROL_INBOUND) + public MessageChannel eventsControlInbound() { + return new DirectChannel(); + } + + @Bean(name = ChannelName.EVENTS_SEND_INBOUND) + public MessageChannel eventsSendInbound() { + return new DirectChannel(); + } + +} diff --git a/user-service/src/main/java/com/mh/user/config/mqtt/MqttOutboundConfig.java b/user-service/src/main/java/com/mh/user/config/mqtt/MqttOutboundConfig.java new file mode 100644 index 0000000..6805a05 --- /dev/null +++ b/user-service/src/main/java/com/mh/user/config/mqtt/MqttOutboundConfig.java @@ -0,0 +1,51 @@ +package com.mh.user.config.mqtt; + +import com.mh.user.constants.ChannelName; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.annotation.IntegrationComponentScan; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; +import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; +import org.springframework.messaging.MessageHandler; + +/** + * @author LJF + * @version 1.0 + * @project springboot-mqtt-demo + * @description 入站配置 + * @date 2024-10-29 16:22:10 + */ +@Slf4j +@Configuration +@IntegrationComponentScan +public class MqttOutboundConfig { + + @Autowired + private MqttPahoClientFactory mqttClientFactory; + + /** + * 默认声明一个出站处理器,用于处理无效的消息 + * @return + */ + @Bean + @ServiceActivator(inputChannel = ChannelName.OUTBOUND) + public MessageHandler mqttOutbound() { + MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler( + MqttConfig.getBasicMqttClientOptions().getClientId() + "_producer_" + System.currentTimeMillis(), + mqttClientFactory); + DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter(); + // use byte types uniformly + converter.setPayloadAsBytes(true); + + messageHandler.setAsync(true); + messageHandler.setDefaultQos(0); + messageHandler.setConverter(converter); + return messageHandler; + } + + +}