5 changed files with 358 additions and 0 deletions
@ -0,0 +1,62 @@
|
||||
package com.mh.user.config.mqtt; |
||||
|
||||
import com.mh.user.config.MHConfig; |
||||
import com.mh.user.constants.ChannelName; |
||||
import com.mh.user.constants.TopicEnum; |
||||
import com.mh.user.utils.SpringBeanUtil; |
||||
import lombok.extern.slf4j.Slf4j; |
||||
import org.springframework.beans.factory.annotation.Autowired; |
||||
import org.springframework.integration.annotation.Router; |
||||
import org.springframework.integration.mqtt.support.MqttHeaders; |
||||
import org.springframework.integration.router.AbstractMessageRouter; |
||||
import org.springframework.messaging.Message; |
||||
import org.springframework.messaging.MessageChannel; |
||||
import org.springframework.messaging.MessageHeaders; |
||||
import org.springframework.stereotype.Component; |
||||
|
||||
import java.util.Collection; |
||||
import java.util.Collections; |
||||
import java.util.Objects; |
||||
|
||||
/** |
||||
* @author LJF |
||||
* @version 1.0 |
||||
* @project springboot-mqtt-demo |
||||
* @description 入站消息路由分发中心 |
||||
* @date 2024-10-29 17:04:17 |
||||
*/ |
||||
@Slf4j |
||||
@Component |
||||
public class InboundMessageRouter extends AbstractMessageRouter { |
||||
|
||||
/** 系统基础配置 */ |
||||
@Autowired |
||||
private MHConfig mHConfig; |
||||
|
||||
/** |
||||
* 目前只需要这个方式,后期在拓展使用IntegrationFlow方式 |
||||
* @param message |
||||
* @return |
||||
*/ |
||||
@Router(inputChannel = ChannelName.INBOUND) |
||||
@Override |
||||
protected Collection<MessageChannel> determineTargetChannels(Message<?> message) { |
||||
MessageHeaders headers = message.getHeaders(); |
||||
String topic = Objects.requireNonNull(headers.get(MqttHeaders.RECEIVED_TOPIC)).toString(); |
||||
// byte[] payload = (byte[]) message.getPayload();
|
||||
// log.info("从当前主题 topic: {}, 接收到的消息:{}", topic, new String(payload));
|
||||
// 判断当前主题是否是当前项目的,温湿度目前写死的
|
||||
if (!topic.startsWith(mHConfig.getName()) && !topic.contains("/temp")) { |
||||
log.info("当前主题 topic: {} 不是当前项目的,直接丢弃", topic); |
||||
return Collections.singleton((MessageChannel) SpringBeanUtil.getBean(ChannelName.DEFAULT_BOUND)); |
||||
} |
||||
// 找到对应的主题消息通道
|
||||
if (topic.contains("/temp")) { |
||||
return Collections.singleton((MessageChannel) SpringBeanUtil.getBean(ChannelName.EVENTS_UPLOAD_INBOUND)); |
||||
} else { |
||||
TopicEnum topicEnum = TopicEnum.find(mHConfig.getName() + "/", topic); |
||||
MessageChannel bean = (MessageChannel) SpringBeanUtil.getBean(topicEnum.getBeanName()); |
||||
return Collections.singleton(bean); |
||||
} |
||||
} |
||||
} |
||||
@ -0,0 +1,99 @@
|
||||
package com.mh.user.config.mqtt; |
||||
|
||||
import com.mh.user.constants.MqttClientOptions; |
||||
import com.mh.user.constants.MqttProtocolEnum; |
||||
import com.mh.user.constants.MqttUseEnum; |
||||
import lombok.Data; |
||||
import org.eclipse.paho.client.mqttv3.MqttConnectOptions; |
||||
import org.springframework.boot.context.properties.ConfigurationProperties; |
||||
import org.springframework.context.annotation.Bean; |
||||
import org.springframework.context.annotation.Configuration; |
||||
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; |
||||
import org.springframework.integration.mqtt.core.MqttPahoClientFactory; |
||||
import org.springframework.util.StringUtils; |
||||
|
||||
import java.util.Map; |
||||
|
||||
/** |
||||
* @author LJF |
||||
* @version 1.0 |
||||
* @project springboot-mqtt-demo |
||||
* @description mqtt连接配置 |
||||
* @date 2024-10-29 14:44:51 |
||||
*/ |
||||
@Configuration |
||||
@Data |
||||
@ConfigurationProperties |
||||
public class MqttConfig { |
||||
|
||||
private static Map<MqttUseEnum, MqttClientOptions> mqttSpring; |
||||
|
||||
public void setMqttSpring(Map<MqttUseEnum, MqttClientOptions> mqtt) { |
||||
MqttConfig.mqttSpring = mqtt; |
||||
} |
||||
|
||||
/** |
||||
* 获取mqtt基本配置 |
||||
* @return |
||||
*/ |
||||
static MqttClientOptions getBasicMqttClientOptions() { |
||||
if (!mqttSpring.containsKey(MqttUseEnum.BASIC)) { |
||||
throw new Error("请先配置MQTT的基本连接参数,否则无法启动项目"); |
||||
} |
||||
return mqttSpring.get(MqttUseEnum.BASIC); |
||||
} |
||||
|
||||
/** |
||||
* 拼接获取对应mqtt的连接地址 |
||||
* @param options |
||||
* @return |
||||
*/ |
||||
public static String getMqttAddress(MqttClientOptions options) { |
||||
StringBuilder addr = new StringBuilder(); |
||||
addr.append(options.getProtocol().getProtocolAddr()) |
||||
.append(options.getHost().trim()) |
||||
.append(":") |
||||
.append(options.getPort()); |
||||
if ((options.getProtocol() == MqttProtocolEnum.WS || options.getProtocol() == MqttProtocolEnum.WSS) |
||||
&& StringUtils.hasText(options.getPath())) { |
||||
addr.append(options.getPath()); |
||||
} |
||||
return addr.toString(); |
||||
} |
||||
|
||||
public static String getBasicMqttAddress() { |
||||
return getMqttAddress(getBasicMqttClientOptions()); |
||||
} |
||||
|
||||
/** |
||||
* 获取连接参数,注入到spring中 |
||||
* @return |
||||
*/ |
||||
@Bean |
||||
public MqttConnectOptions mqttConnectionOptions() { |
||||
|
||||
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); |
||||
|
||||
MqttClientOptions customizeOptions = getBasicMqttClientOptions(); |
||||
String basicMqttAddress = getBasicMqttAddress(); |
||||
mqttConnectOptions.setServerURIs(new String[]{basicMqttAddress}); |
||||
mqttConnectOptions.setUserName(StringUtils.hasText(customizeOptions.getUsername()) ? |
||||
customizeOptions.getUsername() : ""); |
||||
mqttConnectOptions.setPassword(StringUtils.hasText(customizeOptions.getPassword()) ? |
||||
customizeOptions.getPassword().toCharArray() : new char[0]); |
||||
// 直接进行自动连接
|
||||
mqttConnectOptions.setAutomaticReconnect(true); |
||||
// 时间间隔时间10s
|
||||
mqttConnectOptions.setKeepAliveInterval(10); |
||||
|
||||
return mqttConnectOptions; |
||||
} |
||||
|
||||
@Bean |
||||
public MqttPahoClientFactory mqttClientFactory() { |
||||
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); |
||||
factory.setConnectionOptions(mqttConnectionOptions()); |
||||
return factory; |
||||
} |
||||
|
||||
} |
||||
@ -0,0 +1,91 @@
|
||||
package com.mh.user.config.mqtt; |
||||
|
||||
import com.mh.user.constants.ChannelName; |
||||
import com.mh.user.constants.MqttClientOptions; |
||||
import lombok.extern.slf4j.Slf4j; |
||||
import org.springframework.beans.factory.annotation.Autowired; |
||||
import org.springframework.context.annotation.Bean; |
||||
import org.springframework.context.annotation.Configuration; |
||||
import org.springframework.integration.annotation.IntegrationComponentScan; |
||||
import org.springframework.integration.annotation.ServiceActivator; |
||||
import org.springframework.integration.endpoint.MessageProducerSupport; |
||||
import org.springframework.integration.mqtt.core.MqttPahoClientFactory; |
||||
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; |
||||
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; |
||||
import org.springframework.integration.mqtt.support.MqttHeaders; |
||||
import org.springframework.messaging.MessageChannel; |
||||
import org.springframework.messaging.MessageHandler; |
||||
|
||||
import javax.annotation.Resource; |
||||
|
||||
/** |
||||
* @author LJF |
||||
* @version 1.0 |
||||
* @project springboot-mqtt-demo |
||||
* @description 入站配置 |
||||
* @date 2024-10-29 16:22:10 |
||||
*/ |
||||
@Slf4j |
||||
@Configuration |
||||
@IntegrationComponentScan |
||||
public class MqttInboundConfig { |
||||
|
||||
@Autowired |
||||
private MqttPahoClientFactory mqttClientFactory; |
||||
|
||||
@Resource(name = ChannelName.INBOUND) |
||||
private MessageChannel inboundChannel; |
||||
|
||||
private String clientId; |
||||
|
||||
/** |
||||
* 入站适配器 |
||||
* @return |
||||
*/ |
||||
@Bean(name = "adapter") |
||||
public MessageProducerSupport mqttInbound() { |
||||
MqttClientOptions options = MqttConfig.getBasicMqttClientOptions(); |
||||
// 此处初始化的时候,默认订阅了配置文件中已经写好的topic
|
||||
// 如果需要订阅多个,可以自己手动订阅,会写一个addTopic()进行添加订阅
|
||||
clientId = options.getClientId() + "_consumer_" + System.currentTimeMillis(); |
||||
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter( |
||||
clientId, |
||||
mqttClientFactory, |
||||
options.getInboundTopic().split(",")); |
||||
// System.out.println("每一次都会入站适配器吗?"+clientId);
|
||||
DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter(); |
||||
// 统一是字节处理
|
||||
converter.setPayloadAsBytes(true); |
||||
// 设置消息转换器
|
||||
adapter.setConverter(converter); |
||||
// 设置qos(quality of service)
|
||||
// 0:最多一次传输(消息会丢失),
|
||||
// 1:至少一次传输(消息会重复),
|
||||
// 2:只有当消息发送成功时才确认(消息不回丢,但延迟高)。
|
||||
adapter.setQos(0); |
||||
// 设置在接收已经订阅的主题信息后,发送给哪个通道,具体的发送方法需要翻上层的抽象类
|
||||
adapter.setOutputChannel(inboundChannel); |
||||
return adapter; |
||||
} |
||||
|
||||
/** |
||||
* 默认声明一个消息处理器,用于处理无效的消息 |
||||
* @return |
||||
*/ |
||||
@Bean |
||||
@ServiceActivator(inputChannel = ChannelName.DEFAULT_BOUND) |
||||
public MessageHandler handler() { |
||||
return message -> { |
||||
log.info("The default channel does not handle messages." + |
||||
"\nTopic: {}" + |
||||
"\nPayload: {}", |
||||
message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC), |
||||
message.getPayload()); |
||||
}; |
||||
} |
||||
|
||||
public String getClientId() { |
||||
return clientId; |
||||
} |
||||
|
||||
} |
||||
@ -0,0 +1,55 @@
|
||||
package com.mh.user.config.mqtt; |
||||
|
||||
import com.mh.user.constants.ChannelName; |
||||
import lombok.extern.slf4j.Slf4j; |
||||
import org.springframework.context.annotation.Bean; |
||||
import org.springframework.context.annotation.Configuration; |
||||
import org.springframework.integration.channel.DirectChannel; |
||||
import org.springframework.messaging.MessageChannel; |
||||
|
||||
/** |
||||
* @author LJF |
||||
* @version 1.0 |
||||
* @project springboot-mqtt-demo |
||||
* @description 声明所有通道的定义类 |
||||
* @date 2024-10-29 16:23:32 |
||||
*/ |
||||
@Slf4j |
||||
@Configuration |
||||
public class MqttMessageChannel { |
||||
|
||||
@Bean(name = ChannelName.OUTBOUND) |
||||
public MessageChannel outboundChannel() { |
||||
return new DirectChannel(); |
||||
} |
||||
|
||||
@Bean(name = ChannelName.INBOUND) |
||||
public MessageChannel inboundChannel() { |
||||
return new DirectChannel(); |
||||
} |
||||
|
||||
/** |
||||
* 事件主动上报通道 |
||||
* @return |
||||
*/ |
||||
@Bean(name = ChannelName.EVENTS_UPLOAD_INBOUND) |
||||
public MessageChannel eventsUploadInbound() { |
||||
return new DirectChannel(); |
||||
} |
||||
|
||||
@Bean(name = ChannelName.EVENTS_COLLECTION_INBOUND) |
||||
public MessageChannel eventsCollectionInbound() { |
||||
return new DirectChannel(); |
||||
} |
||||
|
||||
@Bean(name = ChannelName.EVENTS_CONTROL_INBOUND) |
||||
public MessageChannel eventsControlInbound() { |
||||
return new DirectChannel(); |
||||
} |
||||
|
||||
@Bean(name = ChannelName.EVENTS_SEND_INBOUND) |
||||
public MessageChannel eventsSendInbound() { |
||||
return new DirectChannel(); |
||||
} |
||||
|
||||
} |
||||
@ -0,0 +1,51 @@
|
||||
package com.mh.user.config.mqtt; |
||||
|
||||
import com.mh.user.constants.ChannelName; |
||||
import lombok.extern.slf4j.Slf4j; |
||||
import org.springframework.beans.factory.annotation.Autowired; |
||||
import org.springframework.context.annotation.Bean; |
||||
import org.springframework.context.annotation.Configuration; |
||||
import org.springframework.integration.annotation.IntegrationComponentScan; |
||||
import org.springframework.integration.annotation.ServiceActivator; |
||||
import org.springframework.integration.mqtt.core.MqttPahoClientFactory; |
||||
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; |
||||
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; |
||||
import org.springframework.messaging.MessageHandler; |
||||
|
||||
/** |
||||
* @author LJF |
||||
* @version 1.0 |
||||
* @project springboot-mqtt-demo |
||||
* @description 入站配置 |
||||
* @date 2024-10-29 16:22:10 |
||||
*/ |
||||
@Slf4j |
||||
@Configuration |
||||
@IntegrationComponentScan |
||||
public class MqttOutboundConfig { |
||||
|
||||
@Autowired |
||||
private MqttPahoClientFactory mqttClientFactory; |
||||
|
||||
/** |
||||
* 默认声明一个出站处理器,用于处理无效的消息 |
||||
* @return |
||||
*/ |
||||
@Bean |
||||
@ServiceActivator(inputChannel = ChannelName.OUTBOUND) |
||||
public MessageHandler mqttOutbound() { |
||||
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler( |
||||
MqttConfig.getBasicMqttClientOptions().getClientId() + "_producer_" + System.currentTimeMillis(), |
||||
mqttClientFactory); |
||||
DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter(); |
||||
// use byte types uniformly
|
||||
converter.setPayloadAsBytes(true); |
||||
|
||||
messageHandler.setAsync(true); |
||||
messageHandler.setDefaultQos(0); |
||||
messageHandler.setConverter(converter); |
||||
return messageHandler; |
||||
} |
||||
|
||||
|
||||
} |
||||
Loading…
Reference in new issue