24 changed files with 331 additions and 63 deletions
@ -0,0 +1,59 @@
|
||||
package com.mh.common.constant; |
||||
|
||||
import java.util.Arrays; |
||||
import java.util.regex.Pattern; |
||||
|
||||
import static com.mh.common.constant.TopicConst.*; |
||||
|
||||
|
||||
/** |
||||
* @author ljf |
||||
* @version 1.0 |
||||
* @description: TODO |
||||
* @date 2024/11/06 14:28 |
||||
*/ |
||||
public enum TopicEnum { |
||||
|
||||
/** |
||||
* 客户端主动上报数据 |
||||
*/ |
||||
CLIENT_UPLOAD_DATA(Pattern.compile("^" + MH_UPLOAD + EVENTS_UPLOAD + REGEX_SN + "$"), ChannelName.EVENTS_UPLOAD_INBOUND), |
||||
|
||||
/** |
||||
* 服务端采集数据 |
||||
*/ |
||||
SERVER_COLLECTION_DATA(Pattern.compile("^" + MH_COLLECTION + EVENTS_COLLECTION + REGEX_SN + "$"), ChannelName.EVENTS_COLLECTION_INBOUND), |
||||
|
||||
/** |
||||
* 服务端控制指令 |
||||
*/ |
||||
SERVER_CONTROL_DATA(Pattern.compile("^" + MH_CONTROL + EVENTS_CONTROL + REGEX_SN + "$"), ChannelName.EVENTS_CONTROL_INBOUND), |
||||
|
||||
/** |
||||
* 订阅服务端发送的主题命令 |
||||
*/ |
||||
SERVER_SEND_DATA(Pattern.compile("^A/cmd/ctl/send" + "$"), ChannelName.EVENTS_SEND_INBOUND), |
||||
|
||||
UNKNOWN(Pattern.compile("^.*$"), ChannelName.DEFAULT_BOUND); |
||||
|
||||
final Pattern pattern; |
||||
|
||||
final String beanName; |
||||
|
||||
TopicEnum(Pattern pattern, String beanName) { |
||||
this.pattern = pattern; |
||||
this.beanName = beanName; |
||||
} |
||||
|
||||
public Pattern getPattern() { |
||||
return pattern; |
||||
} |
||||
|
||||
public String getBeanName() { |
||||
return beanName; |
||||
} |
||||
|
||||
public static TopicEnum find(String topic) { |
||||
return Arrays.stream(TopicEnum.values()).filter(topicEnum -> topicEnum.pattern.matcher(topic).matches()).findAny().orElse(UNKNOWN); |
||||
} |
||||
} |
@ -0,0 +1,25 @@
|
||||
package com.mh.common.model.request; |
||||
|
||||
import lombok.Data; |
||||
|
||||
/** |
||||
* @author LJF |
||||
* @version 1.0 |
||||
* @project EEMCS |
||||
* @description 研华数据体 |
||||
* @date 2025-01-22 14:47:25 |
||||
*/ |
||||
@Data |
||||
public class AdvantechDatas { |
||||
|
||||
/** |
||||
* 对应研华的标签值 |
||||
*/ |
||||
private String tag; |
||||
|
||||
/** |
||||
* 上报值 |
||||
*/ |
||||
private String value; |
||||
|
||||
} |
@ -0,0 +1,29 @@
|
||||
package com.mh.common.model.request; |
||||
|
||||
import com.fasterxml.jackson.annotation.JsonIgnoreProperties; |
||||
import lombok.Data; |
||||
|
||||
import java.util.List; |
||||
|
||||
/** |
||||
* @author LJF |
||||
* @version 1.0 |
||||
* @project EEMCS |
||||
* @description 研华网关发送接收数据 |
||||
* @date 2025-01-22 14:43:15 |
||||
*/ |
||||
@Data |
||||
@JsonIgnoreProperties(ignoreUnknown = true) |
||||
public class AdvantechReceiver<T> { |
||||
|
||||
/** |
||||
* 数据集合 |
||||
*/ |
||||
private List<T> d; |
||||
|
||||
/** |
||||
* 主动上报数据时间(带T类型) |
||||
*/ |
||||
private String ts; |
||||
|
||||
} |
@ -0,0 +1,29 @@
|
||||
package com.mh.common.model.response; |
||||
|
||||
import com.fasterxml.jackson.annotation.JsonIgnoreProperties; |
||||
import lombok.Data; |
||||
|
||||
import java.util.List; |
||||
|
||||
/** |
||||
* @author LJF |
||||
* @version 1.0 |
||||
* @project EEMCS |
||||
* @description 研华网关发送接收数据 |
||||
* @date 2025-01-22 14:43:15 |
||||
*/ |
||||
@Data |
||||
@JsonIgnoreProperties(ignoreUnknown = true) |
||||
public class AdvantechResponse<T> { |
||||
|
||||
/** |
||||
* 数据集合 |
||||
*/ |
||||
private List<T> w; |
||||
|
||||
/** |
||||
* 主动上报数据时间(带T类型) |
||||
*/ |
||||
private String ts; |
||||
|
||||
} |
@ -1,4 +1,4 @@
|
||||
package com.mh.common.config.mqtt; |
||||
package com.mh.framework.mqtt.config; |
||||
|
||||
import com.mh.common.enums.MqttClientOptions; |
||||
import com.mh.common.enums.MqttProtocolEnum; |
@ -1,4 +1,4 @@
|
||||
package com.mh.common.config.mqtt; |
||||
package com.mh.framework.mqtt.config; |
||||
|
||||
import com.mh.common.constant.ChannelName; |
||||
import lombok.extern.slf4j.Slf4j; |
@ -0,0 +1,47 @@
|
||||
package com.mh.framework.mqtt.handler; |
||||
|
||||
import com.mh.common.constant.ChannelName; |
||||
import com.mh.common.constant.TopicEnum; |
||||
import com.mh.common.utils.spring.SpringUtils; |
||||
import lombok.extern.slf4j.Slf4j; |
||||
import org.springframework.integration.annotation.Router; |
||||
import org.springframework.integration.mqtt.support.MqttHeaders; |
||||
import org.springframework.integration.router.AbstractMessageRouter; |
||||
import org.springframework.messaging.Message; |
||||
import org.springframework.messaging.MessageChannel; |
||||
import org.springframework.messaging.MessageHeaders; |
||||
import org.springframework.stereotype.Component; |
||||
|
||||
import java.util.Collection; |
||||
import java.util.Collections; |
||||
import java.util.Objects; |
||||
|
||||
/** |
||||
* @author LJF |
||||
* @version 1.0 |
||||
* @project springboot-mqtt-demo |
||||
* @description 入站消息路由分发中心 |
||||
* @date 2024-10-29 17:04:17 |
||||
*/ |
||||
@Slf4j |
||||
@Component |
||||
public class InboundMessageRouter extends AbstractMessageRouter { |
||||
|
||||
/** |
||||
* 目前只需要这个方式,后期在拓展使用IntegrationFlow方式 |
||||
* @param message |
||||
* @return |
||||
*/ |
||||
@Router(inputChannel = ChannelName.INBOUND) |
||||
@Override |
||||
protected Collection<MessageChannel> determineTargetChannels(Message<?> message) { |
||||
MessageHeaders headers = message.getHeaders(); |
||||
String topic = Objects.requireNonNull(headers.get(MqttHeaders.RECEIVED_TOPIC)).toString(); |
||||
byte[] payload = (byte[]) message.getPayload(); |
||||
log.info("从当前主题 topic: {}, 接收到的消息:{}", topic, new String(payload)); |
||||
// 找到对应的主题消息通道
|
||||
TopicEnum topicEnum = TopicEnum.find(topic); |
||||
MessageChannel bean = (MessageChannel) SpringUtils.getBean(topicEnum.getBeanName()); |
||||
return Collections.singleton(bean); |
||||
} |
||||
} |
@ -1,8 +1,8 @@
|
||||
package com.mh.system.service.mqtt; |
||||
package com.mh.framework.mqtt.service; |
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference; |
||||
import com.mh.user.model.response.CommonTopicResponse; |
||||
import com.mh.user.model.response.ServiceReply; |
||||
import com.mh.common.model.response.CommonTopicResponse; |
||||
import com.mh.common.model.response.ServiceReply; |
||||
|
||||
/** |
||||
* @author LJF |
@ -1,4 +1,4 @@
|
||||
package com.mh.system.service.mqtt; |
||||
package com.mh.framework.mqtt.service; |
||||
|
||||
import org.springframework.integration.mqtt.support.MqttHeaders; |
||||
import org.springframework.messaging.handler.annotation.Header; |
@ -1,11 +1,10 @@
|
||||
package com.mh.system.service.mqtt.impl; |
||||
package com.mh.framework.mqtt.service.impl; |
||||
|
||||
import com.mh.user.service.mqtt.IMqttTopicService; |
||||
import com.mh.framework.mqtt.service.IMqttTopicService; |
||||
import jakarta.annotation.Resource; |
||||
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; |
||||
import org.springframework.stereotype.Service; |
||||
|
||||
import javax.annotation.Resource; |
||||
|
||||
/** |
||||
* @author LJF |
||||
* @version 1.0 |
Loading…
Reference in new issue