Browse Source

1、添加锅炉主机数据点位处理(研华网关只支持4个mqtt,目前没有使用);

dev_mz
25604 5 days ago
parent
commit
77124eff14
  1. 2
      mh-common/src/main/java/com/mh/common/constant/Constants.java
  2. 6
      mh-framework/src/main/java/com/mh/framework/dealdata/DataProcessService.java
  3. 10
      mh-framework/src/main/java/com/mh/framework/dealdata/impl/DataProcessServiceImpl.java
  4. 3
      mh-framework/src/main/java/com/mh/framework/mqtt/service/impl/EventsServiceImpl.java
  5. 2
      mh-framework/src/main/java/com/mh/framework/rabbitmq/RabbitMqConfig.java
  6. 31
      mh-framework/src/main/java/com/mh/framework/rabbitmq/consumer/ReceiveHandler.java
  7. 10
      mh-framework/src/main/java/com/mh/framework/rabbitmq/producer/SendMsgByTopic.java
  8. 19
      mh-quartz/src/main/java/com/mh/quartz/task/DealDataTask.java

2
mh-common/src/main/java/com/mh/common/constant/Constants.java

@ -183,10 +183,12 @@ public class Constants {
public static final String CLOSE_HOST = "close_host_device_id"; // 关闭主机的设备id
public static final String OPEN_VALVE = "open_valve_device_id"; // 开启蝶阀的设备id
public static final String CHILLERS = "chillers";
public static final String BOILER = "boiler"; // 锅炉
public static final String OTHER = "other";
public static final String DEVICE = "devices";
public static final String CHILLERS_TYPE = "0"; // 主机类型设备
public static final String OTHER_TYPE = "1"; // 其他设备
public static final String BOILER_TYPE = "12"; // 锅炉设备
public static boolean CONTROL_WEB_FLAG = false;
public static boolean SEND_STATUS = false; // 指令发送状态
public static boolean FLAG = false;

6
mh-framework/src/main/java/com/mh/framework/dealdata/DataProcessService.java

@ -113,4 +113,10 @@ public interface DataProcessService {
* @param oneTwoThreeTempData
*/
void insertTempData(OneTwoThreeTempData oneTwoThreeTempData);
/**
* 插入锅炉数据
* @param boilerData
*/
void insertBoilerData(AdvantechReceiver boilerData);
}

10
mh-framework/src/main/java/com/mh/framework/dealdata/impl/DataProcessServiceImpl.java

@ -131,8 +131,14 @@ public class DataProcessServiceImpl implements DataProcessService {
insertData(data, "DEVICES_REGISTER", Constants.DEVICE);
}
@Override
public void insertBoilerData(AdvantechReceiver data) {
insertData(data, "BOILER_REGISTER", Constants.BOILER);
}
private void insertData(AdvantechReceiver data, String registerKey, String cacheKey) {
log.info("{}数据解析入库:{}", registerKey.equals("CHILLERS_REGISTER") ? "冷水机组" : "计量设备", data);
log.info("{}数据解析入库:{}", registerKey.equals("CHILLERS_REGISTER") ? "机组设备" : "计量设备", data);
if (registerKey.equals("CHILLERS_REGISTER")) {
databaseMapper.createChillerTable();
} else {
@ -144,6 +150,8 @@ public class DataProcessServiceImpl implements DataProcessService {
if (null == registers || registers.isEmpty()) {
if (registerKey.equals("CHILLERS_REGISTER")) {
registers = collectionParamsManageService.queryCollectionParamsByMtType(Constants.CHILLERS_TYPE);
} else if (cacheKey.equals(Constants.BOILER) && registerKey.equals("BOILER_REGISTER") ) {
registers = collectionParamsManageService.queryCollectionParamsByMtType(Constants.BOILER_TYPE);
} else {
registers = collectionParamsManageService.queryCollectionParamsByMtType(Constants.OTHER_TYPE);
}

3
mh-framework/src/main/java/com/mh/framework/mqtt/service/impl/EventsServiceImpl.java

@ -90,6 +90,9 @@ public class EventsServiceImpl implements IEventsService {
OneTwoThreeTempData oneTwoThreeTempData = mapper.readValue(receiver, OneTwoThreeTempData.class);
log.info("主题:{},类型:{}: ,数据:{}", topic, logMessage, oneTwoThreeTempData.toString());
sendMsgByTopic.sendToTempMQ(JSONObject.toJSONString(oneTwoThreeTempData));
} else if (topic.contains(Constants.BOILER)) {
// 锅炉系统
sendMsgByTopic.sendToBoilerMQ(JSONObject.toJSONString(commonTopicReceiver));
} else {
// 非本地主题处理
log.info("非本地主题处理: {}", topic);

2
mh-framework/src/main/java/com/mh/framework/rabbitmq/RabbitMqConfig.java

@ -49,6 +49,8 @@ public class RabbitMqConfig {
public static final String QUEUE_ALARM = "device.alarm.queue";
public static final String ROUTING_KEY_ALARM = "topic.alarm.eemcs.#";
public static final String QUEUE_BOILER = "queue_boiler";
public static final String ROUTING_KEY_BOILER = "topic.boiler.eemcs.#";
/**durable参数表示交换机是否持久化值为true表示持久化值为false表示不持久化
* 在RabbitMQ中持久化交换机会被存储在磁盘上以便在服务器重启后恢复

31
mh-framework/src/main/java/com/mh/framework/rabbitmq/consumer/ReceiveHandler.java

@ -173,6 +173,37 @@ public class ReceiveHandler {
}
}
/**
* 处理锅炉系统相关设备数据
*
* @param msg
* @param channel
* @param tag
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = RabbitMqConfig.QUEUE_BOILER, durable = "true"),
exchange = @Exchange(
value = RabbitMqConfig.EXCHANGE_NAME,
ignoreDeclarationExceptions = "true",
type = ExchangeTypes.TOPIC
),
key = {RabbitMqConfig.ROUTING_KEY_BOILER}
))
public void receiveBoilerData(@Payload String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws InterruptedException, IOException {
try {
log.info("MQ消费者:锅炉系统设备采集:{}", msg);
//TODO 数据解析入库操作 msg转成实体类,入库
AdvantechReceiver boilerData = JSONObject.parseObject(msg, AdvantechReceiver.class);
dataProcessService.insertBoilerData(boilerData);
// 正常执行,手动确认ack
channel.basicAck(tag, false);
} catch (Exception e) {
log.error("data:{},ddcException:{}", msg, e);
Thread.sleep(100);
channel.basicAck(tag, false);
}
}
/**
* 处理设备报警延时队列数据
*

10
mh-framework/src/main/java/com/mh/framework/rabbitmq/producer/SendMsgByTopic.java

@ -53,4 +53,14 @@ public class SendMsgByTopic {
rabbitTemplate.send(RabbitMqConfig.DELAY_EXCHANGE_NAME, RabbitMqConfig.ROUTING_KEY_ALARM, msg);
}
/**
* 锅炉数据报文注入rabbitmq
* @param data
* @return
*/
public String sendToBoilerMQ(String data) {
rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME,"topic.boiler.eemcs.data",data);
return "success";
}
}

19
mh-quartz/src/main/java/com/mh/quartz/task/DealDataTask.java

@ -644,6 +644,25 @@ public class DealDataTask {
}
}
/**
* 处理锅炉数据获取进入chillers表
*/
public void dealBoilerData() {
List<CollectionParamsManage> cacheList = redisCache.getCacheList(Constants.BOILER, CollectionParamsManage.class);
if (null == cacheList || cacheList.isEmpty()) {
return;
}
//清空redis
redisCache.deleteObject(Constants.BOILER);
//处理chillers数据
try {
//todo 处理没有对象curValue和curTime的异常
dealChillersCollect(cacheList);
} catch (Exception e) {
log.error("处理主机参数异常:{}", e);
}
}
/**
* 处理主机秒级数据再计算主机运行时间
*

Loading…
Cancel
Save