From 77124eff144362c3a778eb28a9c35f18dd0384fd Mon Sep 17 00:00:00 2001 From: 25604 Date: Thu, 21 Aug 2025 17:12:01 +0800 Subject: [PATCH] =?UTF-8?q?1=E3=80=81=E6=B7=BB=E5=8A=A0=E9=94=85=E7=82=89?= =?UTF-8?q?=E4=B8=BB=E6=9C=BA=E6=95=B0=E6=8D=AE=E7=82=B9=E4=BD=8D=E5=A4=84?= =?UTF-8?q?=E7=90=86=EF=BC=88=E7=A0=94=E5=8D=8E=E7=BD=91=E5=85=B3=E5=8F=AA?= =?UTF-8?q?=E6=94=AF=E6=8C=814=E4=B8=AAmqtt=EF=BC=8C=E7=9B=AE=E5=89=8D?= =?UTF-8?q?=E6=B2=A1=E6=9C=89=E4=BD=BF=E7=94=A8=EF=BC=89;?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/mh/common/constant/Constants.java | 2 ++ .../dealdata/DataProcessService.java | 6 ++++ .../dealdata/impl/DataProcessServiceImpl.java | 10 +++++- .../mqtt/service/impl/EventsServiceImpl.java | 3 ++ .../mh/framework/rabbitmq/RabbitMqConfig.java | 2 ++ .../rabbitmq/consumer/ReceiveHandler.java | 31 +++++++++++++++++++ .../rabbitmq/producer/SendMsgByTopic.java | 10 ++++++ .../java/com/mh/quartz/task/DealDataTask.java | 19 ++++++++++++ 8 files changed, 82 insertions(+), 1 deletion(-) diff --git a/mh-common/src/main/java/com/mh/common/constant/Constants.java b/mh-common/src/main/java/com/mh/common/constant/Constants.java index 2361767..2a741a2 100644 --- a/mh-common/src/main/java/com/mh/common/constant/Constants.java +++ b/mh-common/src/main/java/com/mh/common/constant/Constants.java @@ -183,10 +183,12 @@ public class Constants { public static final String CLOSE_HOST = "close_host_device_id"; // 关闭主机的设备id public static final String OPEN_VALVE = "open_valve_device_id"; // 开启蝶阀的设备id public static final String CHILLERS = "chillers"; + public static final String BOILER = "boiler"; // 锅炉 public static final String OTHER = "other"; public static final String DEVICE = "devices"; public static final String CHILLERS_TYPE = "0"; // 主机类型设备 public static final String OTHER_TYPE = "1"; // 其他设备 + public static final String BOILER_TYPE = "12"; // 锅炉设备 public static boolean CONTROL_WEB_FLAG = false; public static boolean SEND_STATUS = false; // 指令发送状态 public static boolean FLAG = false; diff --git a/mh-framework/src/main/java/com/mh/framework/dealdata/DataProcessService.java b/mh-framework/src/main/java/com/mh/framework/dealdata/DataProcessService.java index 9d97c08..6983306 100644 --- a/mh-framework/src/main/java/com/mh/framework/dealdata/DataProcessService.java +++ b/mh-framework/src/main/java/com/mh/framework/dealdata/DataProcessService.java @@ -113,4 +113,10 @@ public interface DataProcessService { * @param oneTwoThreeTempData */ void insertTempData(OneTwoThreeTempData oneTwoThreeTempData); + + /** + * 插入锅炉数据 + * @param boilerData + */ + void insertBoilerData(AdvantechReceiver boilerData); } diff --git a/mh-framework/src/main/java/com/mh/framework/dealdata/impl/DataProcessServiceImpl.java b/mh-framework/src/main/java/com/mh/framework/dealdata/impl/DataProcessServiceImpl.java index 5c04930..53a7cbe 100644 --- a/mh-framework/src/main/java/com/mh/framework/dealdata/impl/DataProcessServiceImpl.java +++ b/mh-framework/src/main/java/com/mh/framework/dealdata/impl/DataProcessServiceImpl.java @@ -131,8 +131,14 @@ public class DataProcessServiceImpl implements DataProcessService { insertData(data, "DEVICES_REGISTER", Constants.DEVICE); } + @Override + public void insertBoilerData(AdvantechReceiver data) { + insertData(data, "BOILER_REGISTER", Constants.BOILER); + } + + private void insertData(AdvantechReceiver data, String registerKey, String cacheKey) { - log.info("{}数据解析入库:{}", registerKey.equals("CHILLERS_REGISTER") ? "冷水机组" : "计量设备", data); + log.info("{}数据解析入库:{}", registerKey.equals("CHILLERS_REGISTER") ? "机组设备" : "计量设备", data); if (registerKey.equals("CHILLERS_REGISTER")) { databaseMapper.createChillerTable(); } else { @@ -144,6 +150,8 @@ public class DataProcessServiceImpl implements DataProcessService { if (null == registers || registers.isEmpty()) { if (registerKey.equals("CHILLERS_REGISTER")) { registers = collectionParamsManageService.queryCollectionParamsByMtType(Constants.CHILLERS_TYPE); + } else if (cacheKey.equals(Constants.BOILER) && registerKey.equals("BOILER_REGISTER") ) { + registers = collectionParamsManageService.queryCollectionParamsByMtType(Constants.BOILER_TYPE); } else { registers = collectionParamsManageService.queryCollectionParamsByMtType(Constants.OTHER_TYPE); } diff --git a/mh-framework/src/main/java/com/mh/framework/mqtt/service/impl/EventsServiceImpl.java b/mh-framework/src/main/java/com/mh/framework/mqtt/service/impl/EventsServiceImpl.java index 22538d7..9e85a35 100644 --- a/mh-framework/src/main/java/com/mh/framework/mqtt/service/impl/EventsServiceImpl.java +++ b/mh-framework/src/main/java/com/mh/framework/mqtt/service/impl/EventsServiceImpl.java @@ -90,6 +90,9 @@ public class EventsServiceImpl implements IEventsService { OneTwoThreeTempData oneTwoThreeTempData = mapper.readValue(receiver, OneTwoThreeTempData.class); log.info("主题:{},类型:{}: ,数据:{}", topic, logMessage, oneTwoThreeTempData.toString()); sendMsgByTopic.sendToTempMQ(JSONObject.toJSONString(oneTwoThreeTempData)); + } else if (topic.contains(Constants.BOILER)) { + // 锅炉系统 + sendMsgByTopic.sendToBoilerMQ(JSONObject.toJSONString(commonTopicReceiver)); } else { // 非本地主题处理 log.info("非本地主题处理: {}", topic); diff --git a/mh-framework/src/main/java/com/mh/framework/rabbitmq/RabbitMqConfig.java b/mh-framework/src/main/java/com/mh/framework/rabbitmq/RabbitMqConfig.java index 5c2b11e..5cdd60f 100644 --- a/mh-framework/src/main/java/com/mh/framework/rabbitmq/RabbitMqConfig.java +++ b/mh-framework/src/main/java/com/mh/framework/rabbitmq/RabbitMqConfig.java @@ -49,6 +49,8 @@ public class RabbitMqConfig { public static final String QUEUE_ALARM = "device.alarm.queue"; public static final String ROUTING_KEY_ALARM = "topic.alarm.eemcs.#"; + public static final String QUEUE_BOILER = "queue_boiler"; + public static final String ROUTING_KEY_BOILER = "topic.boiler.eemcs.#"; /**durable参数表示交换机是否持久化,值为true表示持久化,值为false表示不持久化。 * 在RabbitMQ中,持久化交换机会被存储在磁盘上以便在服务器重启后恢复, diff --git a/mh-framework/src/main/java/com/mh/framework/rabbitmq/consumer/ReceiveHandler.java b/mh-framework/src/main/java/com/mh/framework/rabbitmq/consumer/ReceiveHandler.java index 4108062..9558a1e 100644 --- a/mh-framework/src/main/java/com/mh/framework/rabbitmq/consumer/ReceiveHandler.java +++ b/mh-framework/src/main/java/com/mh/framework/rabbitmq/consumer/ReceiveHandler.java @@ -173,6 +173,37 @@ public class ReceiveHandler { } } + /** + * 处理锅炉系统相关设备数据 + * + * @param msg + * @param channel + * @param tag + */ + @RabbitListener(bindings = @QueueBinding( + value = @Queue(value = RabbitMqConfig.QUEUE_BOILER, durable = "true"), + exchange = @Exchange( + value = RabbitMqConfig.EXCHANGE_NAME, + ignoreDeclarationExceptions = "true", + type = ExchangeTypes.TOPIC + ), + key = {RabbitMqConfig.ROUTING_KEY_BOILER} + )) + public void receiveBoilerData(@Payload String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws InterruptedException, IOException { + try { + log.info("MQ消费者:锅炉系统设备采集:{}", msg); + //TODO 数据解析入库操作 msg转成实体类,入库 + AdvantechReceiver boilerData = JSONObject.parseObject(msg, AdvantechReceiver.class); + dataProcessService.insertBoilerData(boilerData); + // 正常执行,手动确认ack + channel.basicAck(tag, false); + } catch (Exception e) { + log.error("data:{},ddcException:{}", msg, e); + Thread.sleep(100); + channel.basicAck(tag, false); + } + } + /** * 处理设备报警延时队列数据 * diff --git a/mh-framework/src/main/java/com/mh/framework/rabbitmq/producer/SendMsgByTopic.java b/mh-framework/src/main/java/com/mh/framework/rabbitmq/producer/SendMsgByTopic.java index 5d5d005..11fe228 100644 --- a/mh-framework/src/main/java/com/mh/framework/rabbitmq/producer/SendMsgByTopic.java +++ b/mh-framework/src/main/java/com/mh/framework/rabbitmq/producer/SendMsgByTopic.java @@ -53,4 +53,14 @@ public class SendMsgByTopic { rabbitTemplate.send(RabbitMqConfig.DELAY_EXCHANGE_NAME, RabbitMqConfig.ROUTING_KEY_ALARM, msg); } + + /** + * 锅炉数据报文注入rabbitmq + * @param data + * @return + */ + public String sendToBoilerMQ(String data) { + rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME,"topic.boiler.eemcs.data",data); + return "success"; + } } diff --git a/mh-quartz/src/main/java/com/mh/quartz/task/DealDataTask.java b/mh-quartz/src/main/java/com/mh/quartz/task/DealDataTask.java index 1598182..cc265ad 100644 --- a/mh-quartz/src/main/java/com/mh/quartz/task/DealDataTask.java +++ b/mh-quartz/src/main/java/com/mh/quartz/task/DealDataTask.java @@ -644,6 +644,25 @@ public class DealDataTask { } } + /** + * 处理锅炉数据获取进入chillers表 + */ + public void dealBoilerData() { + List cacheList = redisCache.getCacheList(Constants.BOILER, CollectionParamsManage.class); + if (null == cacheList || cacheList.isEmpty()) { + return; + } + //清空redis + redisCache.deleteObject(Constants.BOILER); + //处理chillers数据 + try { + //todo 处理没有对象curValue和curTime的异常 + dealChillersCollect(cacheList); + } catch (Exception e) { + log.error("处理主机参数异常:{}", e); + } + } + /** * 处理主机秒级数据,再计算主机运行时间 *