From 7b52dd62f1c717a5aed62e71d1091edbb230e28e Mon Sep 17 00:00:00 2001 From: mh Date: Mon, 23 Sep 2024 17:54:40 +0800 Subject: [PATCH] =?UTF-8?q?1=E3=80=81=E9=87=87=E7=94=A8=E5=B7=A5=E5=8E=82+?= =?UTF-8?q?=E7=AD=96=E7=95=A5=E6=94=B9=E9=80=A0=E4=BF=AE=E6=94=B9=E4=BF=9D?= =?UTF-8?q?=E5=AD=98=E6=95=B0=E6=8D=AE=E5=BA=93=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- 2024新增脚本.sql | 7 + .../mh/user/entity/DeviceCodeParamEntity.java | 16 +- .../com/mh/user/factory/CJ188Protocol.java | 5 +- .../java/com/mh/user/factory/EleProtocol.java | 5 +- .../com/mh/user/factory/ModbusProtocol.java | 5 +- .../java/com/mh/user/factory/Protocol.java | 5 +- .../com/mh/user/job/CollectionLoopRunner.java | 4 + .../mh/user/mapper/DeviceCodeParamMapper.java | 19 +- .../mapper/chillers/GatewayManageMapper.java | 18 +- .../com/mh/user/netty/EchoServerHandler.java | 303 +++++++----------- .../mh/user/service/MeterManageService.java | 2 + .../service/impl/MeterManageServiceImpl.java | 11 + .../user/strategy/CJ188ProtocolStrategy.java | 3 +- .../mh/user/strategy/EleProtocolStrategy.java | 99 ++++-- .../user/strategy/ModbusProtocolStrategy.java | 149 ++++++++- .../mh/user/strategy/ProtocolStrategy.java | 5 +- .../user/utils/AnalysisReceiveOrder485.java | 24 ++ 17 files changed, 438 insertions(+), 242 deletions(-) diff --git a/2024新增脚本.sql b/2024新增脚本.sql index 6033a94..95ac3c1 100644 --- a/2024新增脚本.sql +++ b/2024新增脚本.sql @@ -234,3 +234,10 @@ EXEC sys.sp_addextendedproperty 'MS_Description', N'寄存器大小', 'schema', ALTER TABLE meter_manage ADD is_use bit DEFAULT 1 NULL; EXEC sys.sp_addextendedproperty 'MS_Description', N'当前采集点位是否启用', 'schema', N'dbo', 'table', N'meter_manage', 'column', N'is_use'; +-- 2024-09-23 冗余设备管理采集参数表数据 +ALTER TABLE mh_jnd.dbo.device_code_param ADD mm_id bigint NULL; +EXEC mh_jnd.sys.sp_addextendedproperty 'MS_Description', N'采集设备参数表ID值', 'schema', N'dbo', 'table', N'device_code_param', 'column', N'mm_id'; +ALTER TABLE mh_jnd.dbo.device_code_param ADD data_type int NULL; +EXEC mh_jnd.sys.sp_addextendedproperty 'MS_Description', N'数据类型', 'schema', N'dbo', 'table', N'device_code_param', 'column', N'data_type'; +ALTER TABLE mh_jnd.dbo.device_code_param ADD protocol_type int NULL; +EXEC mh_jnd.sys.sp_addextendedproperty 'MS_Description', N'协议类型(数据字典表)', 'schema', N'dbo', 'table', N'device_code_param', 'column', N'protocol_type'; diff --git a/user-service/src/main/java/com/mh/user/entity/DeviceCodeParamEntity.java b/user-service/src/main/java/com/mh/user/entity/DeviceCodeParamEntity.java index 17dde8d..d4093dd 100644 --- a/user-service/src/main/java/com/mh/user/entity/DeviceCodeParamEntity.java +++ b/user-service/src/main/java/com/mh/user/entity/DeviceCodeParamEntity.java @@ -25,11 +25,14 @@ public class DeviceCodeParamEntity implements Cloneable { private String funCode; private String registerAddr; private String registerName; - private int digit; //保留小数位 - private int grade; //级别 - private String dataValue; //传入值 - private Date createTime; - private String projectId; + private int digit; //保留小数位 + private int grade; //级别 + private String dataValue; //传入值 + private Date createTime; // 创建时间 + private String projectId; // 项目id + private Long mmId; // 设备管理ID + private int dataType; // 数据类型 + private int protocolType; // 协议类型(数据字典) /** * 重置 @@ -51,6 +54,9 @@ public class DeviceCodeParamEntity implements Cloneable { this.dataValue = null; this.createTime = null; this.projectId = null; + this.mmId = null; + this.dataType = 16; + this.protocolType = 0; } @Override diff --git a/user-service/src/main/java/com/mh/user/factory/CJ188Protocol.java b/user-service/src/main/java/com/mh/user/factory/CJ188Protocol.java index 222d6be..8ccdfb0 100644 --- a/user-service/src/main/java/com/mh/user/factory/CJ188Protocol.java +++ b/user-service/src/main/java/com/mh/user/factory/CJ188Protocol.java @@ -1,5 +1,6 @@ package com.mh.user.factory; +import com.mh.user.entity.DeviceCodeParamEntity; import com.mh.user.entity.MeterManageEntity; import com.mh.user.strategy.ProtocolStrategy; import lombok.extern.slf4j.Slf4j; @@ -40,9 +41,9 @@ public class CJ188Protocol implements Protocol { } @Override - public String analysisReceiveData(MeterManageEntity meterManageEntity, String receiveData) { + public String analysisReceiveData(DeviceCodeParamEntity deviceCodeParamEntity, String receiveData) { log.info("水表标准协议:工厂解析报文"); - return cj188ProtocolStrategy.analysisReceiveData(meterManageEntity, receiveData); + return cj188ProtocolStrategy.analysisReceiveData(deviceCodeParamEntity, receiveData); } } diff --git a/user-service/src/main/java/com/mh/user/factory/EleProtocol.java b/user-service/src/main/java/com/mh/user/factory/EleProtocol.java index 27ba102..d09ab79 100644 --- a/user-service/src/main/java/com/mh/user/factory/EleProtocol.java +++ b/user-service/src/main/java/com/mh/user/factory/EleProtocol.java @@ -1,5 +1,6 @@ package com.mh.user.factory; +import com.mh.user.entity.DeviceCodeParamEntity; import com.mh.user.entity.MeterManageEntity; import com.mh.user.strategy.ProtocolStrategy; import lombok.extern.slf4j.Slf4j; @@ -40,9 +41,9 @@ public class EleProtocol implements Protocol { } @Override - public String analysisReceiveData(MeterManageEntity meterManageEntity, String receiveData) { + public String analysisReceiveData(DeviceCodeParamEntity deviceCodeParamEntity, String receiveData) { log.info("电表97/07规约协议:工厂解析报文"); - return eleProtocolStrategy.analysisReceiveData(meterManageEntity, receiveData); + return eleProtocolStrategy.analysisReceiveData(deviceCodeParamEntity, receiveData); } } diff --git a/user-service/src/main/java/com/mh/user/factory/ModbusProtocol.java b/user-service/src/main/java/com/mh/user/factory/ModbusProtocol.java index 6bf1308..0046470 100644 --- a/user-service/src/main/java/com/mh/user/factory/ModbusProtocol.java +++ b/user-service/src/main/java/com/mh/user/factory/ModbusProtocol.java @@ -1,5 +1,6 @@ package com.mh.user.factory; +import com.mh.user.entity.DeviceCodeParamEntity; import com.mh.user.entity.MeterManageEntity; import com.mh.user.strategy.ProtocolStrategy; import lombok.extern.slf4j.Slf4j; @@ -40,8 +41,8 @@ public class ModbusProtocol implements Protocol { } @Override - public String analysisReceiveData(MeterManageEntity meterManageEntity, String receiveData) { + public String analysisReceiveData(DeviceCodeParamEntity deviceCodeParamEntity, String receiveData) { log.info("modbus标准协议:工厂解析报文"); - return modbusProtocolStrategy.analysisReceiveData(meterManageEntity, receiveData); + return modbusProtocolStrategy.analysisReceiveData(deviceCodeParamEntity, receiveData); } } diff --git a/user-service/src/main/java/com/mh/user/factory/Protocol.java b/user-service/src/main/java/com/mh/user/factory/Protocol.java index 85ec88b..7646dd4 100644 --- a/user-service/src/main/java/com/mh/user/factory/Protocol.java +++ b/user-service/src/main/java/com/mh/user/factory/Protocol.java @@ -1,5 +1,6 @@ package com.mh.user.factory; +import com.mh.user.entity.DeviceCodeParamEntity; import com.mh.user.entity.MeterManageEntity; import com.mh.user.strategy.ProtocolStrategy; @@ -27,10 +28,10 @@ public interface Protocol { /** * 解析指令 - * @param meterManageEntity + * @param deviceCodeParamEntity * @param receiveData * @return */ - String analysisReceiveData(MeterManageEntity meterManageEntity, String receiveData); + String analysisReceiveData(DeviceCodeParamEntity deviceCodeParamEntity, String receiveData); } diff --git a/user-service/src/main/java/com/mh/user/job/CollectionLoopRunner.java b/user-service/src/main/java/com/mh/user/job/CollectionLoopRunner.java index 62409f3..734ada1 100644 --- a/user-service/src/main/java/com/mh/user/job/CollectionLoopRunner.java +++ b/user-service/src/main/java/com/mh/user/job/CollectionLoopRunner.java @@ -133,6 +133,10 @@ public class CollectionLoopRunner implements ApplicationRunner { deviceCodeParamEntity.setProjectId(projectId); deviceCodeParamEntity.setCreateTime(new Date()); + deviceCodeParamEntity.setMmId(meterManageEntity.getId()); + deviceCodeParamEntity.setDataType(meterManageEntity.getDataType()); + deviceCodeParamEntity.setProtocolType(meterManageEntity.getProtocolType()); + deviceCodeParamEntityList.add(deviceCodeParamEntity.clone()); } diff --git a/user-service/src/main/java/com/mh/user/mapper/DeviceCodeParamMapper.java b/user-service/src/main/java/com/mh/user/mapper/DeviceCodeParamMapper.java index 6e94d33..d812099 100644 --- a/user-service/src/main/java/com/mh/user/mapper/DeviceCodeParamMapper.java +++ b/user-service/src/main/java/com/mh/user/mapper/DeviceCodeParamMapper.java @@ -31,7 +31,10 @@ public interface DeviceCodeParamMapper extends BaseMapper @Result(property ="createTime",column ="create_time"), @Result(property ="grade",column ="grade"), @Result(property ="digit",column ="digit"), - @Result(property ="projectId",column ="project_id") + @Result(property ="projectId",column ="project_id"), + @Result(property ="mmId",column ="mm_id"), + @Result(property ="dataType",column ="data_type"), + @Result(property ="protocolType",column ="protocol_type") }) @Select("select id, " + "device_addr, " + @@ -47,7 +50,10 @@ public interface DeviceCodeParamMapper extends BaseMapper "grade, " + "project_id, " + "digit, " + - "register_name " + + "register_name," + + "mm_id," + + "data_type," + + "protocol_type " + " from device_code_param where data_port=#{gatewayPort} order by device_type ") List queryCodeParam(@Param("gatewayPort") String gatewayPort); @@ -94,11 +100,16 @@ public interface DeviceCodeParamMapper extends BaseMapper "parity," + "digit," + "grade," + - "str_data" + + "str_data," + + "mm_id," + + "data_type," + + "protocol_type" + ")" + "values " + "" + - "(#{item.deviceAddr},#{item.deviceName},#{item.deviceType},#{item.dataPort},#{item.baudRate},#{item.brand},#{item.funCode},#{item.registerAddr},#{item.registerName},getDate(),#{item.projectId},#{item.parity},#{item.digit},#{item.grade},#{item.strData})" + + "(#{item.deviceAddr},#{item.deviceName},#{item.deviceType},#{item.dataPort},#{item.baudRate}," + + "#{item.brand},#{item.funCode},#{item.registerAddr},#{item.registerName},getDate(),#{item.projectId}," + + "#{item.parity},#{item.digit},#{item.grade},#{item.strData},#{item.mmId},#{item.dataType},#{item.protocolType})" + "" + "") void insertDeviceCodeParamList(@Param("deviceCodeParamEntityList") List deviceCodeParamEntityList); diff --git a/user-service/src/main/java/com/mh/user/mapper/chillers/GatewayManageMapper.java b/user-service/src/main/java/com/mh/user/mapper/chillers/GatewayManageMapper.java index 3bbda0e..0d4f7e2 100644 --- a/user-service/src/main/java/com/mh/user/mapper/chillers/GatewayManageMapper.java +++ b/user-service/src/main/java/com/mh/user/mapper/chillers/GatewayManageMapper.java @@ -29,21 +29,21 @@ public interface GatewayManageMapper extends BaseMapper { @Result(column = "gateway_address", property = "gatewayAddress"), @Result(column = "data_com", property = "dataCom"), @Result(column = "create_date", property = "createDate"), - @Result(column = "connect_date", property = "connectDate"), + @Result(column = "connect_time", property = "connectDate"), @Result(column = "internet_card", property = "internetCard"), @Result(column = "operator", property = "operator"), - @Result(column = "gateway_port", property = "gatewayPort"), + @Result(column = "port", property = "gatewayPort"), @Result(column = "grade", property = "grade") }) List queryByOther(@Param("grade") Integer grade, @Param("operator") Integer operator); // 查询全部信息 @ResultMap("rs") - @Select("select id,gateway_name, gateway_ip, gateway_address, data_com, connect_date, internet_card, operator, gateway_port, grade from gateway_manage ") + @Select("select id,gateway_name, gateway_ip, gateway_address, data_com, connect_time, internet_card, operator, port, grade from gateway_manage ") List queryAll(); // 添加网关设备 - @Insert("insert into gateway_manage(gateway_name, gateway_ip, gateway_address, data_com, connect_date, grade, internet_card, operator, gateway_port)" + + @Insert("insert into gateway_manage(gateway_name, gateway_ip, gateway_address, data_com, connect_time, grade, internet_card, operator, port)" + " values (#{gatewayName}, #{gatewayIP}, #{gatewayAddress}, #{dataCom}, #{connectDate}, #{grade}, #{internetCard}, #{operator}, #{gatewayPort})") void insertGatewayManage(GatewayManageEntity gatewayManageEntity); @@ -52,7 +52,7 @@ public interface GatewayManageMapper extends BaseMapper { void deleteGatewayManageByID(@Param("gatewayID") int gatewayID); // 根据网关设备ID查询网关对应信息 - @Select("select id, gateway_name, gateway_ip, gateway_address, data_com, collection_loop, connect_date, internet_card, operator, port, grade from gateway_manage where id = #{gatewayID}") + @Select("select id, gateway_name, gateway_ip, gateway_address, data_com, collection_loop, connect_time, internet_card, operator, port, grade from gateway_manage where id = #{gatewayID}") GatewayManageEntity queryGatewayByID(@Param("gatewayID") Long gatewayID); // 查询对应的总数 @@ -60,19 +60,19 @@ public interface GatewayManageMapper extends BaseMapper { int queryByOtherCount(@Param("page") int page, @Param("size") int size, @Param("gatewayID") int gatewayID); // 根据IP和端口号更新服务器在线时间 - @Update("update gateway_manage set connect_date = getDate() where gateway_ip=#{IP} and convert(varchar(20),gateway_port) = #{port}") + @Update("update gateway_manage set connect_time = getDate() where gateway_ip=#{IP} and convert(varchar(20),port) = #{port}") void updateGatewayManage(@Param("IP") String IP,@Param("port") String port); // 根据端口号更新服务器在线时间 - @Update("update gateway_manage set connect_date = getDate() where convert(varchar(20),gateway_port) = #{port}") + @Update("update gateway_manage set connect_time = getDate() where convert(varchar(20),port) = #{port}") void updateGatewayManage2(@Param("port") String port); // 根据设备类型查询数据库,找出对应的详细信息 - @Select("select id, gateway_name, gateway_ip, gateway_address, data_com, connect_date, internet_card, operator, gateway_port, grade from gateway_manage where grade = #{grade}") + @Select("select id, gateway_name, gateway_ip, gateway_address, data_com, connect_time, internet_card, operator, port, grade from gateway_manage where grade = #{grade}") GatewayManageEntity queryGatewayByGrade(@Param("grade") Long grade); //根据端口或者IP或者心跳包查询网关对应的项目名称 - @Select("select project_Name from project_info a join gateway_manage b on a.id=b.project_id and b.gateway_port=#{str} ") + @Select("select project_Name from project_info a join gateway_manage b on a.id=b.project_id and b.port=#{str} ") String selectProjectName(@Param("str") String str); diff --git a/user-service/src/main/java/com/mh/user/netty/EchoServerHandler.java b/user-service/src/main/java/com/mh/user/netty/EchoServerHandler.java index 315d590..66f1969 100644 --- a/user-service/src/main/java/com/mh/user/netty/EchoServerHandler.java +++ b/user-service/src/main/java/com/mh/user/netty/EchoServerHandler.java @@ -14,6 +14,7 @@ import io.netty.handler.timeout.IdleStateEvent; import io.netty.util.ReferenceCountUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.context.ApplicationContext; + import java.text.SimpleDateFormat; import java.util.Date; import java.util.List; @@ -37,7 +38,8 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter { private int size = 0; private String IP; private String port; - private String receiveStr=""; + private String receiveStr = ""; + /** * 客户端连接会触发 */ @@ -57,7 +59,7 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter { IdleStateEvent event = (IdleStateEvent) obj; if (IdleState.READER_IDLE.equals(event.state())) { //如果读通道处于空闲状态,说明没有接收到心跳命令 log.info("第{}已经10秒没有接收到客户端的信息了", idleCount); - receiveStr =""; + receiveStr = ""; num = num + 1; if (num > size - 1) { num = 0; @@ -67,7 +69,7 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter { // SendOrderUtils.sendAllOrder(deviceCodeParamList.get(num),ctx,num,size); } else { // 继续发送下一个采集指令 - SendOrderUtils.sendAllOrder(deviceCodeParamList.get(num),ctx,num,size); + SendOrderUtils.sendAllOrder(deviceCodeParamList.get(num), ctx, num, size); } } } else { @@ -104,29 +106,23 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter { // 当前批量读取中的最后一条消息 @Override - public void channelReadComplete(ChannelHandlerContext ctx) throws Exception{ + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { //心跳包报文: 24 00 60 95 - receiveStr= receiveStr.toUpperCase();//返回值全部变成大写 - //截取去掉FE - if (receiveStr.length()>8){ - String str1=receiveStr.substring(0,8); - String str2=receiveStr.substring(8); - receiveStr=str1.replace("FE", "")+str2; - }else{ - receiveStr = receiveStr.replace("FE", ""); - } - log.info("channelReadComplete接收到的数据长度: ===> {}", receiveStr.length()); + receiveStr = receiveStr.toUpperCase();//返回值全部变成大写 + log.info("channelReadComplete接收到的数据{}, 长度: ===> {}", receiveStr, receiveStr.length()); //心跳包处理 - if ((receiveStr.length() == 8) && receiveStr.startsWith("24")) { +// if ((receiveStr.length() == 8) && receiveStr.startsWith("24")) { + if ((receiveStr.length() == 8) && receiveStr.startsWith("C0A801FE")) { log.info("接收到心跳包 ===> {}", receiveStr); idleCount = 1; - port=receiveStr.substring(4,8);//心跳包包含网关端口(自己定义返回心跳包) + port = receiveStr.substring(4, 8);//心跳包包含网关端口(自己定义返回心跳包) + port = "6001"; // 清空receiveStr receiveStr = ""; // 更新对应的网关在线情况 gatewayManageService.updateGatewayManage2(port); //根据端口或者IP或者心跳包查询网关对应的项目名称 - String projectName=gatewayManageService.selectProjectName(port); + String projectName = gatewayManageService.selectProjectName(port); log.info("---------------------{}项目网关上线---------------------", projectName); // 生成采集指令 deviceCodeParamList = deviceCodeParamService.queryCodeParam(port); //心跳包包含网关端口(自己定义返回心跳包) @@ -134,172 +130,111 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter { log.info("deviceCodeParam size ===> {}", size); num = 0; // 发送采集报文 - if (size>0) { - if (idleCount<2){ + if (size > 0) { + if (idleCount < 2) { Thread.sleep(200); - SendOrderUtils.sendAllOrder(deviceCodeParamList.get(num),ctx,num,size); + SendOrderUtils.sendAllOrder(deviceCodeParamList.get(num), ctx, num, size); idleCount++; - }else{ + } else { ctx.channel().close(); } - }else{ - log.info("gateway not find deviceCodeParam!" ); + } else { + log.info("gateway not find deviceCodeParam!"); } - } else if (receiveStr.length() == 36 || receiveStr.length() == 40 || receiveStr.length() == 44 || receiveStr.length() == 50) { + } else if (receiveStr.length() == 34 || receiveStr.length() == 36 || receiveStr.length() == 40 || receiveStr.length() == 44 || receiveStr.length() == 50) { //电表返回数据解析 - idleCount=1; + idleCount = 1; log.info("电表接收===> {},长度:{}", receiveStr, receiveStr.length()); //解析采集的报文,并保存到数据库 - AnalysisReceiveOrder485 analysisReceiveOrder485 = new AnalysisReceiveOrder485(); - analysisReceiveOrder485.analysisMeterOrder485(receiveStr,deviceCodeParamList.get(num)); - //清空receiveStr - receiveStr = ""; - //判断发送的下标,如果不等于指令数组大小 - num = num + 1; - if (num > size - 1) { - num = 0; - Thread.sleep(200); - // 继续发送下一个采集指令 - SendOrderUtils.sendAllOrder(deviceCodeParamList.get(num),ctx,num,size); - log.info("------一轮采集完成,继续下一轮--------"); - } else { - // 添加一个状态值,判断是否继续发送指令 update by ljf on 2020-08-07 - if (Constant.WEB_FLAG) { - num = 0; - // 关闭连接 - receiveStr = null; - ctx.close(); - } else { - Thread.sleep(200); - // 继续发送下一个采集指令 - SendOrderUtils.sendAllOrder(deviceCodeParamList.get(num),ctx,num,size); - } - } + nextSendOrder(ctx); } else if (receiveStr.length() == 18) { //冷量计返回数据解析 - idleCount=1; + idleCount = 1; log.info("冷量计接收==>{},长度:{}", receiveStr, receiveStr.length()); - // 解析采集的报文,并保存到数据库 - AnalysisReceiveOrder485 analysisReceiveOrder485 = new AnalysisReceiveOrder485(); - analysisReceiveOrder485.analysisCloudOrder485(receiveStr,deviceCodeParamList.get(num) ); - // 清空receiveStr - receiveStr = ""; - // 判断发送的下标,如果不等于指令数组大小 - num = num + 1; - if (num > size - 1) { - num = 0; - Thread.sleep(200); - // 继续发送下一个采集指令 - SendOrderUtils.sendAllOrder(deviceCodeParamList.get(num),ctx,num,size); - log.info("------一轮采集完成,继续下一轮--------"); - } else { - // 添加一个状态值,判断是否继续发送指令 update by ljf on 2020-08-07 - if (Constant.WEB_FLAG) { - num = 0; - // 关闭连接 - receiveStr = null; - ctx.close(); - } else { - Thread.sleep(200); - // 继续发送下一个采集指令 - SendOrderUtils.sendAllOrder(deviceCodeParamList.get(num),ctx,num,size); - } - } - }else if (receiveStr.length() == 12 || receiveStr.length() == 14) { + nextSendOrder(ctx); + } else if (receiveStr.length() == 12 || receiveStr.length() == 14) { //冷水机返回数据解析 - idleCount=1; + idleCount = 1; log.info("冷水机接收===>{},长度:{}", receiveStr, receiveStr.length()); - // 解析采集的报文,并保存到数据库 - AnalysisReceiveOrder485 analysisReceiveOrder485 = new AnalysisReceiveOrder485(); - analysisReceiveOrder485.analysisChillerOrder485(receiveStr,deviceCodeParamList.get(num)); - // 清空receiveStr - receiveStr = ""; - // 判断发送的下标,如果不等于指令数组大小 - num = num + 1; - if (num > size - 1) { - num = 0; - Thread.sleep(200); - // 继续发送下一个采集指令 - SendOrderUtils.sendAllOrder(deviceCodeParamList.get(num),ctx,num,size); - log.info("------一轮采集完成,继续下一轮--------"); - } else { - // 添加一个状态值,判断是否继续发送指令 update by ljf on 2020-08-07 - if (Constant.WEB_FLAG) { - num = 0; - // 关闭连接 - receiveStr = null; - ctx.close(); - } else { - Thread.sleep(200); - // 继续发送下一个采集指令 - SendOrderUtils.sendAllOrder(deviceCodeParamList.get(num),ctx,num,size); - } - } + nextSendOrder(ctx); } else if (receiveStr.length() > 50 && receiveStr.length() < 100) { - idleCount=1; + idleCount = 1; // 清空receiveStr - receiveStr = null; - // 判断发送的下标,如果不等于指令数组大小 - num = num + 1; - if (num > size - 1) { - num = 0; - Thread.sleep(200); - // 继续发送下一个采集指令 - SendOrderUtils.sendAllOrder(deviceCodeParamList.get(num),ctx,num,size); - log.info("------一轮采集完成,继续下一轮--------"); - } else { - // 添加一个状态值,判断是否继续发送指令 update by ljf on 2020-08-07 - if (Constant.WEB_FLAG) { - num = 0; - // 关闭连接 - receiveStr = null; - ctx.close(); - } else { - Thread.sleep(200); - // 继续发送下一个采集指令 - SendOrderUtils.sendAllOrder(deviceCodeParamList.get(num),ctx,num,size); - } - } + nextSendOrder(ctx); } // else if (receiveStr.length() >= 100 ){ // whiteGateway(ctx); // } - else { - receiveStr = null; - } +// else { +// receiveStr = null; +// } ctx.flush(); } + private void nextSendOrder(ChannelHandlerContext ctx) throws InterruptedException { + // 解析采集的报文,并保存到数据库 + analysisReceiveData(receiveStr, deviceCodeParamList.get(num)); + // 清空receiveStr + receiveStr = ""; + // 判断发送的下标,如果不等于指令数组大小 + num = num + 1; + if (num > size - 1) { + num = 0; + Thread.sleep(200); + // 继续发送下一个采集指令 + SendOrderUtils.sendAllOrder(deviceCodeParamList.get(num), ctx, num, size); + log.info("------一轮采集完成,继续下一轮--------"); + } else { + // 添加一个状态值,判断是否继续发送指令 update by ljf on 2020-08-07 + if (Constant.WEB_FLAG) { + num = 0; + // 关闭连接 + receiveStr = null; + ctx.close(); + } else { + Thread.sleep(200); + // 继续发送下一个采集指令 + SendOrderUtils.sendAllOrder(deviceCodeParamList.get(num), ctx, num, size); + } + } + } + + private void analysisReceiveData(String receiveStr, DeviceCodeParamEntity deviceCodeParamEntity) { + // 解析采集的报文,并保存到数据库 + AnalysisReceiveOrder485 analysisReceiveOrder485 = new AnalysisReceiveOrder485(); + analysisReceiveOrder485.analysisReceiveOrder485(receiveStr, deviceCodeParamEntity); + } + private void whiteGateway(ChannelHandlerContext ctx) throws InterruptedException { - if (receiveStr.substring(0,2).equalsIgnoreCase("2b") && receiveStr.substring(6,8).equalsIgnoreCase("7b")){ - receiveStr=receiveStr.substring(6); + if (receiveStr.substring(0, 2).equalsIgnoreCase("2b") && receiveStr.substring(6, 8).equalsIgnoreCase("7b")) { + receiveStr = receiveStr.substring(6); } - receiveStr=ExchangeStringUtil.hexStringToString(receiveStr) ; + receiveStr = ExchangeStringUtil.hexStringToString(receiveStr); log.info("白色网关接收===> " + receiveStr); JSONObject jsonObject = JSONObject.parseObject(receiveStr); - receiveStr=""; - port=jsonObject.getString("snr"); //网关ID,从心跳包中获得 - IP=jsonObject.getString("ip"); //ip - String cmd=jsonObject.getString("cmd"); //指令模式dHeartbeat(心跳包),data(主动采集返回),reword - String name=jsonObject.getString("name"); + receiveStr = ""; + port = jsonObject.getString("snr"); //网关ID,从心跳包中获得 + IP = jsonObject.getString("ip"); //ip + String cmd = jsonObject.getString("cmd"); //指令模式dHeartbeat(心跳包),data(主动采集返回),reword + String name = jsonObject.getString("name"); SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Date date = new Date(); String time = sdf1.format(date); - if(cmd.equals("dHeartbeat")){ - JSONObject jsonHeart=new JSONObject(); - jsonHeart.put("snr",port); - jsonHeart.put("cmd","uHeartbeat"); - jsonHeart.put("recordCheckTime","30"); - jsonHeart.put("keepAliveTime","50"); //通讯保持在线间隔,秒80 - jsonHeart.put("resetTime","23:59:59"); - jsonHeart.put("ip",IP); - jsonHeart.put("time",time); - jsonHeart.put("name",name); - jsonHeart.put("heartInterval","20");//网关发起心跳包的时间间隔,秒70 - jsonHeart.put("recordMode","cover"); - String sendStr=jsonHeart.toString(); + if (cmd.equals("dHeartbeat")) { + JSONObject jsonHeart = new JSONObject(); + jsonHeart.put("snr", port); + jsonHeart.put("cmd", "uHeartbeat"); + jsonHeart.put("recordCheckTime", "30"); + jsonHeart.put("keepAliveTime", "50"); //通讯保持在线间隔,秒80 + jsonHeart.put("resetTime", "23:59:59"); + jsonHeart.put("ip", IP); + jsonHeart.put("time", time); + jsonHeart.put("name", name); + jsonHeart.put("heartInterval", "20");//网关发起心跳包的时间间隔,秒70 + jsonHeart.put("recordMode", "cover"); + String sendStr = jsonHeart.toString(); log.info("白色网关回复收到心跳包===>" + sendStr); - sendStr=ExchangeStringUtil.strTo16(sendStr); + sendStr = ExchangeStringUtil.strTo16(sendStr); ByteBuf buffer = ExchangeStringUtil.getByteBuf(ctx, sendStr); Thread.sleep(200); ctx.channel().writeAndFlush(buffer); //发送数据 @@ -311,38 +246,38 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter { log.info("白色网关接收长度===> " + size); num = 0; // 发送采集报文 - if (size>0) { - if (idleCount<2){ + if (size > 0) { + if (idleCount < 2) { Thread.sleep(200); - SendOrderUtils.sendAllOrder2(deviceCodeParamList.get(num), ctx,port,IP,num,size); + SendOrderUtils.sendAllOrder2(deviceCodeParamList.get(num), ctx, port, IP, num, size); idleCount++; - }else{ + } else { log.info("close this channel!"); ctx.channel().close(); } - }else{ - log.info("white gateway not find deviceCodeParam!" ); + } else { + log.info("white gateway not find deviceCodeParam!"); } - }else{ - idleCount=1; - String data=jsonObject.getString("data"); - String strHex=ExchangeStringUtil.base64ToHex(data); + } else { + idleCount = 1; + String data = jsonObject.getString("data"); + String strHex = ExchangeStringUtil.base64ToHex(data); //返回值全部变成大写 - strHex= strHex.toUpperCase(); + strHex = strHex.toUpperCase(); //截取去掉FE String dataStr; - if (strHex.length()>8){ - String str1=strHex.substring(0,8); - String str2=strHex.substring(8); - dataStr=str1.replace("FE", "")+str2; - }else{ + if (strHex.length() > 8) { + String str1 = strHex.substring(0, 8); + String str2 = strHex.substring(8); + dataStr = str1.replace("FE", "") + str2; + } else { dataStr = strHex.replace("FE", ""); } - if (dataStr.length() == 36 || dataStr.length() == 40 || dataStr.length() == 44 || dataStr.length() == 50){ + if (dataStr.length() == 36 || dataStr.length() == 40 || dataStr.length() == 44 || dataStr.length() == 50) { log.info("白色网关电表接收===>" + dataStr); // 解析采集的报文,并保存到数据库 AnalysisReceiveOrder485 analysisReceiveOrder485 = new AnalysisReceiveOrder485(); - analysisReceiveOrder485.analysisMeterOrder485(dataStr,deviceCodeParamList.get(num)); //电表报文解析 + analysisReceiveOrder485.analysisMeterOrder485(dataStr, deviceCodeParamList.get(num)); //电表报文解析 // 清空dataStr // 判断发送的下标,如果不等于指令数组大小 num = num + 1; @@ -350,7 +285,7 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter { num = 0; Thread.sleep(200); // 继续发送下一个采集指令 - SendOrderUtils.sendAllOrder2(deviceCodeParamList.get(num), ctx,port,IP,num,size); + SendOrderUtils.sendAllOrder2(deviceCodeParamList.get(num), ctx, port, IP, num, size); log.info("------一轮采集完成,继续下一轮--------"); } else { // 添加一个状态值,判断是否继续发送指令 update by ljf on 2020-08-07 @@ -362,14 +297,14 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter { } else { Thread.sleep(200); // 继续发送下一个采集指令 - SendOrderUtils.sendAllOrder2(deviceCodeParamList.get(num), ctx,port,IP,num,size); + SendOrderUtils.sendAllOrder2(deviceCodeParamList.get(num), ctx, port, IP, num, size); } } - }else if(dataStr.length() == 12 || dataStr.length() == 14){ + } else if (dataStr.length() == 12 || dataStr.length() == 14) { log.info("白色网关冷水机接收===>" + dataStr); // 解析采集的报文,并保存到数据库 AnalysisReceiveOrder485 analysisReceiveOrder485 = new AnalysisReceiveOrder485(); - analysisReceiveOrder485.analysisChillerOrder485(dataStr,deviceCodeParamList.get(0)); + analysisReceiveOrder485.analysisChillerOrder485(dataStr, deviceCodeParamList.get(0)); // 清空dataStr dataStr = ""; // 判断发送的下标,如果不等于指令数组大小 @@ -378,7 +313,7 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter { num = 0; Thread.sleep(200); // 继续发送下一个采集指令 - SendOrderUtils.sendAllOrder2(deviceCodeParamList.get(num), ctx,port,IP,num,size); + SendOrderUtils.sendAllOrder2(deviceCodeParamList.get(num), ctx, port, IP, num, size); log.info("------一轮采集完成,继续下一轮--------"); } else { // 添加一个状态值,判断是否继续发送指令 update by ljf on 2020-08-07 @@ -391,15 +326,15 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter { } else { Thread.sleep(200); // 继续发送下一个采集指令 - SendOrderUtils.sendAllOrder2(deviceCodeParamList.get(num), ctx,port,IP,num,size); + SendOrderUtils.sendAllOrder2(deviceCodeParamList.get(num), ctx, port, IP, num, size); } } - }else if(dataStr.length() == 18){ + } else if (dataStr.length() == 18) { // log.info("white gateway cloud receive message ===> " + dataStr); log.info("白色网关冷量计接收===> " + dataStr); // 解析采集的报文,并保存到数据库 AnalysisReceiveOrder485 analysisReceiveOrder485 = new AnalysisReceiveOrder485(); //冷量机报文解析 - analysisReceiveOrder485.analysisCloudOrder485(dataStr,deviceCodeParamList.get(num) ); + analysisReceiveOrder485.analysisCloudOrder485(dataStr, deviceCodeParamList.get(num)); // 清空dataStr dataStr = ""; // 判断发送的下标,如果不等于指令数组大小 @@ -408,7 +343,7 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter { num = 0; Thread.sleep(200); // 继续发送下一个采集指令 - SendOrderUtils.sendAllOrder2(deviceCodeParamList.get(num), ctx,port,IP,num,size); + SendOrderUtils.sendAllOrder2(deviceCodeParamList.get(num), ctx, port, IP, num, size); log.info("------一轮采集完成,继续下一轮--------"); } else { // 添加一个状态值,判断是否继续发送指令 update by ljf on 2020-08-07 @@ -421,10 +356,10 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter { } else { Thread.sleep(200); // 继续发送下一个采集指令 - SendOrderUtils.sendAllOrder2(deviceCodeParamList.get(num), ctx,port,IP,num,size); + SendOrderUtils.sendAllOrder2(deviceCodeParamList.get(num), ctx, port, IP, num, size); } } - }else { //if(dataStr.length() > 50) + } else { //if(dataStr.length() > 50) // 清空dataStr dataStr = null; // 判断发送的下标,如果不等于指令数组大小 @@ -433,7 +368,7 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter { num = 0; Thread.sleep(200); // 继续发送下一个采集指令 - SendOrderUtils.sendAllOrder2(deviceCodeParamList.get(num), ctx,port,IP,num,size); + SendOrderUtils.sendAllOrder2(deviceCodeParamList.get(num), ctx, port, IP, num, size); log.info("------一轮采集完成,继续下一轮--------"); } else { // 添加一个状态值,判断是否继续发送指令 update by ljf on 2020-08-07 @@ -445,7 +380,7 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter { } else { Thread.sleep(200); // 继续发送下一个采集指令 - SendOrderUtils.sendAllOrder2(deviceCodeParamList.get(num), ctx,port,IP,num,size); + SendOrderUtils.sendAllOrder2(deviceCodeParamList.get(num), ctx, port, IP, num, size); } } } diff --git a/user-service/src/main/java/com/mh/user/service/MeterManageService.java b/user-service/src/main/java/com/mh/user/service/MeterManageService.java index 4b62262..08ab1e9 100644 --- a/user-service/src/main/java/com/mh/user/service/MeterManageService.java +++ b/user-service/src/main/java/com/mh/user/service/MeterManageService.java @@ -14,4 +14,6 @@ import java.util.List; public interface MeterManageService extends BaseService { List queryBySystemIdAndProjectId(String systemId, String projectId); + + void updateDataById(Long mmId, String analysisData); } diff --git a/user-service/src/main/java/com/mh/user/service/impl/MeterManageServiceImpl.java b/user-service/src/main/java/com/mh/user/service/impl/MeterManageServiceImpl.java index 7077ce6..083118a 100644 --- a/user-service/src/main/java/com/mh/user/service/impl/MeterManageServiceImpl.java +++ b/user-service/src/main/java/com/mh/user/service/impl/MeterManageServiceImpl.java @@ -1,6 +1,7 @@ package com.mh.user.service.impl; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; +import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper; import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageInfo; import com.mh.common.page.MybatisPageHelper; @@ -15,6 +16,7 @@ import com.mh.user.utils.ExchangeStringUtil; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.math.BigDecimal; import java.util.Collections; import java.util.Date; import java.util.List; @@ -45,6 +47,15 @@ public class MeterManageServiceImpl implements MeterManageService { return meterManageMapper.selectList(queryWrapper.orderByDesc("create_time")); } + @Override + public void updateDataById(Long mmId, String analysisData) { + UpdateWrapper updateWrapper = new UpdateWrapper<>(); + updateWrapper.eq("id", mmId); + updateWrapper.set("cur_value", analysisData); + updateWrapper.set("cur_time", new Date()); + meterManageMapper.update(updateWrapper); + } + @Override public PageResult queryByPage(PageRequest pageRequest) { String systemID = StringUtils.getColumnFilterValue(pageRequest, "systemId"); diff --git a/user-service/src/main/java/com/mh/user/strategy/CJ188ProtocolStrategy.java b/user-service/src/main/java/com/mh/user/strategy/CJ188ProtocolStrategy.java index f246b01..f43c649 100644 --- a/user-service/src/main/java/com/mh/user/strategy/CJ188ProtocolStrategy.java +++ b/user-service/src/main/java/com/mh/user/strategy/CJ188ProtocolStrategy.java @@ -2,6 +2,7 @@ package com.mh.user.strategy; import com.mh.common.utils.StringUtils; import com.mh.user.constants.Constant; +import com.mh.user.entity.DeviceCodeParamEntity; import com.mh.user.entity.MeterManageEntity; import com.mh.user.factory.Protocol; import com.mh.user.utils.ExchangeStringUtil; @@ -52,7 +53,7 @@ public class CJ188ProtocolStrategy implements ProtocolStrategy { } @Override - public String analysisReceiveData(MeterManageEntity meterManageEntity, String receiveData) { + public String analysisReceiveData(DeviceCodeParamEntity deviceCodeParamEntity, String receiveData) { log.info("水表标准协议:策略解析报文"); String meterId = ""; String data = ""; diff --git a/user-service/src/main/java/com/mh/user/strategy/EleProtocolStrategy.java b/user-service/src/main/java/com/mh/user/strategy/EleProtocolStrategy.java index 518dc5c..6d2e22b 100644 --- a/user-service/src/main/java/com/mh/user/strategy/EleProtocolStrategy.java +++ b/user-service/src/main/java/com/mh/user/strategy/EleProtocolStrategy.java @@ -2,10 +2,27 @@ package com.mh.user.strategy; import com.mh.common.utils.StringUtils; import com.mh.user.constants.Constant; +import com.mh.user.entity.DataResultEntity; +import com.mh.user.entity.DeviceCodeParamEntity; import com.mh.user.entity.MeterManageEntity; -import com.mh.user.factory.Protocol; +import com.mh.user.service.DataResultService; +import com.mh.user.service.DeviceCodeParamService; +import com.mh.user.service.MeterManageService; +import com.mh.user.service.ProjectInfoService; +import com.mh.user.service.chillers.ChillersService; +import com.mh.user.service.chillers.OrderMessageService; import com.mh.user.utils.ExchangeStringUtil; +import com.mh.user.utils.SpringBeanUtil; +import com.mh.user.utils.ThreadPoolService; import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.context.ApplicationContext; + +import java.math.BigDecimal; +import java.text.DecimalFormat; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.concurrent.ThreadPoolExecutor; /** * @author LJF @@ -17,15 +34,23 @@ import lombok.extern.slf4j.Slf4j; @Slf4j public class EleProtocolStrategy implements ProtocolStrategy { - private static class SingletonHolder{ + // 调用service + ApplicationContext context = SpringBeanUtil.getApplicationContext(); + ThreadPoolExecutor threadPoolService = ThreadPoolService.getInstance(); + DataResultService dataResultService = context.getBean(DataResultService.class); + ProjectInfoService projectInfoService = context.getBean(ProjectInfoService.class); + + private static final SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + + private static class SingletonHolder { private static final EleProtocolStrategy INSTANCE = new EleProtocolStrategy(); } - private EleProtocolStrategy(){ + private EleProtocolStrategy() { // 防止外部直接实例化 } - public static EleProtocolStrategy getInstance(){ + public static EleProtocolStrategy getInstance() { return SingletonHolder.INSTANCE; } @@ -68,20 +93,18 @@ public class EleProtocolStrategy implements ProtocolStrategy { return Constant.FAIL; } } - log.info("生成采集电表指令==>表号:{},指令:{}", meterManageEntity.getMtCode(),str); + log.info("生成采集电表指令==>表号:{},指令:{}", meterManageEntity.getMtCode(), str); return str.toUpperCase(); } @Override - public String analysisReceiveData(MeterManageEntity meterManageEntity, String receiveData) { + public String analysisReceiveData(DeviceCodeParamEntity deviceCodeParamEntity, String receiveData) { log.info("电表97/07规约协议:工厂解析报文"); String data = ""; - if (receiveData.length() > 8) { + if (receiveData.length() == 34 || receiveData.length() == 36 || receiveData.length() == 40 || receiveData.length() == 44 || receiveData.length() == 50) { String str1 = receiveData.substring(0, 8); String str2 = receiveData.substring(8); receiveData = str1.replace("FE", "") + str2; - } - if (receiveData.length() == 36 || receiveData.length() == 40 || receiveData.length() == 44 || receiveData.length() == 50) { String checkStr = receiveData.substring(0, receiveData.length() - 4); //减去校验码 String checkNum = ExchangeStringUtil.makeChecksum(checkStr); //生成校验码 //返回的校验码与重新生成的校验码进行校验 @@ -90,18 +113,7 @@ public class EleProtocolStrategy implements ProtocolStrategy { String meterId = checkStr.substring(12, 14) + checkStr.substring(10, 12) + checkStr.substring(8, 10) + checkStr.substring(6, 8) + checkStr.substring(4, 6) + checkStr.substring(2, 4); meterId = String.format("%012d", Long.parseLong(meterId)); - StringBuilder stringBuilder = new StringBuilder(); - if (receiveData.length() == 36) { - for (int i = 0; i < 4; i++) { - String data1 = checkStr.substring(32 - 2 * (i + 1), 32 - 2 * i); - stringBuilder.append(data1); - } - } else { - for (int i = 0; i < 4; i++) { - String data1 = checkStr.substring(36 - 2 * (i + 1), 36 - 2 * i); - stringBuilder.append(data1); - } - } + StringBuilder stringBuilder = getStringBuilder(receiveData, checkStr); data = stringBuilder.toString(); data = ExchangeStringUtil.cutThree(data); // 0 代表前面补充0,4 代表长度为4,d 代表参数为正数型 @@ -115,7 +127,52 @@ public class EleProtocolStrategy implements ProtocolStrategy { if (!StringUtils.isBlank(data)) { data = String.valueOf(Double.valueOf(data)); //00010.76,去除读数前面带0的情况 } + // 解析入库 + analysisMeterOrder485(data, deviceCodeParamEntity); return data; } + private static StringBuilder getStringBuilder(String receiveData, String checkStr) { + StringBuilder stringBuilder = new StringBuilder(); + if (receiveData.length() == 36 || receiveData.length() == 34) { + for (int i = 0; i < 4; i++) { + String data1 = checkStr.substring(32 - 2 * (i + 1), 32 - 2 * i); + stringBuilder.append(data1); + } + } else { + for (int i = 0; i < 4; i++) { + String data1 = checkStr.substring(36 - 2 * (i + 1), 36 - 2 * i); + stringBuilder.append(data1); + } + } + return stringBuilder; + } + + /** + * 解析电表返回的数据 + * + * @param dataStr + */ + public void analysisMeterOrder485(final String dataStr, final DeviceCodeParamEntity deviceCodeParam) { + threadPoolService.execute(() -> { + Date date = new Date(); + String dateStr = sdf1.format(date); + try { + DataResultEntity dataResultEntity = new DataResultEntity(); + dataResultEntity.setDeviceAddr(deviceCodeParam.getDeviceAddr()); //通讯编号 + dataResultEntity.setDeviceType("电表"); //类型 + dataResultEntity.setProjectId(deviceCodeParam.getProjectId()); //所属项目 + dataResultEntity.setCurValue(new BigDecimal(dataStr)); //当前读数 + dataResultEntity.setCurDate(sdf1.parse(dateStr)); //当前日期 + dataResultEntity.setGrade(deviceCodeParam.getGrade()); + String projectName = projectInfoService.selectName(deviceCodeParam.getProjectId()); + log.info("电表==>{},读数==>{},项目名称==>{}", deviceCodeParam.getDeviceAddr(), dataStr, projectName); + dataResultService.saveDataResult(dataResultEntity); + log.info("电表保存数据成功!项目名称:{}", projectName); + } catch (Exception e) { + log.error("保存电表数据失败!", e); + } + }); + } + } diff --git a/user-service/src/main/java/com/mh/user/strategy/ModbusProtocolStrategy.java b/user-service/src/main/java/com/mh/user/strategy/ModbusProtocolStrategy.java index ecb5c2b..70fe2f5 100644 --- a/user-service/src/main/java/com/mh/user/strategy/ModbusProtocolStrategy.java +++ b/user-service/src/main/java/com/mh/user/strategy/ModbusProtocolStrategy.java @@ -2,12 +2,24 @@ package com.mh.user.strategy; import com.mh.common.utils.StringUtils; import com.mh.user.constants.Constant; +import com.mh.user.entity.DataResultChEntity; +import com.mh.user.entity.DataResultClEntity; +import com.mh.user.entity.DeviceCodeParamEntity; import com.mh.user.entity.MeterManageEntity; +import com.mh.user.service.DataResultService; +import com.mh.user.service.ProjectInfoService; import com.mh.user.utils.ExchangeStringUtil; +import com.mh.user.utils.SpringBeanUtil; +import com.mh.user.utils.ThreadPoolService; import lombok.extern.slf4j.Slf4j; +import org.springframework.context.ApplicationContext; import java.math.BigDecimal; import java.math.RoundingMode; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.concurrent.ThreadPoolExecutor; /** * @author LJF @@ -19,6 +31,14 @@ import java.math.RoundingMode; @Slf4j public class ModbusProtocolStrategy implements ProtocolStrategy { + // 调用service + ApplicationContext context = SpringBeanUtil.getApplicationContext(); + DataResultService dataResultService = context.getBean(DataResultService.class); + ProjectInfoService projectInfoService = context.getBean(ProjectInfoService.class); + ThreadPoolExecutor threadPoolService = ThreadPoolService.getInstance(); + + private static final SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + private static class SingletonHolder { private static final ModbusProtocolStrategy INSTANCE = new ModbusProtocolStrategy(); } @@ -55,11 +75,11 @@ public class ModbusProtocolStrategy implements ProtocolStrategy { } @Override - public String analysisReceiveData(MeterManageEntity meterManageEntity, String receiveData) { + public String analysisReceiveData(DeviceCodeParamEntity deviceCodeParamEntity, String receiveData) { log.info("modbus标准协议:策略解析报文"); String checkStr = receiveData.substring(0, receiveData.length() - 4);//检验报文 String checkWord = ExchangeStringUtil.getStrCRC16(checkStr);//生成校验码 - String sValue = null; + String sValue = "0"; String rtData = Constant.FAIL; if (!checkWord.equalsIgnoreCase(receiveData.substring(receiveData.length() - 4))) { log.info("Modbus报文检验失败: {}", receiveData); @@ -67,27 +87,140 @@ public class ModbusProtocolStrategy implements ProtocolStrategy { } // 开始解析: 地址+功能码+数据长度+数据域 // 截取数据长度 - String dataLength = receiveData.substring(6, 8); + String dataLength = receiveData.substring(4, 6); int dataLengthInt = Integer.parseInt(dataLength, 16); // 截取数据域 - String data = receiveData.substring(8, 8 + dataLengthInt * 2); + String data = receiveData.substring(6, 6 + dataLengthInt * 2); // 判断 - switch (meterManageEntity.getDataType()) { + switch (deviceCodeParamEntity.getDataType()) { case 0: + case 1: // 16进制转十进制类型 sValue = ExchangeStringUtil.hexToDec(data); // 保留位数 - sValue = (new BigDecimal(sValue)).divide(new BigDecimal(String.valueOf(meterManageEntity.getDigits() * 10)), 2, RoundingMode.HALF_UP).toString(); + sValue = (new BigDecimal(sValue)).divide(new BigDecimal(String.valueOf(deviceCodeParamEntity.getDigit() * 10)), 2, RoundingMode.HALF_UP).toString(); break; - case 1: + case 2: // 十六进制字符串转IEEE754浮点型 sValue = String.valueOf(ExchangeStringUtil.hexToSingle(data)); break; default: break; } - log.info("解析数据==>表号:{},寄存器地址:{},值:{}", meterManageEntity.getMtNum(), meterManageEntity.getRegisterAddr(), sValue); + log.info("解析数据==>表号:{},寄存器地址:{},值:{}", deviceCodeParamEntity.getDeviceAddr(), deviceCodeParamEntity.getRegisterAddr(), sValue); + // 入库数据 + // 冷量表 + if ("2".equals(deviceCodeParamEntity.getDeviceType())) { + analysisCloudOrder485(sValue, deviceCodeParamEntity); + } else if ("0".equals(deviceCodeParamEntity.getDeviceType())) { + analysisChillerOrder485(sValue, deviceCodeParamEntity); + } return sValue; } + /** + * 解析冷水机组返回的数据 + */ + public void analysisChillerOrder485(final String data, final DeviceCodeParamEntity deviceCodeParam) { + if (!Constant.CONTROL_WEB_FLAG) { + Date date = new Date(); + // 冷水机组的地址 + String chillerAddr = deviceCodeParam.getDeviceAddr(); + DataResultChEntity dataResultCh = new DataResultChEntity(); + // 状态 + try { + // 赋值给dataResultCh + initialDataResultCh(deviceCodeParam, dataResultCh, chillerAddr, date, data); + } catch (Exception e) { + log.error("冷水机报错:", e); + } + } + } + +/** + * 格式化数据 + * + * @param deviceCodeParam + * @param dataResultCh + * @param chillerAddr + * @param date + * @param data + * @throws ParseException + */ +public void initialDataResultCh(DeviceCodeParamEntity deviceCodeParam, DataResultChEntity dataResultCh, String chillerAddr, Date date, String data) throws ParseException { + dataResultCh.setDeviceAddr(chillerAddr); + dataResultCh.setDeviceType(deviceCodeParam.getDeviceType()); + dataResultCh.setCurDate(date); + dataResultCh.setCurValue(data); + dataResultCh.setRegisterAddr(deviceCodeParam.getRegisterAddr()); + dataResultCh.setRegisterName(deviceCodeParam.getRegisterName()); + dataResultCh.setGrade(deviceCodeParam.getGrade()); + dataResultCh.setFunCode(deviceCodeParam.getFunCode()); + dataResultCh.setProjectId(deviceCodeParam.getProjectId()); + String projectName = projectInfoService.selectName(deviceCodeParam.getProjectId()); + log.info("冷水机:" + chillerAddr + ",状态:" + data + ",项目名称:" + projectName); + dataResultService.saveDataResultChiller(dataResultCh); + dataResultService.deleteDataResultNow(deviceCodeParam.getDeviceAddr(), deviceCodeParam.getDeviceType(), deviceCodeParam.getRegisterAddr(), deviceCodeParam.getProjectId()); + log.info("冷水机保存成功!项目名称:" + projectName); +} + +//解析冷量表 +public void analysisCloudOrder485(final String data, final DeviceCodeParamEntity deviceCodeParam) { + threadPoolService.execute(() -> { + //创建SimpleDateFormat对象,指定样式 2019-05-13 22:39:30 + Date date = new Date(); + String dateStr = sdf1.format(date); + String cloudId = deviceCodeParam.getDeviceAddr(); + DataResultChEntity dataResultCh = new DataResultChEntity(); + DataResultClEntity dataResultCl = new DataResultClEntity(); + String registerAddr = deviceCodeParam.getRegisterAddr(); + if (ExchangeStringUtil.isInDate(date, "00:00:00", "00:00:30")) { + dateStr = dateStr.substring(0, 17) + "00"; + System.out.println("插入时间00" + dateStr); + } else if (ExchangeStringUtil.isInDate(date, "00:00:30", "00:00:59")) { + dateStr = dateStr.substring(0, 17) + "30"; + System.out.println("插入时间30" + dateStr); + } + try { + if (registerAddr.equals("0004") || registerAddr.equals("0014")) { + dataResultCh.setDeviceAddr(cloudId); + dataResultCh.setDeviceType(deviceCodeParam.getDeviceType()); + + dataResultCh.setCurDate(sdf1.parse(dateStr)); + dataResultCh.setCurValue(data); + dataResultCh.setRegisterAddr(deviceCodeParam.getRegisterAddr()); + dataResultCh.setRegisterName(deviceCodeParam.getRegisterName()); + dataResultCh.setGrade(deviceCodeParam.getGrade()); + dataResultCh.setProjectId(deviceCodeParam.getProjectId()); + dataResultCh.setGrade(deviceCodeParam.getGrade()); + String projectName = projectInfoService.selectName(deviceCodeParam.getProjectId()); + log.info("冷量计==>{},寄存器地址==>{},读数==>{},项目名称==>{}", cloudId, registerAddr, dataResultCh.getCurValue(), projectName); + dataResultService.saveDataResultCh(dataResultCh); + log.info("冷量计瞬时冷量/流量保存数据库成功!项目名称:{}", projectName); + } else if (registerAddr.equals("0050")) { + dataResultCl.setDeviceAddr(cloudId); + dataResultCl.setDeviceType(deviceCodeParam.getDeviceType()); + dataResultCh.setCurDate(sdf1.parse(dateStr)); + BigDecimal lData = new BigDecimal(data); + dataResultCl.setCurValue(lData);//字符串转整型 + dataResultCl.setRegisterAddr(deviceCodeParam.getRegisterAddr()); + dataResultCl.setRegisterName(deviceCodeParam.getRegisterName()); + dataResultCl.setGrade(deviceCodeParam.getGrade()); + dataResultCl.setProjectId(deviceCodeParam.getProjectId()); + String projectName = projectInfoService.selectName(deviceCodeParam.getProjectId()); + log.info("冷量计==>{},寄存器地址==>{},累计读数==>{},项目名称==>{}", cloudId, registerAddr, lData, projectName); + if (lData.intValue() < 99999999) { + if (lData.intValue() != 2231365) { + dataResultService.saveDataResultCl(dataResultCl); + } + } + log.info("冷量计累计保存数据成功!项目名称:{}", projectName); + dataResultService.saveDataResultCl_bak(dataResultCl); + } + } catch (Exception e) { + log.error("保存冷量计数据失败!", e); + } + }); +} + } diff --git a/user-service/src/main/java/com/mh/user/strategy/ProtocolStrategy.java b/user-service/src/main/java/com/mh/user/strategy/ProtocolStrategy.java index a4392e7..ac6dde5 100644 --- a/user-service/src/main/java/com/mh/user/strategy/ProtocolStrategy.java +++ b/user-service/src/main/java/com/mh/user/strategy/ProtocolStrategy.java @@ -1,5 +1,6 @@ package com.mh.user.strategy; +import com.mh.user.entity.DeviceCodeParamEntity; import com.mh.user.entity.MeterManageEntity; /** @@ -20,9 +21,9 @@ public interface ProtocolStrategy { /** * 解析指令 - * @param meterManageEntity + * @param deviceCodeParamEntity * @param receiveData * @return */ - String analysisReceiveData(MeterManageEntity meterManageEntity, String receiveData); + String analysisReceiveData(DeviceCodeParamEntity deviceCodeParamEntity, String receiveData); } diff --git a/user-service/src/main/java/com/mh/user/utils/AnalysisReceiveOrder485.java b/user-service/src/main/java/com/mh/user/utils/AnalysisReceiveOrder485.java index 88cfa01..eaf7b8c 100644 --- a/user-service/src/main/java/com/mh/user/utils/AnalysisReceiveOrder485.java +++ b/user-service/src/main/java/com/mh/user/utils/AnalysisReceiveOrder485.java @@ -4,12 +4,17 @@ import com.alibaba.fastjson2.JSON; import com.mh.user.config.RabbitmqConfig; import com.mh.user.constants.Constant; import com.mh.user.entity.*; +import com.mh.user.factory.Protocol; +import com.mh.user.factory.ProtocolFactory; import com.mh.user.model.QueueParam; import com.mh.user.service.DataResultService; import com.mh.user.service.DeviceCodeParamService; +import com.mh.user.service.MeterManageService; import com.mh.user.service.ProjectInfoService; import com.mh.user.service.chillers.ChillersService; import com.mh.user.service.chillers.OrderMessageService; +import com.mh.user.strategy.ProtocolStrategy; +import com.mh.user.strategy.ProtocolStrategyFactory; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.ApplicationContext; @@ -40,10 +45,29 @@ public class AnalysisReceiveOrder485 { DeviceCodeParamService deviceCodeParamService = context.getBean(DeviceCodeParamService.class); RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class); ProjectInfoService projectInfoService = context.getBean(ProjectInfoService.class); + MeterManageService meterManageService = context.getBean(MeterManageService.class); private final SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); private final DecimalFormat df = new DecimalFormat("#.##"); + // 根据工厂和策略模式解析接收到的合法数据 + public void analysisReceiveOrder485(final String receiveData, final DeviceCodeParamEntity deviceCodeParam) { + // 创建设备报文 + String protocolType = String.valueOf(deviceCodeParam.getProtocolType()); + Protocol protocol = ProtocolFactory.matchProtocol(protocolType); + ProtocolStrategy strategy = ProtocolStrategyFactory.matchProtocolStrategy(protocolType); + + if (strategy == null) { + return; + } + + protocol.setStrategy(strategy); + String analysisData = protocol.analysisReceiveData(deviceCodeParam, receiveData); + log.info("解析后的数据===> {}", analysisData); + // 更新实时状态数据 + meterManageService.updateDataById(deviceCodeParam.getMmId(), analysisData); + } + //解析冷量表 public void analysisCloudOrder485(final String dataStr1, final DeviceCodeParamEntity deviceCodeParam) { threadPoolService.execute(() -> {