Browse Source

1、热回收系统触摸屏mqtt数据接收

dev_gh_ers
25604 3 weeks ago
parent
commit
f81769b738
  1. 10
      mh-admin/src/main/resources/application-dev.yml
  2. 1
      mh-common/src/main/java/com/mh/common/constant/Constants.java
  3. 7
      mh-common/src/main/java/com/mh/common/model/request/AdvantechDatas.java
  4. 74
      mh-common/src/main/java/com/mh/common/model/request/AdvantechJsonParser.java
  5. 2
      mh-framework/src/main/java/com/mh/framework/dealdata/DataProcessService.java
  6. 25
      mh-framework/src/main/java/com/mh/framework/dealdata/impl/DataProcessServiceImpl.java
  7. 3
      mh-framework/src/main/java/com/mh/framework/mqtt/service/impl/EventsServiceImpl.java
  8. 18
      mh-framework/src/main/java/com/mh/framework/rabbitmq/RabbitMqConfig.java
  9. 32
      mh-framework/src/main/java/com/mh/framework/rabbitmq/consumer/ReceiveHandler.java
  10. 10
      mh-framework/src/main/java/com/mh/framework/rabbitmq/producer/SendMsgByTopic.java

10
mh-admin/src/main/resources/application-dev.yml

@ -1,7 +1,7 @@
# 项目相关配置
mh:
# 名称
name: gh_ers
name: gh_ers_dev
# 版本
version: 1.0.0
# 版权年份
@ -191,10 +191,10 @@ mqttSpring:
# BASIC parameters are required.
BASIC:
protocol: MQTT
host: 127.0.0.1
port: 2883
username: mh
password: mhtech@803
host: mqtt.mhito.net
port: 1883
username: sa
password: sa123
# protocol: MQTT
# host: mqtt.mhito.net
# port: 1883

1
mh-common/src/main/java/com/mh/common/constant/Constants.java

