diff --git a/mh-admin/src/main/java/com/mh/web/controller/device/OperationController.java b/mh-admin/src/main/java/com/mh/web/controller/device/OperationController.java index 8481dc8..89f59d3 100644 --- a/mh-admin/src/main/java/com/mh/web/controller/device/OperationController.java +++ b/mh-admin/src/main/java/com/mh/web/controller/device/OperationController.java @@ -9,9 +9,11 @@ import com.mh.common.core.domain.vo.DeviceOperateMonitorVO; import com.mh.common.core.page.TableDataInfo; import com.mh.framework.mqtt.service.IMqttGatewayService; import com.mh.system.service.device.ICollectionParamsManageService; +import com.mh.system.service.mqtt.IMqttSubscriptionService; import com.mh.system.service.operation.IOperationDeviceService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.security.access.prepost.PreAuthorize; import org.springframework.web.bind.annotation.*; @@ -29,6 +31,9 @@ import java.util.List; @RequestMapping("/device") public class OperationController extends BaseController { + @Value("${control.topic}") + String controlTopic; + private final ICollectionParamsManageService iCollectionParamsManageService; private final IOperationDeviceService iOperationService; @@ -36,7 +41,9 @@ public class OperationController extends BaseController { private final IMqttGatewayService iMqttGatewayService; @Autowired - public OperationController(ICollectionParamsManageService iCollectionParamsManageService, IOperationDeviceService iOperationService, IMqttGatewayService iMqttGatewayService) { + public OperationController(ICollectionParamsManageService iCollectionParamsManageService, + IOperationDeviceService iOperationService, + IMqttGatewayService iMqttGatewayService) { this.iCollectionParamsManageService = iCollectionParamsManageService; this.iOperationService = iOperationService; this.iMqttGatewayService = iMqttGatewayService; @@ -83,7 +90,9 @@ public class OperationController extends BaseController { public AjaxResult operationDevice(@RequestBody List changeValues) { try { String sendOrder = iOperationService.operationDevice(changeValues); - iMqttGatewayService.publish("mh_control/events_upload/devices", sendOrder, 1); + // 获取mqtt操作队列(后期通过mqtt队列配置发送主题) + log.info("发送主题:{},消息:{}", controlTopic, sendOrder); +// iMqttGatewayService.publish(controlTopic, sendOrder, 1); } catch (Exception e) { log.error("设备操作失败", e); return AjaxResult.error(); diff --git a/mh-admin/src/main/java/com/mh/web/controller/monitor/CoolingSystemMonitorController.java b/mh-admin/src/main/java/com/mh/web/controller/monitor/CoolingSystemMonitorController.java index 3ba4024..b2117b3 100644 --- a/mh-admin/src/main/java/com/mh/web/controller/monitor/CoolingSystemMonitorController.java +++ b/mh-admin/src/main/java/com/mh/web/controller/monitor/CoolingSystemMonitorController.java @@ -89,9 +89,26 @@ public class CoolingSystemMonitorController extends BaseController { return AjaxResult.success(iCoolingSystemMonitorService.getSysPerformance()); } + /** + * 启动按钮参数获取 + * @param systemType + * @param paramType + * @return + */ @GetMapping("/oneKeyButton") public AjaxResult getOneKeyButton(@RequestParam(name = "systemType") String systemType, @RequestParam(name = "paramType") String paramType) { return AjaxResult.success(iCoolingSystemMonitorService.getOneKeyButton(systemType, paramType)); } + /** + * 获取当前系统模式以及现在的运行状态 + * @param systemType + * @param paramType + * @return + */ + @GetMapping("/getSystemMode") + public AjaxResult getSystemMode(@RequestParam(name = "systemType") String systemType, @RequestParam(name = "paramType") String paramType) { + return AjaxResult.success(iCoolingSystemMonitorService.getSystemMode(systemType, paramType)); + } + } diff --git a/mh-admin/src/main/resources/application-dev.yml b/mh-admin/src/main/resources/application-dev.yml index 46c918d..ef61494 100644 --- a/mh-admin/src/main/resources/application-dev.yml +++ b/mh-admin/src/main/resources/application-dev.yml @@ -99,7 +99,7 @@ spring: master: #添加allowMultiQueries=true 在批量更新时才不会出错 url: jdbc:postgresql://127.0.0.1:5432/eemcs_hw -# url: jdbc:postgresql://127.0.0.1:5432/eemcs + # url: jdbc:postgresql://127.0.0.1:5432/eemcs username: postgres password: mh@803 # 从库数据源 @@ -191,11 +191,11 @@ mqttSpring: # BASIC parameters are required. BASIC: protocol: MQTT - host: mqtt.mhito.net - port: 1883 - username: sa - password: sa123 - client-id: eemcs_hw_mqtt_dev + host: 127.0.0.1 + port: 2883 + username: mh + password: mhtech@803 + client-id: mqtt_mz_producer_dev # If the protocol is ws/wss, this value is required. path: # Topics that need to be subscribed when initially connecting to mqtt, multiple topics are divided by ",". @@ -207,5 +207,7 @@ mqttSpring: host: 127.0.0.1 port: 8083 path: /mqtt +control: + topic: mh_control/events_upload/devices/dev amap: key: fc4e79719daca2d0b8a11ba3124e1bd5 diff --git a/mh-admin/src/main/resources/application-druid.yml b/mh-admin/src/main/resources/application-druid.yml index f45367c..8f7d78a 100644 --- a/mh-admin/src/main/resources/application-druid.yml +++ b/mh-admin/src/main/resources/application-druid.yml @@ -7,8 +7,8 @@ spring: # 主库数据源 master: #添加allowMultiQueries=true 在批量更新时才不会出错 -# url: jdbc:postgresql://127.0.0.1:5432/eemcs_hw - url: jdbc:postgresql://127.0.0.1:5432/eemcs + url: jdbc:postgresql://127.0.0.1:5432/eemcs_hw +# url: jdbc:postgresql://127.0.0.1:5432/eemcs username: postgres password: mh@803 # 从库数据源 diff --git a/mh-admin/src/main/resources/application-prod.yml b/mh-admin/src/main/resources/application-prod.yml index 3610e24..3390710 100644 --- a/mh-admin/src/main/resources/application-prod.yml +++ b/mh-admin/src/main/resources/application-prod.yml @@ -207,5 +207,7 @@ mqttSpring: host: 127.0.0.1 port: 8083 path: /mqtt +control: + topic: mh_control/events_upload/devices amap: key: fc4e79719daca2d0b8a11ba3124e1bd5 diff --git a/mh-admin/src/main/resources/application-test.yml b/mh-admin/src/main/resources/application-test.yml index 401a3ac..c0a3054 100644 --- a/mh-admin/src/main/resources/application-test.yml +++ b/mh-admin/src/main/resources/application-test.yml @@ -206,5 +206,7 @@ mqttSpring: host: 127.0.0.1 port: 8083 path: /mqtt +control: + topic: mh_control/events_upload/devices/test amap: key: fc4e79719daca2d0b8a11ba3124e1bd5 diff --git a/mh-common/src/main/java/com/mh/common/constant/Constants.java b/mh-common/src/main/java/com/mh/common/constant/Constants.java index c3079a3..2361767 100644 --- a/mh-common/src/main/java/com/mh/common/constant/Constants.java +++ b/mh-common/src/main/java/com/mh/common/constant/Constants.java @@ -182,9 +182,9 @@ public class Constants { public static final String COOL_TOWER_TYPE = "tower"; // 冷却塔 public static final String CLOSE_HOST = "close_host_device_id"; // 关闭主机的设备id public static final String OPEN_VALVE = "open_valve_device_id"; // 开启蝶阀的设备id - public static final CharSequence CHILLERS = "chillers"; - public static final CharSequence OTHER = "other"; - public static final CharSequence DEVICE = "devices"; + public static final String CHILLERS = "chillers"; + public static final String OTHER = "other"; + public static final String DEVICE = "devices"; public static final String CHILLERS_TYPE = "0"; // 主机类型设备 public static final String OTHER_TYPE = "1"; // 其他设备 public static boolean CONTROL_WEB_FLAG = false; @@ -215,4 +215,10 @@ public class Constants { // 项目图片存储路径 public static String PROJECT_IMG_PATH = "\\images\\"; + + /** + * 温度 + */ + public static final String TEMP = "temp"; + } diff --git a/mh-common/src/main/java/com/mh/common/core/domain/entity/PolicyManage.java b/mh-common/src/main/java/com/mh/common/core/domain/entity/PolicyManage.java index c2952e3..c12ca39 100644 --- a/mh-common/src/main/java/com/mh/common/core/domain/entity/PolicyManage.java +++ b/mh-common/src/main/java/com/mh/common/core/domain/entity/PolicyManage.java @@ -74,6 +74,21 @@ public class PolicyManage { */ private int orderNum; + /** + * 最小值 + */ + private int min; + + /** + * 最大值 + */ + private int max; + + /** + * 保留位数 + */ + private int digits; + @Override public String toString() { return new ToStringBuilder(this) diff --git a/mh-common/src/main/java/com/mh/common/model/request/CurrentHumidity.java b/mh-common/src/main/java/com/mh/common/model/request/CurrentHumidity.java new file mode 100644 index 0000000..9638bb6 --- /dev/null +++ b/mh-common/src/main/java/com/mh/common/model/request/CurrentHumidity.java @@ -0,0 +1,8 @@ +package com.mh.common.model.request; + +import lombok.Data; + +@Data +public class CurrentHumidity { + private T value; +} diff --git a/mh-common/src/main/java/com/mh/common/model/request/CurrentTemperature.java b/mh-common/src/main/java/com/mh/common/model/request/CurrentTemperature.java new file mode 100644 index 0000000..82267f3 --- /dev/null +++ b/mh-common/src/main/java/com/mh/common/model/request/CurrentTemperature.java @@ -0,0 +1,8 @@ +package com.mh.common.model.request; + +import lombok.Data; + +@Data +public class CurrentTemperature { + private T value; +} diff --git a/mh-common/src/main/java/com/mh/common/model/request/OneTwoThreeTempData.java b/mh-common/src/main/java/com/mh/common/model/request/OneTwoThreeTempData.java new file mode 100644 index 0000000..7dcdc41 --- /dev/null +++ b/mh-common/src/main/java/com/mh/common/model/request/OneTwoThreeTempData.java @@ -0,0 +1,41 @@ +package com.mh.common.model.request; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Data; +import lombok.Getter; +import lombok.Setter; +import org.apache.commons.lang3.builder.ToStringBuilder; + +/** + * @author LJF + * @version 1.0 + * @project EEMCS + * @description 具体温湿度数据 + * @date 2025-04-17 17:17:06 + */ +@Setter +@Getter +public class OneTwoThreeTempData{ + private String id; + private String time; + @JsonProperty("RSSI") + private String rssi; + private String version; + @JsonProperty("sensor-type") + private String sensorType; + private Params params; + + @Override + public String toString() { + return new ToStringBuilder(this) + .append("id", id) + .append("time", time) + .append("rssi", rssi) + .append("version", version) + .append("sensorType", sensorType) + .append("params", params) + .toString(); + } + +} + diff --git a/mh-common/src/main/java/com/mh/common/model/request/Params.java b/mh-common/src/main/java/com/mh/common/model/request/Params.java new file mode 100644 index 0000000..acee19f --- /dev/null +++ b/mh-common/src/main/java/com/mh/common/model/request/Params.java @@ -0,0 +1,12 @@ +package com.mh.common.model.request; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Data; + +@Data +public class Params { + @JsonProperty("CurrentTemperature") + private CurrentTemperature currentTemperature; + @JsonProperty("CurrentHumidity") + private CurrentHumidity currentHumidity; +} diff --git a/mh-framework/src/main/java/com/mh/framework/dealdata/DataProcessService.java b/mh-framework/src/main/java/com/mh/framework/dealdata/DataProcessService.java index 70e4112..6fd5177 100644 --- a/mh-framework/src/main/java/com/mh/framework/dealdata/DataProcessService.java +++ b/mh-framework/src/main/java/com/mh/framework/dealdata/DataProcessService.java @@ -3,6 +3,7 @@ package com.mh.framework.dealdata; import com.mh.common.core.domain.entity.CollectionParamsManage; import com.mh.common.core.domain.entity.DeviceReport; import com.mh.common.model.request.AdvantechReceiver; +import com.mh.common.model.request.OneTwoThreeTempData; import java.util.List; import java.util.Map; @@ -106,4 +107,10 @@ public interface DataProcessService { * @param time */ void calculateCopByTime(String time); + + /** + * 插入温度数据 + * @param oneTwoThreeTempData + */ + void insertTempData(OneTwoThreeTempData oneTwoThreeTempData); } diff --git a/mh-framework/src/main/java/com/mh/framework/dealdata/impl/DataProcessServiceImpl.java b/mh-framework/src/main/java/com/mh/framework/dealdata/impl/DataProcessServiceImpl.java index 55e979d..6b5c9eb 100644 --- a/mh-framework/src/main/java/com/mh/framework/dealdata/impl/DataProcessServiceImpl.java +++ b/mh-framework/src/main/java/com/mh/framework/dealdata/impl/DataProcessServiceImpl.java @@ -8,6 +8,8 @@ import com.mh.common.core.domain.entity.DeviceReport; import com.mh.common.core.redis.RedisCache; import com.mh.common.model.request.AdvantechDatas; import com.mh.common.model.request.AdvantechReceiver; +import com.mh.common.model.request.OneTwoThreeTempData; +import com.mh.common.model.request.Params; import com.mh.common.utils.DateUtils; import com.mh.common.utils.EnergyThreadPoolService; import com.mh.common.utils.StringUtils; @@ -38,16 +40,6 @@ import java.util.stream.Collectors; @Slf4j public class DataProcessServiceImpl implements DataProcessService { - /** - * 主机 - */ - private static final String CHILLERS = "CHILLERS"; - - /** - * 计量设备 - */ - private static final String DEVICES = "DEVICES"; - @Autowired DatabaseMapper databaseMapper; @@ -62,14 +54,82 @@ public class DataProcessServiceImpl implements DataProcessService { ThreadPoolExecutor threadPoolService = EnergyThreadPoolService.getInstance(); + @Override + public void insertTempData(OneTwoThreeTempData oneTwoThreeTempData) { + insertTempDataToDb(oneTwoThreeTempData, "SENSOR_REGISTER", Constants.TEMP); + } + + private void insertTempDataToDb(OneTwoThreeTempData data, String registerKey, String cacheKey) { + log.info("{}数据解析入库:{}", registerKey.equals("SENSOR_REGISTER") ? "温湿度传感器" : "其他设备", data); + if (registerKey.equals("SENSOR_REGISTER")) { + databaseMapper.createChillerTable(); + } else { + databaseMapper.createDataTable(); + } + ArrayList entities = new ArrayList<>(); + + List registers = redisCache.getCacheList(registerKey, CollectionParamsManage.class); + if (null == registers || registers.isEmpty()) { + if (registerKey.equals("SENSOR_REGISTER")) { + registers = collectionParamsManageService.queryCollectionParamsByMtType(Constants.CHILLERS_TYPE); + } else { + registers = collectionParamsManageService.queryCollectionParamsByMtType(Constants.OTHER_TYPE); + } + redisCache.setCacheList(registerKey, registers); + } + + // stream流过滤mtNum中包含data.getId()的集合 + List collect = registers.stream().filter(entity -> entity.getMtNum().contains(data.getId())).toList(); + // 温湿度传感器命名规则id+1:温度,id+2:湿度 + if (collect.isEmpty()) { + return; + } + Date curTime = new Date(); + for (CollectionParamsManage entity : collect) { + BigDecimal temp = new BigDecimal(String.valueOf(data.getParams().getCurrentTemperature().getValue())); + BigDecimal humidity = new BigDecimal(String.valueOf(data.getParams().getCurrentHumidity().getValue())); + if ((data.getId()+"1").equals(String.valueOf(entity.getMtNum()))) { + CollectionParamsManage collectionParamsManage = new CollectionParamsManage(); + collectionParamsManage = entity; + try { + collectionParamsManage.setCurValue(temp); + } catch (NumberFormatException e) { + log.error("数值格式解析异常", e); + continue; + } + collectionParamsManage.setCurTime(curTime); + entities.add(collectionParamsManage); + } else if ((data.getId()+"2").equals(String.valueOf(entity.getMtNum()))) { + CollectionParamsManage collectionParamsManage = new CollectionParamsManage(); + collectionParamsManage = entity; + try { + collectionParamsManage.setCurValue(humidity); + } catch (NumberFormatException e) { + log.error("数值格式解析异常", e); + continue; + } + collectionParamsManage.setCurTime(curTime); + entities.add(collectionParamsManage); + } + } + + redisCache.setCacheList(cacheKey, entities); + + threadPoolService.execute(() -> { + if (!entities.isEmpty()) { + collectionParamsManageService.updateCollectionParamsManages(entities); + } + }); + } + @Override public void insertChillerData(AdvantechReceiver data) { - insertData(data, "CHILLERS_REGISTER", CHILLERS); + insertData(data, "CHILLERS_REGISTER", Constants.CHILLERS); } @Override public void insertDeviceData(AdvantechReceiver data) { - insertData(data, "DEVICES_REGISTER", DEVICES); + insertData(data, "DEVICES_REGISTER", Constants.DEVICE); } private void insertData(AdvantechReceiver data, String registerKey, String cacheKey) { diff --git a/mh-framework/src/main/java/com/mh/framework/mqtt/service/impl/EventsServiceImpl.java b/mh-framework/src/main/java/com/mh/framework/mqtt/service/impl/EventsServiceImpl.java index 81cd0b9..22538d7 100644 --- a/mh-framework/src/main/java/com/mh/framework/mqtt/service/impl/EventsServiceImpl.java +++ b/mh-framework/src/main/java/com/mh/framework/mqtt/service/impl/EventsServiceImpl.java @@ -6,6 +6,7 @@ import com.mh.common.constant.ChannelName; import com.mh.common.constant.Constants; import com.mh.common.model.request.AdvantechDatas; import com.mh.common.model.request.AdvantechReceiver; +import com.mh.common.model.request.OneTwoThreeTempData; import com.mh.framework.mqtt.service.IEventsService; import com.mh.framework.rabbitmq.producer.SendMsgByTopic; import io.netty.util.CharsetUtil; @@ -68,16 +69,27 @@ public class EventsServiceImpl implements IEventsService { private void handleInboundData(byte[] receiver,String topic, String logMessage) { try { - AdvantechReceiver commonTopicReceiver = mapper.readValue(receiver, AdvantechReceiver.class); - log.info("{}: {}", logMessage, commonTopicReceiver); + AdvantechReceiver commonTopicReceiver = new AdvantechReceiver(); + if (!topic.contains(Constants.TEMP)) { + commonTopicReceiver = mapper.readValue(receiver, AdvantechReceiver.class); + log.info("主题:{},类型:{}: ,数据:{}", topic, logMessage, commonTopicReceiver.toString()); + } // 接入消息队列,利用消息对接进行数据处理 // 判断当前主题属于哪种主动上报数据 if (topic.contains(Constants.CHILLERS)) { + // 主机参数数据 sendMsgByTopic.sendToChillerMQ(JSONObject.toJSONString(commonTopicReceiver)); } else if (topic.contains(Constants.OTHER)) { + // 其他数据 sendMsgByTopic.sendToOtherMQ(JSONObject.toJSONString(commonTopicReceiver)); } else if (topic.contains(Constants.DEVICE)) { + // 设备数据 sendMsgByTopic.sendToDeviceMQ(JSONObject.toJSONString(commonTopicReceiver)); + } else if (topic.contains(Constants.TEMP)) { + // 温湿度数据 + OneTwoThreeTempData oneTwoThreeTempData = mapper.readValue(receiver, OneTwoThreeTempData.class); + log.info("主题:{},类型:{}: ,数据:{}", topic, logMessage, oneTwoThreeTempData.toString()); + sendMsgByTopic.sendToTempMQ(JSONObject.toJSONString(oneTwoThreeTempData)); } else { // 非本地主题处理 log.info("非本地主题处理: {}", topic); diff --git a/mh-framework/src/main/java/com/mh/framework/rabbitmq/RabbitMqConfig.java b/mh-framework/src/main/java/com/mh/framework/rabbitmq/RabbitMqConfig.java index fa5cc38..d7d740c 100644 --- a/mh-framework/src/main/java/com/mh/framework/rabbitmq/RabbitMqConfig.java +++ b/mh-framework/src/main/java/com/mh/framework/rabbitmq/RabbitMqConfig.java @@ -29,6 +29,12 @@ public class RabbitMqConfig { /**DDC routing-key*/ public static final String ROUTING_KEY_OTHER = "topic.other.eemcs.#"; + /**温湿度设备队列*/ + public static final String QUEUE_TEMP = "queue_temp"; + /**温湿度 routing-key*/ + public static final String ROUTING_KEY_TEMP = "topic.temp.eemcs.#"; + + /**durable参数表示交换机是否持久化,值为true表示持久化,值为false表示不持久化。 * 在RabbitMQ中,持久化交换机会被存储在磁盘上以便在服务器重启后恢复, * 而非持久化交换机则只存在于内存中,服务器重启后会丢失*/ @@ -77,6 +83,19 @@ public class RabbitMqConfig { return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_OTHER).noargs(); } + /**温湿度传感器设备队列*/ + @Bean(QUEUE_TEMP) + public Queue tempQueue(){ + return new Queue(QUEUE_TEMP); + } + + /**温湿度传感器队列绑定交换机*/ + @Bean(ROUTING_KEY_TEMP) + public Binding tempBinding(@Qualifier(QUEUE_TEMP) Queue queue, + @Qualifier(EXCHANGE_NAME) Exchange exchange){ + return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_TEMP).noargs(); + } + /** * 默认消费者数量1 * setConcurrentConsumers(10); diff --git a/mh-framework/src/main/java/com/mh/framework/rabbitmq/consumer/ReceiveHandler.java b/mh-framework/src/main/java/com/mh/framework/rabbitmq/consumer/ReceiveHandler.java index 43bde35..cfde10c 100644 --- a/mh-framework/src/main/java/com/mh/framework/rabbitmq/consumer/ReceiveHandler.java +++ b/mh-framework/src/main/java/com/mh/framework/rabbitmq/consumer/ReceiveHandler.java @@ -3,6 +3,7 @@ package com.mh.framework.rabbitmq.consumer; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; import com.mh.common.model.request.AdvantechReceiver; +import com.mh.common.model.request.OneTwoThreeTempData; import com.mh.framework.dealdata.DataProcessService; import com.mh.framework.rabbitmq.RabbitMqConfig; import com.rabbitmq.client.Channel; @@ -133,4 +134,35 @@ public class ReceiveHandler { } } + /** + * 处理温湿度设备数据 + * + * @param msg + * @param channel + * @param tag + */ + @RabbitListener(bindings = @QueueBinding( + value = @Queue(value = RabbitMqConfig.QUEUE_TEMP, durable = "true"), + exchange = @Exchange( + value = RabbitMqConfig.EXCHANGE_NAME, + ignoreDeclarationExceptions = "true", + type = ExchangeTypes.TOPIC + ), + key = {RabbitMqConfig.ROUTING_KEY_TEMP} + )) + public void receiveTempData(@Payload String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws InterruptedException, IOException { + try { + log.info("MQ消费者:温湿度设备采集:{}", msg); + //TODO 数据解析入库操作 msg转成实体类,入库 + OneTwoThreeTempData oneTwoThreeTempData = JSONObject.parseObject(msg, OneTwoThreeTempData.class); + dataProcessService.insertTempData(oneTwoThreeTempData); + // 正常执行,手动确认ack + channel.basicAck(tag, false); + } catch (Exception e) { + log.error("data:{},ddcException:{}", msg, e); + Thread.sleep(100); + channel.basicAck(tag, false); + } + } + } diff --git a/mh-framework/src/main/java/com/mh/framework/rabbitmq/producer/SendMsgByTopic.java b/mh-framework/src/main/java/com/mh/framework/rabbitmq/producer/SendMsgByTopic.java index 5cc2535..c9c1b54 100644 --- a/mh-framework/src/main/java/com/mh/framework/rabbitmq/producer/SendMsgByTopic.java +++ b/mh-framework/src/main/java/com/mh/framework/rabbitmq/producer/SendMsgByTopic.java @@ -34,4 +34,10 @@ public class SendMsgByTopic { rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME,"topic.other.eemcs.data",data); return "success"; } + + /**温湿度报文注入rabbitmq*/ + public String sendToTempMQ(String data){ + rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME,"topic.temp.eemcs.data",data); + return "success"; + } } diff --git a/mh-quartz/src/main/java/com/mh/quartz/task/DealDataTask.java b/mh-quartz/src/main/java/com/mh/quartz/task/DealDataTask.java index 63fe885..64a1568 100644 --- a/mh-quartz/src/main/java/com/mh/quartz/task/DealDataTask.java +++ b/mh-quartz/src/main/java/com/mh/quartz/task/DealDataTask.java @@ -1,5 +1,6 @@ package com.mh.quartz.task; +import com.mh.common.constant.Constants; import com.mh.common.core.domain.entity.CollectionParamsManage; import com.mh.common.core.domain.entity.DeviceReport; import com.mh.common.core.redis.RedisCache; @@ -45,12 +46,12 @@ public class DealDataTask { * 不要修改时间!!!不要修改时间!!!5分钟 */ public void dealDeviceData() { - List cacheList = redisCache.getCacheList("DEVICES", CollectionParamsManage.class); + List cacheList = redisCache.getCacheList(Constants.DEVICE, CollectionParamsManage.class); if (null == cacheList || cacheList.isEmpty()) { return; } //清空redis - redisCache.deleteObject("DEVICES"); + redisCache.deleteObject(Constants.DEVICE); //处理chillers数据 try { //todo 处理没有对象curValue和curTime的异常 @@ -595,12 +596,31 @@ public class DealDataTask { * 处理冷水机组数据获取只要实时数据的设备 */ public void dealChillersData() { - List cacheList = redisCache.getCacheList("CHILLERS", CollectionParamsManage.class); + List cacheList = redisCache.getCacheList(Constants.CHILLERS, CollectionParamsManage.class); if (null == cacheList || cacheList.isEmpty()) { return; } //清空redis - redisCache.deleteObject("CHILLERS"); + redisCache.deleteObject(Constants.CHILLERS); + //处理chillers数据 + try { + //todo 处理没有对象curValue和curTime的异常 + dealChillersCollect(cacheList); + } catch (Exception e) { + log.error("处理主机参数异常:{}", e); + } + } + + /** + * 处理温湿度传感器数据获取进入chillers表 + */ + public void dealTempData() { + List cacheList = redisCache.getCacheList(Constants.TEMP, CollectionParamsManage.class); + if (null == cacheList || cacheList.isEmpty()) { + return; + } + //清空redis + redisCache.deleteObject(Constants.TEMP); //处理chillers数据 try { //todo 处理没有对象curValue和curTime的异常 diff --git a/mh-system/src/main/java/com/mh/system/mapper/device/CollectionParamsManageMapper.java b/mh-system/src/main/java/com/mh/system/mapper/device/CollectionParamsManageMapper.java index 44757ff..d03634a 100644 --- a/mh-system/src/main/java/com/mh/system/mapper/device/CollectionParamsManageMapper.java +++ b/mh-system/src/main/java/com/mh/system/mapper/device/CollectionParamsManageMapper.java @@ -8,6 +8,8 @@ import com.mh.common.core.domain.vo.DeviceMonitorVO; import com.mh.common.core.domain.vo.HotWaterControlListVO; import org.apache.ibatis.annotations.*; +import java.math.BigDecimal; +import java.util.Date; import java.util.List; import java.util.Map; import java.util.Objects; @@ -293,4 +295,7 @@ public interface CollectionParamsManageMapper extends BaseMapper selectBySystemTypeAndHouseId(@Param("systemType") String systemType, @Param("houseId") String houseId); + + @Update("update collection_params_manage set cur_value = #{curValue}, cur_time = #{curTime} where id = #{id} ") + void updateCurValueById(@Param("id") String id, @Param("curValue") BigDecimal curValue, @Param("curTime") Date curTime); } diff --git a/mh-system/src/main/java/com/mh/system/mapper/policy/PolicyManageMapper.java b/mh-system/src/main/java/com/mh/system/mapper/policy/PolicyManageMapper.java index 21720bb..a380b5b 100644 --- a/mh-system/src/main/java/com/mh/system/mapper/policy/PolicyManageMapper.java +++ b/mh-system/src/main/java/com/mh/system/mapper/policy/PolicyManageMapper.java @@ -23,4 +23,7 @@ public interface PolicyManageMapper extends BaseMapper { " where pm.system_type = #{systemType} and pm.fun_policy_type = #{funPolicyType} order by pm.policy_type, pm.order_num ") List selectPolicyList(@Param("systemType") String systemType, @Param("funPolicyType") String funPolicyType); + + @Select("select pm.* from policy_manage pm where pm.cpm_id = #{cpmId} limit 1") + PolicyManage selectOntByCpmId(@Param("cpmId") String cpmId); } diff --git a/mh-system/src/main/java/com/mh/system/service/device/ICoolingSystemMonitorService.java b/mh-system/src/main/java/com/mh/system/service/device/ICoolingSystemMonitorService.java index 0dd009a..74b663b 100644 --- a/mh-system/src/main/java/com/mh/system/service/device/ICoolingSystemMonitorService.java +++ b/mh-system/src/main/java/com/mh/system/service/device/ICoolingSystemMonitorService.java @@ -3,6 +3,7 @@ package com.mh.system.service.device; import com.mh.common.core.domain.dto.SysPerformanceDTO; import com.mh.common.core.domain.entity.CollectionParamsManage; +import java.util.HashMap; import java.util.Map; /** @@ -33,4 +34,6 @@ public interface ICoolingSystemMonitorService { SysPerformanceDTO getSysPerformance(); CollectionParamsManage getOneKeyButton(String systemType, String paramType); + + HashMap getSystemMode(String systemType, String paramType); } diff --git a/mh-system/src/main/java/com/mh/system/service/device/impl/CollectionParamsManageServiceImpl.java b/mh-system/src/main/java/com/mh/system/service/device/impl/CollectionParamsManageServiceImpl.java index 8d0a3df..5cc39db 100644 --- a/mh-system/src/main/java/com/mh/system/service/device/impl/CollectionParamsManageServiceImpl.java +++ b/mh-system/src/main/java/com/mh/system/service/device/impl/CollectionParamsManageServiceImpl.java @@ -159,7 +159,7 @@ public class CollectionParamsManageServiceImpl implements ICollectionParamsManag chillersEntities.forEach(chillerEntity -> { log.info("chillerEntity: {}", chillerEntity.toString()); if (chillerEntity.getId()!= null && !StringUtils.isEmpty(chillerEntity.getId())) { - collectionParamsManageMapper.updateById(chillerEntity); + collectionParamsManageMapper.updateCurValueById(chillerEntity.getId(), chillerEntity.getCurValue(), chillerEntity.getCurTime()); } }); // 根据台账id更新台账在线情况 @@ -246,7 +246,13 @@ public class CollectionParamsManageServiceImpl implements ICollectionParamsManag deviceMonitorVO.setDeviceType(param.getMtType()); deviceMonitorVO.setCollectName(param.getOtherName()); deviceMonitorVO.setCollectTime(param.getCurTime()); - deviceMonitorVO.setCollectValue(String.valueOf(param.getCurValue())); + // 判断如果是压力,放大1000倍,并保留一位小数 + if (param.getParamType().equals("13")) { + BigDecimal bigDecimal = param.getCurValue(); + bigDecimal = bigDecimal.multiply(new BigDecimal(1000)); + param.setCurValue(bigDecimal.setScale(1, RoundingMode.HALF_UP)); + } + deviceMonitorVO.setCollectValue(String.valueOf(param.getCurValue().setScale(1, RoundingMode.HALF_UP))); deviceMonitorVO.setParamType(param.getParamType()); deviceMonitorVO.setOrderNum(param.getCpmOrderNum()); } diff --git a/mh-system/src/main/java/com/mh/system/service/device/impl/CoolingSystemMonitorServiceImpl.java b/mh-system/src/main/java/com/mh/system/service/device/impl/CoolingSystemMonitorServiceImpl.java index bdb955f..1a667db 100644 --- a/mh-system/src/main/java/com/mh/system/service/device/impl/CoolingSystemMonitorServiceImpl.java +++ b/mh-system/src/main/java/com/mh/system/service/device/impl/CoolingSystemMonitorServiceImpl.java @@ -43,9 +43,116 @@ public class CoolingSystemMonitorServiceImpl implements ICoolingSystemMonitorSer this.collectionParamsManageMapper = collectionParamsManageMapper; } + @Override + public HashMap getSystemMode(String systemType, String paramType) { + HashMap result = new HashMap<>(); + CollectionParamsManage collectionParamsManage = collectionParamsManageMapper.selectOne(new QueryWrapper().eq("system_type", systemType).eq("param_type", paramType)); + // 判断当前系统启动状态 + List collectionParamsManages = collectionParamsManageMapper.selectList(new QueryWrapper().eq("system_type", systemType).eq("param_type", "30")); + if (null != collectionParamsManage && null != collectionParamsManage.getCurValue()) { + // 判断启动模式是什么 + if (collectionParamsManage.getCurValue().compareTo(new BigDecimal("1")) == 0) { + // 如果是1,说明是"一键开关模式" + result.put("autoType", "一键开关模式"); + // 获取一键启动标识 + CollectionParamsManage collectionParamsManage1 = collectionParamsManageMapper.selectOne(new QueryWrapper().eq("system_type", systemType).eq("param_type", "23")); + // 判断当前是开还是关闭 + getStartOrStopStatus(result, collectionParamsManages, collectionParamsManage1); + } else if (collectionParamsManage.getCurValue().compareTo(new BigDecimal("2")) == 0) { + // 如果是2,说明是"定时模式" + result.put("autoType", "定时模式"); + // 获取定时启动标识标识 + CollectionParamsManage collectionParamsManage1 = collectionParamsManageMapper.selectOne(new QueryWrapper().eq("system_type", systemType).eq("param_type", "29")); + // 判断当前是开还是关闭 + getStartOrStopStatus(result, collectionParamsManages, collectionParamsManage1); + } + } + // 查询报警内容 + List alarmList = collectionParamsManageMapper.selectList(new QueryWrapper() + .eq("system_type", systemType) + .eq("param_type", "5") + .eq("cur_value", 1)); + result.put("alarmList", alarmList); + return result; + } + + private void getStartOrStopStatus(HashMap result, List collectionParamsManages, CollectionParamsManage collectionParamsManage1) { + if (null != collectionParamsManage1 && null != collectionParamsManage1.getCurValue()) { + String startOrStopStatus = ""; + if (collectionParamsManage1.getCurValue().compareTo(new BigDecimal("1")) == 0) { + // 如果是1,说明是"开启",匹配系统启动状态是开启的 + CollectionParamsManage statusParam = collectionParamsManages.stream().filter(val -> val.getOtherName().contains("一键启动")).findFirst().get(); + switch (statusParam.getCurValue().intValue()) { + case 0: + startOrStopStatus = ""; + break; + case 1: + startOrStopStatus = "开阀中......"; + break; + case 2: + startOrStopStatus = "阀开失败检测报警,开启冷却泵......"; + break; + case 3: + startOrStopStatus = "开启冷却塔风机中......"; + break; + case 4: + startOrStopStatus = "开启冷冻泵中......"; + break; + case 5: + startOrStopStatus = "开启主机中......"; + break; + default: + break; + } + } else if (collectionParamsManage1.getCurValue().compareTo(new BigDecimal("0")) == 0) { + // 如果是0,说明是"关闭" + // 如果是0,说明是"开启",匹配系统启动状态是关闭的 + CollectionParamsManage statusParam = collectionParamsManages.stream().filter(val -> val.getOtherName().contains("一键关机")).findFirst().get(); + switch (statusParam.getCurValue().intValue()) { + case 0: + startOrStopStatus = ""; + break; + case 1: + startOrStopStatus = "关主机中......"; + break; + case 2: + startOrStopStatus = "关闭冷却塔风机中......"; + break; + case 3: + startOrStopStatus = "关闭冷却泵中......"; + break; + case 4: + startOrStopStatus = "关闭冷冻泵中......"; + break; + case 5: + startOrStopStatus = "关闭冷却阀门中......"; + break; + case 6: + startOrStopStatus = "关闭冷冻阀门中......"; + break; + default: + break; + } + } + result.put("startOrStopStatus", startOrStopStatus); + } + } + @Override public CollectionParamsManage getOneKeyButton(String systemType, String paramType) { - return collectionParamsManageMapper.selectOne(new QueryWrapper().eq("system_type", systemType).eq("param_type", paramType)); + CollectionParamsManage collectionParamsManage = collectionParamsManageMapper.selectOne(new QueryWrapper().eq("system_type", systemType).eq("param_type", paramType)); + // 判断启动模式是什么 + if (null != collectionParamsManage && null != collectionParamsManage.getCurValue()) { + // 判断启动模式是什么 + if (collectionParamsManage.getCurValue().compareTo(new BigDecimal("1")) == 0) { + // 如果是1,说明是"一键开关模式" + return collectionParamsManageMapper.selectOne(new QueryWrapper().eq("system_type", systemType).eq("param_type", "23")); + } else if (collectionParamsManage.getCurValue().compareTo(new BigDecimal("2")) == 0) { + // 如果是2,说明是"定时模式" + return collectionParamsManageMapper.selectOne(new QueryWrapper().eq("system_type", systemType).eq("param_type", "29")); + } + } + return null; } @Override diff --git a/mh-system/src/main/java/com/mh/system/service/operation/impl/OperationDeviceServiceImpl.java b/mh-system/src/main/java/com/mh/system/service/operation/impl/OperationDeviceServiceImpl.java index 538250c..b8d38f3 100644 --- a/mh-system/src/main/java/com/mh/system/service/operation/impl/OperationDeviceServiceImpl.java +++ b/mh-system/src/main/java/com/mh/system/service/operation/impl/OperationDeviceServiceImpl.java @@ -4,11 +4,13 @@ import com.alibaba.fastjson2.JSONObject; import com.mh.common.constant.Constants; import com.mh.common.core.domain.entity.CollectionParamsManage; import com.mh.common.core.domain.entity.OrderEntity; +import com.mh.common.core.domain.entity.PolicyManage; import com.mh.common.model.request.AdvantechDatas; import com.mh.common.model.response.AdvantechResponse; import com.mh.common.utils.DateUtils; import com.mh.common.utils.StringUtils; import com.mh.system.mapper.device.CollectionParamsManageMapper; +import com.mh.system.mapper.policy.PolicyManageMapper; import com.mh.system.service.operation.IOperationDeviceService; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; @@ -32,6 +34,9 @@ public class OperationDeviceServiceImpl implements IOperationDeviceService { @Resource private CollectionParamsManageMapper collectionParamsManageMapper; + @Resource + private PolicyManageMapper policyManageMapper; + @Override public String operationDevice(List changeValues) { // 拼接发送的报文 @@ -60,7 +65,7 @@ public class OperationDeviceServiceImpl implements IOperationDeviceService { if (type != null) { if (type == 1) { // 设置延时开关时间,参数需要*1000 - message = new BigDecimal(message).multiply(new BigDecimal("1000")).toString(); + message = String.valueOf(new BigDecimal(message).multiply(new BigDecimal("1000")).intValue()); } } // 获取报文其他信息 @@ -73,6 +78,11 @@ public class OperationDeviceServiceImpl implements IOperationDeviceService { } else { return null; } + // 从策略管理中获取数据,判断是否需要放大10的n次方倍 + PolicyManage policyManage = policyManageMapper.selectOntByCpmId(changeValue.getId()); + if (null != policyManage) { + message = String.valueOf(new BigDecimal(message).multiply(BigDecimal.valueOf(Math.pow(10, policyManage.getDigits()))).intValue()); + } } // 发送报文 AdvantechDatas data = new AdvantechDatas(); diff --git a/mh-system/src/main/java/com/mh/system/service/policy/impl/PolicyManageServiceImpl.java b/mh-system/src/main/java/com/mh/system/service/policy/impl/PolicyManageServiceImpl.java index 3a31bcc..5416f81 100644 --- a/mh-system/src/main/java/com/mh/system/service/policy/impl/PolicyManageServiceImpl.java +++ b/mh-system/src/main/java/com/mh/system/service/policy/impl/PolicyManageServiceImpl.java @@ -3,6 +3,7 @@ package com.mh.system.service.policy.impl; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.mh.common.core.domain.dto.DeviceMonitorDTO; import com.mh.common.core.domain.entity.PolicyManage; +import com.mh.system.mapper.device.CollectionParamsManageMapper; import com.mh.system.mapper.policy.PolicyManageMapper; import com.mh.system.service.policy.IPolicyManageService; import org.springframework.stereotype.Service; @@ -26,8 +27,11 @@ public class PolicyManageServiceImpl implements IPolicyManageService { private final PolicyManageMapper policyManageMapper; - public PolicyManageServiceImpl(PolicyManageMapper policyManageMapper) { + private final CollectionParamsManageMapper collectionParamsManageMapper; + + public PolicyManageServiceImpl(PolicyManageMapper policyManageMapper, CollectionParamsManageMapper collectionParamsManageMapper) { this.policyManageMapper = policyManageMapper; + this.collectionParamsManageMapper = collectionParamsManageMapper; } @Override @@ -43,9 +47,21 @@ public class PolicyManageServiceImpl implements IPolicyManageService { .sorted(Comparator.comparing(PolicyManage::getPolicyType)) // 处理每个PolicyManage对象的curValue字段 .peek(policy -> { - if (funPolicyType.equals("2")) { + // 自动开关机时间,反馈是ms,页面是s,所以除以1000 +// if (funPolicyType.equals("2")) { policy.setCurValue(BigDecimal.valueOf(policy.getCurValue() - .divide(new BigDecimal(1000)).intValue())); // 除以1000并保留整数 + .divide(new BigDecimal( (int) Math.pow(10, policy.getDigits()))).intValue())); // 除以1000并保留整数 +// } + // 判断系统启动模式值是多少,如果是1:一键启动,如果是2:定时启动 + if (policy.getPolicyType().equals("5") && policy.getFunPolicyType().equals("1")) { + // 查询当前系统启动模式值 + int systemStartMode = collectionParamsManageMapper.selectById(policy.getCpmId()).getCurValue().intValue(); + // 匹配orderNum值,如果相等,哪个的curValue=1 + if (policy.getOrderNum() == systemStartMode) { + policy.setCurValue(BigDecimal.valueOf(1)); + } else { + policy.setCurValue(BigDecimal.valueOf(0)); + } } }) // 再按PolicyName分组 diff --git a/sql/表结构设计.sql b/sql/表结构设计.sql index 64c23dc..99dbb3a 100644 --- a/sql/表结构设计.sql +++ b/sql/表结构设计.sql @@ -686,4 +686,18 @@ COMMENT ON COLUMN policy_manage.system_type IS '项目类型'; COMMENT ON COLUMN policy_manage.fun_policy_type IS '功能策略类型:0 设备策略,1 定时策略'; -- 创建索引 -CREATE INDEX idx_binding_param ON policy_manage(cpm_id); \ No newline at end of file +CREATE INDEX idx_binding_param ON policy_manage(cpm_id); + +-- 2025-04-17 策略管理添加字段 +ALTER TABLE public.policy_manage ADD min int NULL; +COMMENT ON COLUMN public.policy_manage.min IS '最低值'; +ALTER TABLE public.policy_manage ADD max int NULL; +COMMENT ON COLUMN public.policy_manage.max IS '最大值'; +ALTER TABLE public.policy_manage ADD digits int NULL DEFAULT 0; +COMMENT ON COLUMN public.policy_manage.digits IS '保留的小数位'; + +-- mqtt消息队列 +ALTER TABLE public.mqtt_subscription ADD queue_type int NULL; +COMMENT ON COLUMN public.mqtt_subscription.queue_type IS '主题类型:1(接收),2(发送)'; + +