From 6973b889c8afa222fb37655371f4d8815c01f007 Mon Sep 17 00:00:00 2001 From: 25604 Date: Wed, 11 Jun 2025 19:39:18 +0800 Subject: [PATCH] =?UTF-8?q?1=E3=80=81netty=E6=8E=A7=E5=88=B6=E7=83=AD?= =?UTF-8?q?=E6=B3=B5=E8=AE=BE=E7=BD=AE=EF=BC=9B=202=E3=80=81=E5=A4=A7?= =?UTF-8?q?=E5=B1=8F=E6=95=B0=E6=8D=AE=E6=95=B4=E5=90=88=EF=BC=9B=203?= =?UTF-8?q?=E3=80=81=E9=A3=8E=E6=9F=9C=E7=B3=BB=E7=BB=9F=E7=AD=96=E7=95=A5?= =?UTF-8?q?=E7=AE=A1=E7=90=86=E4=BC=98=E5=8C=96=EF=BC=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/mh/common/utils/ModbusUtils.java | 20 +++++ .../java/com/mh/common/utils/NettyTools.java | 2 +- .../com/mh/common/utils/SendOrderUtils.java | 13 +++- .../dealdata/DataProcessService.java | 2 +- .../dealdata/impl/DataProcessServiceImpl.java | 4 +- .../mh/framework/netty/EchoServerHandler.java | 77 ++++++++++++++++--- .../mh/framework/netty/NettyServiceImpl.java | 12 ++- .../java/com/mh/quartz/task/DealDataTask.java | 7 +- .../mapper/device/DataProcessMapper.java | 17 ++-- .../mapper/policy/PolicyManageMapper.java | 2 +- .../overview/impl/BigScreenServiceImpl.java | 50 +++++++++--- .../policy/impl/PolicyManageServiceImpl.java | 2 +- 12 files changed, 169 insertions(+), 39 deletions(-) diff --git a/mh-common/src/main/java/com/mh/common/utils/ModbusUtils.java b/mh-common/src/main/java/com/mh/common/utils/ModbusUtils.java index 36a3637..9ec7f83 100644 --- a/mh-common/src/main/java/com/mh/common/utils/ModbusUtils.java +++ b/mh-common/src/main/java/com/mh/common/utils/ModbusUtils.java @@ -3,6 +3,7 @@ package com.mh.common.utils; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; +import lombok.extern.slf4j.Slf4j; /** * @author LJF @@ -11,7 +12,24 @@ import io.netty.channel.ChannelHandlerContext; * @description Modbus协议工具类 * @date 2025-06-06 14:40:24 */ +@Slf4j public class ModbusUtils { + + public static String createReadOrder(String mtCode, String funCode, String registerAddr, String registerNum) { + // 开始创建指令 + // 拼接指令 + String sendOrderStr = ExchangeStringUtil.addZeroForNum(mtCode, 2) + + ExchangeStringUtil.addZeroForNum(funCode, 2) + + ExchangeStringUtil.addZeroForNum(registerAddr, 4) + + ExchangeStringUtil.addZeroForNum(registerNum, 4); + byte[] strOrder = ExchangeStringUtil.hexStrToBinaryStr(sendOrderStr); + int checkNum = CRC16.CRC16_MODBUS(strOrder); + String checkWord = ExchangeStringUtil.decToHex(String.valueOf(checkNum)); + checkWord = checkWord.substring(2, 4) + checkWord.substring(0, 2); + sendOrderStr = sendOrderStr + checkWord; + return sendOrderStr; + } + public static String createControlCode(String mtCode, Integer type, String registerAddr, String param) { String orderStr; mtCode = ExchangeStringUtil.addZeroForNum(mtCode, 2); @@ -22,6 +40,8 @@ public class ModbusUtils { int checkNum = CRC16.CRC16_MODBUS(strOrder); String checkWord = ExchangeStringUtil.decToHex(String.valueOf(checkNum)); checkWord = checkWord.substring(2, 4) + checkWord.substring(0, 2); + // 发送的指令 + log.info("发送指令:{}", orderStr+checkWord); return orderStr + checkWord; } diff --git a/mh-common/src/main/java/com/mh/common/utils/NettyTools.java b/mh-common/src/main/java/com/mh/common/utils/NettyTools.java index 123a574..879b3a1 100644 --- a/mh-common/src/main/java/com/mh/common/utils/NettyTools.java +++ b/mh-common/src/main/java/com/mh/common/utils/NettyTools.java @@ -67,7 +67,7 @@ public class NettyTools { public static void setReceiveMsg(String key, String msg) { if(responseMsgCache.getIfPresent(key) != null){ - responseMsgCache.getIfPresent(key).add(msg); + Objects.requireNonNull(responseMsgCache.getIfPresent(key)).add(msg); return; } diff --git a/mh-common/src/main/java/com/mh/common/utils/SendOrderUtils.java b/mh-common/src/main/java/com/mh/common/utils/SendOrderUtils.java index 054ca31..dd695a9 100644 --- a/mh-common/src/main/java/com/mh/common/utils/SendOrderUtils.java +++ b/mh-common/src/main/java/com/mh/common/utils/SendOrderUtils.java @@ -37,7 +37,7 @@ public class SendOrderUtils { ctx.channel().writeAndFlush(buffer); log.info("sends :" + sendOrderStr + ",num:" + num + ",records:" + size); try { - Thread.sleep(1000); + Thread.sleep(500); } catch (InterruptedException e) { log.error("线程休眠异常", e); } @@ -50,4 +50,15 @@ public class SendOrderUtils { buffer.writeBytes(ExchangeStringUtil.hexStrToBinaryStr(sendStr));//对接需要16进制 return buffer; } + + public static void sendOrderToDTU(ChannelHandlerContext ctx, String sendStr) { + ByteBuf buffer = getByteBuf(ctx, sendStr); + // 发送数据 + ctx.channel().writeAndFlush(buffer); + try { + Thread.sleep(500); + } catch (InterruptedException e) { + log.error("线程休眠异常", e); + } + } } diff --git a/mh-framework/src/main/java/com/mh/framework/dealdata/DataProcessService.java b/mh-framework/src/main/java/com/mh/framework/dealdata/DataProcessService.java index 0764e6d..9d97c08 100644 --- a/mh-framework/src/main/java/com/mh/framework/dealdata/DataProcessService.java +++ b/mh-framework/src/main/java/com/mh/framework/dealdata/DataProcessService.java @@ -37,7 +37,7 @@ public interface DataProcessService { * @param deviceNum * @return */ - String queryInitValue(String deviceNum, String mtCode); + String queryInitValue(String deviceNum, String mtCode, String registerAddr); /** * 查询上一次采集数据、时间等参数 diff --git a/mh-framework/src/main/java/com/mh/framework/dealdata/impl/DataProcessServiceImpl.java b/mh-framework/src/main/java/com/mh/framework/dealdata/impl/DataProcessServiceImpl.java index 9f74ffa..5c04930 100644 --- a/mh-framework/src/main/java/com/mh/framework/dealdata/impl/DataProcessServiceImpl.java +++ b/mh-framework/src/main/java/com/mh/framework/dealdata/impl/DataProcessServiceImpl.java @@ -234,11 +234,11 @@ public class DataProcessServiceImpl implements DataProcessService { } @Override - public String queryInitValue(String deviceNum, String mtCode) { + public String queryInitValue(String deviceNum, String mtCode, String registerAddr) { if (StringUtils.isEmpty(mtCode)) { return dataProcessMapper.queryInitValue1(deviceNum); } else { - return dataProcessMapper.queryInitValue(deviceNum, mtCode); + return dataProcessMapper.queryInitValue(deviceNum, mtCode, registerAddr); } } diff --git a/mh-framework/src/main/java/com/mh/framework/netty/EchoServerHandler.java b/mh-framework/src/main/java/com/mh/framework/netty/EchoServerHandler.java index 20ca39c..e74c710 100644 --- a/mh-framework/src/main/java/com/mh/framework/netty/EchoServerHandler.java +++ b/mh-framework/src/main/java/com/mh/framework/netty/EchoServerHandler.java @@ -30,6 +30,7 @@ import java.math.BigDecimal; import java.util.ArrayList; import java.util.Date; import java.util.List; +import java.util.concurrent.TimeUnit; @Slf4j public class EchoServerHandler extends ChannelInboundHandlerAdapter { @@ -143,7 +144,7 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter { deviceCodeParamList = arrayCache.toList(CollectionParamsManage.class); } size = deviceCodeParamList.size(); - log.info("deviceCodeParam size ===> {}", size); +// log.info("deviceCodeParam size ===> {}", size); // 清空receiveStr receiveStr = ""; num = 0; @@ -160,30 +161,79 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter { log.info("gateway not find deviceCodeParam!"); } } else if (receiveStr.length() == 18) { - // 水电表、热泵设置返回数据解析 + // 水电表返回数据解析 idleCount = 1; log.info("水电表、热泵设置接收==>{},长度:{}", receiveStr, receiveStr.length()); nextSendOrder(ctx); } else if (receiveStr.length() == 12 || receiveStr.length() == 14) { // 热泵返回数据解析 idleCount = 1; - log.info("热泵读取接收===>{},长度:{}", receiveStr, receiveStr.length()); - nextSendOrder(ctx); - } else if (receiveStr.length() == 16) { - // 热泵设置指令返回 - // 判断是否有指令发送 - if (redisCache.hasKey("order_send") && redisCache.getCacheObject("order_send").equals(receiveStr)) { - NettyTools.setReceiveMsg("order_wait", receiveStr); + log.info("热泵读取接收===>{},长度:{},是否存在order_send_read: {}", receiveStr, receiveStr.length(), redisCache.hasKey("order_send_read")); + if (redisCache.hasKey("order_send_read")) { + log.error("order_send_read存在,接收到指令是{}", receiveStr); + CollectionParamsManage collectionParamsManage = new CollectionParamsManage(); + collectionParamsManage.setDataType(2); + collectionParamsManage.setParamType("14"); + Object orderSend = redisCache.getCacheObject("order_send_read"); + String orderSendStr = String.valueOf(orderSend); + String substring = orderSendStr.substring(0, 2); + collectionParamsManage.setOtherName(ExchangeStringUtil.hexToDec(substring) + "号热泵温度设置"); + analysisReceiveData(receiveStr, collectionParamsManage); + redisCache.deleteObject("order_send_read"); + } else { + nextSendOrder(ctx); } + } else if (receiveStr.length() == 16) { + idleCount = 1; nextSendOrder(ctx); - } else if (receiveStr.length() > 50 && receiveStr.length() < 100) { + // 热泵设置指令返回 + controlOrder(ctx); + } else if (receiveStr.length() > 20 && receiveStr.length() < 100) { idleCount = 1; // 清空receiveStr nextSendOrder(ctx); + controlOrder(ctx); } ctx.flush(); } + private void controlOrder(ChannelHandlerContext ctx) { + // 热泵设置指令返回 + if (redisCache.hasKey("order_send")) { + // 判断是否有指令发送 + Object orderSend = redisCache.getCacheObject("order_send"); + String orderSendStr = String.valueOf(orderSend); +// // 发送读取热泵设置温度 +// String controlCode = ModbusUtils.createReadOrder(orderSendStr.substring(0, 2), +// "03", +// "0003", +// "1"); +// SendOrderUtils.sendOrderToDTU(ctx, controlCode); + if (receiveStr.contains(orderSendStr)) { + String readOrder = ModbusUtils.createReadOrder(orderSendStr.substring(0, 2), + "03", + "0003", + "1"); + // 初始化发送指令 +// NettyTools.initReceiveMsg("order_wait_read"); + // 发送读取指令 + redisCache.setCacheObject("order_send_read", readOrder, 10, TimeUnit.SECONDS); + ctx.writeAndFlush(ModbusUtils.createByteBuf(readOrder)); + // 发送读取指令 + log.error("热泵设置读取指令发送:{},order_send_read键值:{}", readOrder, redisCache.hasKey("order_send_read")); + log.error("热泵设置指令返回:{}", receiveStr); + NettyTools.setReceiveMsg("order_wait", receiveStr); + redisCache.deleteObject("order_send"); + receiveStr = ""; + try { + Thread.sleep(500); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + } + private void dealSession(ChannelHandlerContext ctx) { // 获取表号 String deviceCode =receiveStr; @@ -227,10 +277,15 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter { private void nextSendOrder(ChannelHandlerContext ctx) throws InterruptedException { // 发送指令响应不用解析 - if (receiveStr.length() != 16) { + if (receiveStr.length() != 16 && receiveStr.length() < 20) { // 解析采集的报文,并保存到数据库 analysisReceiveData(receiveStr, deviceCodeParamList.get(num)); } + // 判断是否有远程指令发送,如果有先不采集 + if (redisCache.hasKey("order_send")) { + log.error("有远程设置指令发送,不进行采集"); + return; + } // 清空receiveStr receiveStr = ""; // 判断发送的下标,如果不等于指令数组大小 diff --git a/mh-framework/src/main/java/com/mh/framework/netty/NettyServiceImpl.java b/mh-framework/src/main/java/com/mh/framework/netty/NettyServiceImpl.java index 4a92e8e..efb88cf 100644 --- a/mh-framework/src/main/java/com/mh/framework/netty/NettyServiceImpl.java +++ b/mh-framework/src/main/java/com/mh/framework/netty/NettyServiceImpl.java @@ -95,12 +95,20 @@ public class NettyServiceImpl implements INettyService { NettyTools.initReceiveMsg("order_wait"); // 设置缓存,方便在netty中判断发送的指令 redisCache.setCacheObject("order_send", controlCode, 10, TimeUnit.SECONDS); - // 发送指令 + Thread.sleep(500); + // 发送控制指令 serverSession.getChannel().writeAndFlush(ModbusUtils.createByteBuf(controlCode)); // 等待指令 if (NettyTools.waitReceiveMsg("order_wait")) { - log.error("发送指令成功,心跳包:{}", gatewayManage.getHeartBeat()); + // 初始化发送指令 + Thread.sleep(3000); return true; +// if (NettyTools.waitReceiveMsg("order_wait_read")) { +// return true; +// } else { +// log.error("读取指令异常,心跳包:{}", gatewayManage.getHeartBeat()); +// return false; +// } } else { log.error("发送指令异常,心跳包:{}", gatewayManage.getHeartBeat()); return false; diff --git a/mh-quartz/src/main/java/com/mh/quartz/task/DealDataTask.java b/mh-quartz/src/main/java/com/mh/quartz/task/DealDataTask.java index a59a518..1598182 100644 --- a/mh-quartz/src/main/java/com/mh/quartz/task/DealDataTask.java +++ b/mh-quartz/src/main/java/com/mh/quartz/task/DealDataTask.java @@ -115,7 +115,7 @@ public class DealDataTask { int ratio = entity.getMtRatio(); if (ObjectUtils.isEmpty(lastData) || ObjectUtils.isEmpty(lastData.getLastValue())) { //从device_manage取出初始值 - String initValue = dataProcessService.queryInitValue(entity.getMtNum(), entity.getMtCode()); + String initValue = dataProcessService.queryInitValue(entity.getMtNum(), entity.getMtCode(), entity.getRegisterAddr()); if (StringUtils.isEmpty(initValue)) { initValue = "0"; } @@ -207,6 +207,7 @@ public class DealDataTask { } temp.setDeviceType(deviceType); temp.setRatio(ratio); + temp.setRegisterAddr(dataI.getRegisterAddr()); dataMinList.add(temp); i++; j++; @@ -324,7 +325,7 @@ public class DealDataTask { DeviceReport hourEntity = dataProcessService.queryLastValue(key, tableType); if (ObjectUtils.isEmpty(hourEntity)) { //查询设备信息初始值 - lastValue = dataProcessService.queryInitValue(key, null); + lastValue = dataProcessService.queryInitValue(key, null, null); if (StringUtils.isEmpty(lastValue)) { lastValue = "0"; } @@ -505,7 +506,7 @@ public class DealDataTask { DeviceReport entity = deviceList.get(i); DeviceReport lastEntity = dataProcessService.queryLastValue(deviceNum, "month"); if (ObjectUtils.isEmpty(lastEntity)) { - lastValue = dataProcessService.queryInitValue(deviceNum, null); + lastValue = dataProcessService.queryInitValue(deviceNum, null, null); lastTime = entity.getCurTime(); } else { lastValue = lastEntity.getLastValue(); diff --git a/mh-system/src/main/java/com/mh/system/mapper/device/DataProcessMapper.java b/mh-system/src/main/java/com/mh/system/mapper/device/DataProcessMapper.java index 18965dc..dbb21a3 100644 --- a/mh-system/src/main/java/com/mh/system/mapper/device/DataProcessMapper.java +++ b/mh-system/src/main/java/com/mh/system/mapper/device/DataProcessMapper.java @@ -75,7 +75,7 @@ public interface DataProcessMapper { */ @Select("") + String queryInitValue(@Param("deviceNum") String deviceNum, @Param("mtCode") String mtCode, @Param("registerAddr") String registerAddr); /** * 查询初始数据 diff --git a/mh-system/src/main/java/com/mh/system/mapper/policy/PolicyManageMapper.java b/mh-system/src/main/java/com/mh/system/mapper/policy/PolicyManageMapper.java index f975aa1..8ddcfcd 100644 --- a/mh-system/src/main/java/com/mh/system/mapper/policy/PolicyManageMapper.java +++ b/mh-system/src/main/java/com/mh/system/mapper/policy/PolicyManageMapper.java @@ -32,7 +32,7 @@ public interface PolicyManageMapper extends BaseMapper { " where pm.system_type = #{systemType} " + " and pm.fun_policy_type = #{funPolicyType} " + " and house_id = #{houseId} " + - " order by pm.policy_type, pm.order_num ") + " order by pm.policy_type::int, pm.order_num ") List selectPolicyListByParams(@Param("systemType") String systemType, @Param("funPolicyType") String funPolicyType, @Param("houseId") String houseId); diff --git a/mh-system/src/main/java/com/mh/system/service/overview/impl/BigScreenServiceImpl.java b/mh-system/src/main/java/com/mh/system/service/overview/impl/BigScreenServiceImpl.java index dcc584d..fd9f39b 100644 --- a/mh-system/src/main/java/com/mh/system/service/overview/impl/BigScreenServiceImpl.java +++ b/mh-system/src/main/java/com/mh/system/service/overview/impl/BigScreenServiceImpl.java @@ -50,7 +50,7 @@ public class BigScreenServiceImpl implements IBigScreenService { BigScreenOverviewAndBasicDTO bigScreenOverviewAndBasicDTO = new BigScreenOverviewAndBasicDTO(); for (String paramType : paramTypes) { int mtIsSum = queryCollectionParams(paramType, 40); - BigDecimal useValue = bigScreenMapper.selectOverviewData(tableName, paramType, vo.getStartTime(), vo.getEndTime(), mtIsSum, 40, null); + BigDecimal useValue = bigScreenMapper.selectOverviewData(tableName, paramType, vo.getStartTime(), vo.getEndTime(), mtIsSum, 40, vo.getSystemType()); switch (paramType) { case "16": bigScreenOverviewAndBasicDTO.setTotalEle(useValue); @@ -70,13 +70,40 @@ public class BigScreenServiceImpl implements IBigScreenService { } // 换算标准煤 // 用电转换,公式:标准煤量(kgce)= 用电量(kWh)×0.1229 - bigScreenOverviewAndBasicDTO.setEleStandardCoal(bigScreenOverviewAndBasicDTO.getTotalEle().multiply(new BigDecimal("0.1229"))); - // 用冷转换,公式:标准煤量(kgce)= 3.6*用冷量(kW)➗29.3076 - bigScreenOverviewAndBasicDTO.setColdStandardCoal(bigScreenOverviewAndBasicDTO.getTotalCold().multiply(new BigDecimal("3.6")).divide(new BigDecimal("29.3076"))); - // 用水转换,公式:标准煤量(kgce)=用水量(吨)×0.0857 - bigScreenOverviewAndBasicDTO.setWaterStandardCoal(bigScreenOverviewAndBasicDTO.getTotalWater().multiply(new BigDecimal("0.0857"))); - // 用气转换,公式:标准煤量(kgce)=用蒸汽量(吨)×0.0948 - bigScreenOverviewAndBasicDTO.setGasStandardCoal(bigScreenOverviewAndBasicDTO.getTotalGas().multiply(new BigDecimal("0.0948"))); + if (bigScreenOverviewAndBasicDTO.getTotalEle() != null) { + bigScreenOverviewAndBasicDTO.setEleStandardCoal( + bigScreenOverviewAndBasicDTO.getTotalEle() + .multiply(new BigDecimal("0.1229")) + .setScale(2, BigDecimal.ROUND_HALF_UP) + ); + } + + // 用冷转换,公式:标准煤量(kgce)= 3.6 × 用冷量(kW) ÷ 29.3076 + if (bigScreenOverviewAndBasicDTO.getTotalCold() != null) { + bigScreenOverviewAndBasicDTO.setColdStandardCoal( + bigScreenOverviewAndBasicDTO.getTotalCold() + .multiply(new BigDecimal("3.6")) + .divide(new BigDecimal("29.3076"), 2, BigDecimal.ROUND_HALF_UP) + ); + } + + // 用水转换,公式:标准煤量(kgce)= 用水量(吨) × 0.0857 + if (bigScreenOverviewAndBasicDTO.getTotalWater() != null) { + bigScreenOverviewAndBasicDTO.setWaterStandardCoal( + bigScreenOverviewAndBasicDTO.getTotalWater() + .multiply(new BigDecimal("0.0857")) + .setScale(2, BigDecimal.ROUND_HALF_UP) + ); + } + + // 用气转换,公式:标准煤量(kgce)= 用蒸汽量(吨) × 0.0948 + if (bigScreenOverviewAndBasicDTO.getTotalGas() != null) { + bigScreenOverviewAndBasicDTO.setGasStandardCoal( + bigScreenOverviewAndBasicDTO.getTotalGas() + .multiply(new BigDecimal("0.0948")) + .setScale(2, BigDecimal.ROUND_HALF_UP) + ); + } // 获取建筑面积 List sysParams = sysParamsMapper.selectSysParamsList(); if (!sysParams.isEmpty()) { @@ -182,7 +209,8 @@ public class BigScreenServiceImpl implements IBigScreenService { dateFormat = "yyyy-MM"; break; case "month": - tableName = "data_day"; + tableName = "data_day" + vo.getStartTime().substring(0, 4); + ; dateFormat = "yyyy-MM-dd"; break; case "day": @@ -206,9 +234,9 @@ public class BigScreenServiceImpl implements IBigScreenService { // 根据时间分组,求和取总和,再根据时间升序排序 // 使用LinkedHashMap保持时间顺序 LinkedHashMap eleMap = dataList.stream() - .sorted(Comparator.comparing(val->DateUtils.dateToString(val.getCurTime(), finalDateFormat))) // 按时间字段排序 + .sorted(Comparator.comparing(val -> DateUtils.dateToString(val.getCurTime(), finalDateFormat))) // 按时间字段排序 .collect(Collectors.groupingBy( - val->DateUtils.dateToString(val.getCurTime(), finalDateFormat), + val -> DateUtils.dateToString(val.getCurTime(), finalDateFormat), LinkedHashMap::new, Collectors.summingDouble(value -> value.getCalcValue().doubleValue()) diff --git a/mh-system/src/main/java/com/mh/system/service/policy/impl/PolicyManageServiceImpl.java b/mh-system/src/main/java/com/mh/system/service/policy/impl/PolicyManageServiceImpl.java index a40ba59..0372a51 100644 --- a/mh-system/src/main/java/com/mh/system/service/policy/impl/PolicyManageServiceImpl.java +++ b/mh-system/src/main/java/com/mh/system/service/policy/impl/PolicyManageServiceImpl.java @@ -65,7 +65,7 @@ public class PolicyManageServiceImpl implements IPolicyManageService { List policyManages = policyManageMapper.selectPolicyListByParams(systemType, funPolicyType, houseId); Map> listMap = policyManages.stream() // 先排序(例如按 policyType) - .sorted(Comparator.comparing(PolicyManage::getPolicyType)) +// .sorted(Comparator.comparing(PolicyManage::getPolicyType)) // 处理每个PolicyManage对象的curValue字段 .peek(policy -> { policy.setCurValue(BigDecimal.valueOf(policy.getCurValue().intValue())); // 除以1000并保留整数