@ -191,6 +191,7 @@ public class Constants {
public static final String CHILLERS_TYPE = "0"; // 主机类型设备
public static final String OTHER_TYPE = "1"; // 其他设备
public static final String BOILER_TYPE = "12"; // 锅炉设备
public static final String ERS = "ers"; // 热回收系统
public static boolean CONTROL_WEB_FLAG = false;
public static boolean SEND_STATUS = false; // 指令发送状态
public static boolean FLAG = false;

7
mh-common/src/main/java/com/mh/common/model/request/AdvantechDatas.java

@ -27,4 +27,11 @@ public class AdvantechDatas<T extends Number> {
*/
private T quality;
public AdvantechDatas() {}
public AdvantechDatas(String tag, T value) {
this.tag = tag;
this.value = value;
}
}

74
mh-common/src/main/java/com/mh/common/model/request/AdvantechJsonParser.java

@ -0,0 +1,74 @@
package com.mh.common.model.request;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* @author LJF
* @version 1.0
* @project EEMCS
* @description 昆仑通态触摸屏数据转换
* @date 2026-02-02 15:29:47
*/
public class AdvantechJsonParser {
/**
* 将JSON字符串解析为 AdvantechReceiver<AdvantechDatas<Number>>
* @param json 原始MQTT JSON
* @param defaultQuality quality默认值如192表示Good传null则不设置
* @return 解析后的接收对象
*/
public static AdvantechReceiver<AdvantechDatas<Number>> parse(
String json,
Number defaultQuality) {
JSONObject root = JSON.parseObject(json);
AdvantechReceiver<AdvantechDatas<Number>> receiver = new AdvantechReceiver<>();
// 1. 提取ts(保留原始字符串格式)
// "2026-02-02T18:33:57.712049"时间格式是这个,转成yyyy-MM-dd HH:mm:ss
root.put("ts", root.getString("ts").replace("T", " ").substring(0, 19));
receiver.setTs(root.getString("ts"));
// 2. 解析d数组:展开所有键值对 → AdvantechDatas
List<AdvantechDatas<Number>> dataList = new ArrayList<>();
JSONArray dArray = root.getJSONArray("d");
if (dArray != null) {
for (int i = 0; i < dArray.size(); i++) {
JSONObject item = dArray.getJSONObject(i);
if (item == null || item.isEmpty()) continue;
for (Map.Entry<String, Object> entry : item.entrySet()) {
String tag = entry.getKey();
Object val = entry.getValue();
if (val instanceof Number) {
AdvantechDatas<Number> data = new AdvantechDatas<>();
data.setTag(tag);
data.setValue((Number) val);
if (defaultQuality != null) {
data.setQuality(defaultQuality);
}
dataList.add(data);
}
// 非数值字段自动跳过(可扩展日志)
}
}
}
receiver.setD(dataList);
return receiver;
}
// 便捷重载:不设置quality
public static AdvantechReceiver<AdvantechDatas<Number>> parse(String json) {
return parse(json, 0);
}
}

2
mh-framework/src/main/java/com/mh/framework/dealdata/DataProcessService.java

@ -119,4 +119,6 @@ public interface DataProcessService {
* @param boilerData
*/
void insertBoilerData(AdvantechReceiver boilerData);
void insertERSData(AdvantechReceiver boilerData);
}

25
mh-framework/src/main/java/com/mh/framework/dealdata/impl/DataProcessServiceImpl.java

@ -137,6 +137,10 @@ public class DataProcessServiceImpl implements DataProcessService {
insertData(data, "BOILER_REGISTER", Constants.BOILER);
}
@Override
public void insertERSData(AdvantechReceiver data) {
insertData(data, "ERS_REGISTER", Constants.ERS);
}
private void insertData(AdvantechReceiver data, String registerKey, String cacheKey) {
log.info("{}数据解析入库:{}", registerKey.equals("CHILLERS_REGISTER") ? "机组设备" : "计量设备", data);
@ -192,13 +196,30 @@ public class DataProcessServiceImpl implements DataProcessService {
);
}
String dString = data.getD().toString();
List<AdvantechDatas> list;
Object dObject = data.getD();
if (dObject instanceof List) {
// 如果已经是List类型,直接转换
@SuppressWarnings("unchecked")
List<AdvantechDatas> tempList = (List<AdvantechDatas>) dObject;
list = tempList;
} else {
// 如果是其他类型(如String),则进行JSON解析
String dString = dObject.toString();
// 替换掉inf
if (dString.contains("inf")) {
dString = dString.replace("inf", "0");
}
try {
list = JSON.parseObject(dString, new TypeReference<List<AdvantechDatas>>() {});
} catch (Exception e) {
log.error("JSON解析失败,原始数据: {}", dString, e);
list = new ArrayList<>();
}
}
// 假设 data 是一个包含 JSON 数据的对象
List<AdvantechDatas> list = JSON.parseObject(dString, new TypeReference<List<AdvantechDatas>>() {});
for (AdvantechDatas advantechDatas : list) {
String tag = advantechDatas.getTag();
String value = String.valueOf(advantechDatas.getValue());

3
mh-framework/src/main/java/com/mh/framework/mqtt/service/impl/EventsServiceImpl.java

@ -93,6 +93,9 @@ public class EventsServiceImpl implements IEventsService {
} else if (topic.contains(Constants.BOILER)) {
// 锅炉系统
sendMsgByTopic.sendToBoilerMQ(JSONObject.toJSONString(commonTopicReceiver));
} else if (topic.contains(Constants.ERS)) {
// 热回收系统:针对昆仑通态触摸屏系统
sendMsgByTopic.sendToKunLunTDMQ(JSONObject.toJSONString(commonTopicReceiver));
} else {
// 非本地主题处理
log.info("非本地主题处理: {}", topic);

18
mh-framework/src/main/java/com/mh/framework/rabbitmq/RabbitMqConfig.java

@ -52,6 +52,24 @@ public class RabbitMqConfig {
public static final String QUEUE_BOILER = "queue_boiler";
public static final String ROUTING_KEY_BOILER = "topic.boiler.eemcs.#";
// ERS队列:广合二厂热回收系统昆仑通态触摸屏
public static final String QUEUE_ERS = "queue_ers";
public static final String ROUTING_KEY_ERS = "topic.ers.eemcs.#";
/**热回收系统昆仑通态队列绑定交换机*/
@Bean(ROUTING_KEY_ERS)
public Binding ersBinding(@Qualifier(QUEUE_ERS) Queue queue,
@Qualifier(EXCHANGE_NAME) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_ERS).noargs();
}
/**热回收系统昆仑通态触摸屏队列*/
@Bean(QUEUE_ERS)
public Queue ersQueue(){
return new Queue(QUEUE_ERS);
}
/**durable参数表示交换机是否持久化值为true表示持久化值为false表示不持久化
* 在RabbitMQ中持久化交换机会被存储在磁盘上以便在服务器重启后恢复
* 而非持久化交换机则只存在于内存中服务器重启后会丢失*/

32
mh-framework/src/main/java/com/mh/framework/rabbitmq/consumer/ReceiveHandler.java

@ -2,6 +2,7 @@ package com.mh.framework.rabbitmq.consumer;
import com.alibaba.fastjson2.JSONObject;
import com.mh.common.core.redis.RedisCache;
import com.mh.common.model.request.AdvantechJsonParser;
import com.mh.common.model.request.AdvantechReceiver;
import com.mh.common.model.request.OneTwoThreeTempData;
import com.mh.framework.dealdata.DataProcessService;
@ -41,6 +42,37 @@ public class ReceiveHandler {
private static final String ALARM_CANCEL_PREFIX = "alarm:cancel:";
/**
* 处理热回收系统昆仑通态触摸屏数据相关设备数据
*
* @param msg
* @param channel
* @param tag
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = RabbitMqConfig.QUEUE_ERS, durable = "true"),
exchange = @Exchange(
value = RabbitMqConfig.EXCHANGE_NAME,
ignoreDeclarationExceptions = "true",
type = ExchangeTypes.TOPIC
),
key = {RabbitMqConfig.ROUTING_KEY_ERS}
))
public void receiveERSData(@Payload String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws InterruptedException, IOException {
try {
log.info("MQ消费者:热回收系统设备采集:{}", msg);
//TODO 数据解析入库操作 msg转成实体类,入库
AdvantechReceiver boilerData = AdvantechJsonParser.parse(msg);
dataProcessService.insertERSData(boilerData);
// 正常执行,手动确认ack
channel.basicAck(tag, false);
} catch (Exception e) {
log.error("data:{},ddcException:{}", msg, e);
Thread.sleep(100);
channel.basicAck(tag, false);
}
}
/**
* 监听主机参数
* queues指定监听的队列名可以接收单个队列也可以接收多个队列的数组或列表

10
mh-framework/src/main/java/com/mh/framework/rabbitmq/producer/SendMsgByTopic.java

@ -63,4 +63,14 @@ public class SendMsgByTopic {
rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME,"topic.boiler.eemcs.data",data);
return "success";
}
/**
* 昆仑通态触摸屏数据报文注入rabbitmq
* @param data
* @return
*/
public String sendToKunLunTDMQ(String data) {
rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME,"topic.ers.eemcs.data",data);
return "success";
}
}

Loading…
Cancel
Save