From f81769b7382cca7d1ad3d422724697075e8fcf2f Mon Sep 17 00:00:00 2001 From: 25604 Date: Mon, 2 Feb 2026 19:02:15 +0800 Subject: [PATCH] =?UTF-8?q?1=E3=80=81=E7=83=AD=E5=9B=9E=E6=94=B6=E7=B3=BB?= =?UTF-8?q?=E7=BB=9F=E8=A7=A6=E6=91=B8=E5=B1=8Fmqtt=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E6=8E=A5=E6=94=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/main/resources/application-dev.yml | 10 +-- .../com/mh/common/constant/Constants.java | 1 + .../common/model/request/AdvantechDatas.java | 7 ++ .../model/request/AdvantechJsonParser.java | 74 +++++++++++++++++++ .../dealdata/DataProcessService.java | 2 + .../dealdata/impl/DataProcessServiceImpl.java | 31 ++++++-- .../mqtt/service/impl/EventsServiceImpl.java | 3 + .../mh/framework/rabbitmq/RabbitMqConfig.java | 18 +++++ .../rabbitmq/consumer/ReceiveHandler.java | 32 ++++++++ .../rabbitmq/producer/SendMsgByTopic.java | 10 +++ 10 files changed, 178 insertions(+), 10 deletions(-) create mode 100644 mh-common/src/main/java/com/mh/common/model/request/AdvantechJsonParser.java diff --git a/mh-admin/src/main/resources/application-dev.yml b/mh-admin/src/main/resources/application-dev.yml index 2b8164f..7853e74 100644 --- a/mh-admin/src/main/resources/application-dev.yml +++ b/mh-admin/src/main/resources/application-dev.yml @@ -1,7 +1,7 @@ # 项目相关配置 mh: # 名称 - name: gh_ers + name: gh_ers_dev # 版本 version: 1.0.0 # 版权年份 @@ -191,10 +191,10 @@ mqttSpring: # BASIC parameters are required. BASIC: protocol: MQTT - host: 127.0.0.1 - port: 2883 - username: mh - password: mhtech@803 + host: mqtt.mhito.net + port: 1883 + username: sa + password: sa123 # protocol: MQTT # host: mqtt.mhito.net # port: 1883 diff --git a/mh-common/src/main/java/com/mh/common/constant/Constants.java b/mh-common/src/main/java/com/mh/common/constant/Constants.java index 81da8a3..d0119cc 100644 --- a/mh-common/src/main/java/com/mh/common/constant/Constants.java +++ b/mh-common/src/main/java/com/mh/common/constant/Constants.java @@ -191,6 +191,7 @@ public class Constants { public static final String CHILLERS_TYPE = "0"; // 主机类型设备 public static final String OTHER_TYPE = "1"; // 其他设备 public static final String BOILER_TYPE = "12"; // 锅炉设备 + public static final String ERS = "ers"; // 热回收系统 public static boolean CONTROL_WEB_FLAG = false; public static boolean SEND_STATUS = false; // 指令发送状态 public static boolean FLAG = false; diff --git a/mh-common/src/main/java/com/mh/common/model/request/AdvantechDatas.java b/mh-common/src/main/java/com/mh/common/model/request/AdvantechDatas.java index dea0bb2..7ca8c75 100644 --- a/mh-common/src/main/java/com/mh/common/model/request/AdvantechDatas.java +++ b/mh-common/src/main/java/com/mh/common/model/request/AdvantechDatas.java @@ -27,4 +27,11 @@ public class AdvantechDatas { */ private T quality; + public AdvantechDatas() {} + + public AdvantechDatas(String tag, T value) { + this.tag = tag; + this.value = value; + } + } diff --git a/mh-common/src/main/java/com/mh/common/model/request/AdvantechJsonParser.java b/mh-common/src/main/java/com/mh/common/model/request/AdvantechJsonParser.java new file mode 100644 index 0000000..3f3a2f8 --- /dev/null +++ b/mh-common/src/main/java/com/mh/common/model/request/AdvantechJsonParser.java @@ -0,0 +1,74 @@ +package com.mh.common.model.request; + +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONArray; +import com.alibaba.fastjson2.JSONObject; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * @author LJF + * @version 1.0 + * @project EEMCS + * @description 昆仑通态触摸屏数据转换 + * @date 2026-02-02 15:29:47 + */ +public class AdvantechJsonParser { + + /** + * 将JSON字符串解析为 AdvantechReceiver> + * @param json 原始MQTT JSON + * @param defaultQuality quality默认值(如192表示Good),传null则不设置 + * @return 解析后的接收对象 + */ + public static AdvantechReceiver> parse( + String json, + Number defaultQuality) { + + JSONObject root = JSON.parseObject(json); + AdvantechReceiver> receiver = new AdvantechReceiver<>(); + + // 1. 提取ts(保留原始字符串格式) + // "2026-02-02T18:33:57.712049"时间格式是这个,转成yyyy-MM-dd HH:mm:ss + root.put("ts", root.getString("ts").replace("T", " ").substring(0, 19)); + receiver.setTs(root.getString("ts")); + + // 2. 解析d数组:展开所有键值对 → AdvantechDatas + List> dataList = new ArrayList<>(); + JSONArray dArray = root.getJSONArray("d"); + + if (dArray != null) { + for (int i = 0; i < dArray.size(); i++) { + JSONObject item = dArray.getJSONObject(i); + if (item == null || item.isEmpty()) continue; + + for (Map.Entry entry : item.entrySet()) { + String tag = entry.getKey(); + Object val = entry.getValue(); + + if (val instanceof Number) { + AdvantechDatas data = new AdvantechDatas<>(); + data.setTag(tag); + data.setValue((Number) val); + if (defaultQuality != null) { + data.setQuality(defaultQuality); + } + dataList.add(data); + } + // 非数值字段自动跳过(可扩展日志) + } + } + } + + receiver.setD(dataList); + return receiver; + } + + // 便捷重载:不设置quality + public static AdvantechReceiver> parse(String json) { + return parse(json, 0); + } + +} diff --git a/mh-framework/src/main/java/com/mh/framework/dealdata/DataProcessService.java b/mh-framework/src/main/java/com/mh/framework/dealdata/DataProcessService.java index 6983306..1adf3dc 100644 --- a/mh-framework/src/main/java/com/mh/framework/dealdata/DataProcessService.java +++ b/mh-framework/src/main/java/com/mh/framework/dealdata/DataProcessService.java @@ -119,4 +119,6 @@ public interface DataProcessService { * @param boilerData */ void insertBoilerData(AdvantechReceiver boilerData); + + void insertERSData(AdvantechReceiver boilerData); } diff --git a/mh-framework/src/main/java/com/mh/framework/dealdata/impl/DataProcessServiceImpl.java b/mh-framework/src/main/java/com/mh/framework/dealdata/impl/DataProcessServiceImpl.java index c2187a2..da7a671 100644 --- a/mh-framework/src/main/java/com/mh/framework/dealdata/impl/DataProcessServiceImpl.java +++ b/mh-framework/src/main/java/com/mh/framework/dealdata/impl/DataProcessServiceImpl.java @@ -137,6 +137,10 @@ public class DataProcessServiceImpl implements DataProcessService { insertData(data, "BOILER_REGISTER", Constants.BOILER); } + @Override + public void insertERSData(AdvantechReceiver data) { + insertData(data, "ERS_REGISTER", Constants.ERS); + } private void insertData(AdvantechReceiver data, String registerKey, String cacheKey) { log.info("{}数据解析入库:{}", registerKey.equals("CHILLERS_REGISTER") ? "机组设备" : "计量设备", data); @@ -192,13 +196,30 @@ public class DataProcessServiceImpl implements DataProcessService { ); } - String dString = data.getD().toString(); - // 替换掉inf - if (dString.contains("inf")) { - dString = dString.replace("inf", "0"); + List list; + Object dObject = data.getD(); + + if (dObject instanceof List) { + // 如果已经是List类型,直接转换 + @SuppressWarnings("unchecked") + List tempList = (List) dObject; + list = tempList; + } else { + // 如果是其他类型(如String),则进行JSON解析 + String dString = dObject.toString(); + // 替换掉inf + if (dString.contains("inf")) { + dString = dString.replace("inf", "0"); + } + + try { + list = JSON.parseObject(dString, new TypeReference>() {}); + } catch (Exception e) { + log.error("JSON解析失败,原始数据: {}", dString, e); + list = new ArrayList<>(); + } } // 假设 data 是一个包含 JSON 数据的对象 - List list = JSON.parseObject(dString, new TypeReference>() {}); for (AdvantechDatas advantechDatas : list) { String tag = advantechDatas.getTag(); String value = String.valueOf(advantechDatas.getValue()); diff --git a/mh-framework/src/main/java/com/mh/framework/mqtt/service/impl/EventsServiceImpl.java b/mh-framework/src/main/java/com/mh/framework/mqtt/service/impl/EventsServiceImpl.java index 9e85a35..fc0b4c1 100644 --- a/mh-framework/src/main/java/com/mh/framework/mqtt/service/impl/EventsServiceImpl.java +++ b/mh-framework/src/main/java/com/mh/framework/mqtt/service/impl/EventsServiceImpl.java @@ -93,6 +93,9 @@ public class EventsServiceImpl implements IEventsService { } else if (topic.contains(Constants.BOILER)) { // 锅炉系统 sendMsgByTopic.sendToBoilerMQ(JSONObject.toJSONString(commonTopicReceiver)); + } else if (topic.contains(Constants.ERS)) { + // 热回收系统:针对昆仑通态触摸屏系统 + sendMsgByTopic.sendToKunLunTDMQ(JSONObject.toJSONString(commonTopicReceiver)); } else { // 非本地主题处理 log.info("非本地主题处理: {}", topic); diff --git a/mh-framework/src/main/java/com/mh/framework/rabbitmq/RabbitMqConfig.java b/mh-framework/src/main/java/com/mh/framework/rabbitmq/RabbitMqConfig.java index 5cdd60f..1995831 100644 --- a/mh-framework/src/main/java/com/mh/framework/rabbitmq/RabbitMqConfig.java +++ b/mh-framework/src/main/java/com/mh/framework/rabbitmq/RabbitMqConfig.java @@ -52,6 +52,24 @@ public class RabbitMqConfig { public static final String QUEUE_BOILER = "queue_boiler"; public static final String ROUTING_KEY_BOILER = "topic.boiler.eemcs.#"; + // ERS队列:广合二厂热回收系统昆仑通态触摸屏 + public static final String QUEUE_ERS = "queue_ers"; + public static final String ROUTING_KEY_ERS = "topic.ers.eemcs.#"; + + /**热回收系统昆仑通态队列绑定交换机*/ + @Bean(ROUTING_KEY_ERS) + public Binding ersBinding(@Qualifier(QUEUE_ERS) Queue queue, + @Qualifier(EXCHANGE_NAME) Exchange exchange){ + return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_ERS).noargs(); + } + + /**热回收系统昆仑通态触摸屏队列*/ + @Bean(QUEUE_ERS) + public Queue ersQueue(){ + return new Queue(QUEUE_ERS); + } + + /**durable参数表示交换机是否持久化,值为true表示持久化,值为false表示不持久化。 * 在RabbitMQ中,持久化交换机会被存储在磁盘上以便在服务器重启后恢复, * 而非持久化交换机则只存在于内存中,服务器重启后会丢失*/ diff --git a/mh-framework/src/main/java/com/mh/framework/rabbitmq/consumer/ReceiveHandler.java b/mh-framework/src/main/java/com/mh/framework/rabbitmq/consumer/ReceiveHandler.java index 9558a1e..b2ff82c 100644 --- a/mh-framework/src/main/java/com/mh/framework/rabbitmq/consumer/ReceiveHandler.java +++ b/mh-framework/src/main/java/com/mh/framework/rabbitmq/consumer/ReceiveHandler.java @@ -2,6 +2,7 @@ package com.mh.framework.rabbitmq.consumer; import com.alibaba.fastjson2.JSONObject; import com.mh.common.core.redis.RedisCache; +import com.mh.common.model.request.AdvantechJsonParser; import com.mh.common.model.request.AdvantechReceiver; import com.mh.common.model.request.OneTwoThreeTempData; import com.mh.framework.dealdata.DataProcessService; @@ -41,6 +42,37 @@ public class ReceiveHandler { private static final String ALARM_CANCEL_PREFIX = "alarm:cancel:"; + /** + * 处理热回收系统:昆仑通态触摸屏数据相关设备数据 + * + * @param msg + * @param channel + * @param tag + */ + @RabbitListener(bindings = @QueueBinding( + value = @Queue(value = RabbitMqConfig.QUEUE_ERS, durable = "true"), + exchange = @Exchange( + value = RabbitMqConfig.EXCHANGE_NAME, + ignoreDeclarationExceptions = "true", + type = ExchangeTypes.TOPIC + ), + key = {RabbitMqConfig.ROUTING_KEY_ERS} + )) + public void receiveERSData(@Payload String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws InterruptedException, IOException { + try { + log.info("MQ消费者:热回收系统设备采集:{}", msg); + //TODO 数据解析入库操作 msg转成实体类,入库 + AdvantechReceiver boilerData = AdvantechJsonParser.parse(msg); + dataProcessService.insertERSData(boilerData); + // 正常执行,手动确认ack + channel.basicAck(tag, false); + } catch (Exception e) { + log.error("data:{},ddcException:{}", msg, e); + Thread.sleep(100); + channel.basicAck(tag, false); + } + } + /** * 监听主机参数 * queues:指定监听的队列名,可以接收单个队列,也可以接收多个队列的数组或列表。 diff --git a/mh-framework/src/main/java/com/mh/framework/rabbitmq/producer/SendMsgByTopic.java b/mh-framework/src/main/java/com/mh/framework/rabbitmq/producer/SendMsgByTopic.java index 11fe228..62de3ec 100644 --- a/mh-framework/src/main/java/com/mh/framework/rabbitmq/producer/SendMsgByTopic.java +++ b/mh-framework/src/main/java/com/mh/framework/rabbitmq/producer/SendMsgByTopic.java @@ -63,4 +63,14 @@ public class SendMsgByTopic { rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME,"topic.boiler.eemcs.data",data); return "success"; } + + /** + * 昆仑通态触摸屏数据报文注入rabbitmq + * @param data + * @return + */ + public String sendToKunLunTDMQ(String data) { + rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME,"topic.ers.eemcs.data",data); + return "success"; + } }