Browse Source

1、采用工厂+策略改造修改保存数据库逻辑

dev
mh 2 months ago
parent
commit
7b52dd62f1
  1. 7
      2024新增脚本.sql
  2. 10
      user-service/src/main/java/com/mh/user/entity/DeviceCodeParamEntity.java
  3. 5
      user-service/src/main/java/com/mh/user/factory/CJ188Protocol.java
  4. 5
      user-service/src/main/java/com/mh/user/factory/EleProtocol.java
  5. 5
      user-service/src/main/java/com/mh/user/factory/ModbusProtocol.java
  6. 5
      user-service/src/main/java/com/mh/user/factory/Protocol.java
  7. 4
      user-service/src/main/java/com/mh/user/job/CollectionLoopRunner.java
  8. 19
      user-service/src/main/java/com/mh/user/mapper/DeviceCodeParamMapper.java
  9. 18
      user-service/src/main/java/com/mh/user/mapper/chillers/GatewayManageMapper.java
  10. 269
      user-service/src/main/java/com/mh/user/netty/EchoServerHandler.java
  11. 2
      user-service/src/main/java/com/mh/user/service/MeterManageService.java
  12. 11
      user-service/src/main/java/com/mh/user/service/impl/MeterManageServiceImpl.java
  13. 3
      user-service/src/main/java/com/mh/user/strategy/CJ188ProtocolStrategy.java
  14. 99
      user-service/src/main/java/com/mh/user/strategy/EleProtocolStrategy.java
  15. 149
      user-service/src/main/java/com/mh/user/strategy/ModbusProtocolStrategy.java
  16. 5
      user-service/src/main/java/com/mh/user/strategy/ProtocolStrategy.java
  17. 24
      user-service/src/main/java/com/mh/user/utils/AnalysisReceiveOrder485.java

7
2024新增脚本.sql

@ -234,3 +234,10 @@ EXEC sys.sp_addextendedproperty 'MS_Description', N'寄存器大小', 'schema',
ALTER TABLE meter_manage ADD is_use bit DEFAULT 1 NULL; ALTER TABLE meter_manage ADD is_use bit DEFAULT 1 NULL;
EXEC sys.sp_addextendedproperty 'MS_Description', N'当前采集点位是否启用', 'schema', N'dbo', 'table', N'meter_manage', 'column', N'is_use'; EXEC sys.sp_addextendedproperty 'MS_Description', N'当前采集点位是否启用', 'schema', N'dbo', 'table', N'meter_manage', 'column', N'is_use';
-- 2024-09-23 冗余设备管理采集参数表数据
ALTER TABLE mh_jnd.dbo.device_code_param ADD mm_id bigint NULL;
EXEC mh_jnd.sys.sp_addextendedproperty 'MS_Description', N'采集设备参数表ID值', 'schema', N'dbo', 'table', N'device_code_param', 'column', N'mm_id';
ALTER TABLE mh_jnd.dbo.device_code_param ADD data_type int NULL;
EXEC mh_jnd.sys.sp_addextendedproperty 'MS_Description', N'数据类型', 'schema', N'dbo', 'table', N'device_code_param', 'column', N'data_type';
ALTER TABLE mh_jnd.dbo.device_code_param ADD protocol_type int NULL;
EXEC mh_jnd.sys.sp_addextendedproperty 'MS_Description', N'协议类型(数据字典表)', 'schema', N'dbo', 'table', N'device_code_param', 'column', N'protocol_type';

10
user-service/src/main/java/com/mh/user/entity/DeviceCodeParamEntity.java

@ -28,8 +28,11 @@ public class DeviceCodeParamEntity implements Cloneable {
private int digit; //保留小数位 private int digit; //保留小数位
private int grade; //级别 private int grade; //级别
private String dataValue; //传入值 private String dataValue; //传入值
private Date createTime; private Date createTime; // 创建时间
private String projectId; private String projectId; // 项目id
private Long mmId; // 设备管理ID
private int dataType; // 数据类型
private int protocolType; // 协议类型(数据字典)
/** /**
* 重置 * 重置
@ -51,6 +54,9 @@ public class DeviceCodeParamEntity implements Cloneable {
this.dataValue = null; this.dataValue = null;
this.createTime = null; this.createTime = null;
this.projectId = null; this.projectId = null;
this.mmId = null;
this.dataType = 16;
this.protocolType = 0;
} }
@Override @Override

5
user-service/src/main/java/com/mh/user/factory/CJ188Protocol.java

@ -1,5 +1,6 @@
package com.mh.user.factory; package com.mh.user.factory;
import com.mh.user.entity.DeviceCodeParamEntity;
import com.mh.user.entity.MeterManageEntity; import com.mh.user.entity.MeterManageEntity;
import com.mh.user.strategy.ProtocolStrategy; import com.mh.user.strategy.ProtocolStrategy;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -40,9 +41,9 @@ public class CJ188Protocol implements Protocol {
} }
@Override @Override
public String analysisReceiveData(MeterManageEntity meterManageEntity, String receiveData) { public String analysisReceiveData(DeviceCodeParamEntity deviceCodeParamEntity, String receiveData) {
log.info("水表标准协议:工厂解析报文"); log.info("水表标准协议:工厂解析报文");
return cj188ProtocolStrategy.analysisReceiveData(meterManageEntity, receiveData); return cj188ProtocolStrategy.analysisReceiveData(deviceCodeParamEntity, receiveData);
} }
} }

5
user-service/src/main/java/com/mh/user/factory/EleProtocol.java

@ -1,5 +1,6 @@
package com.mh.user.factory; package com.mh.user.factory;
import com.mh.user.entity.DeviceCodeParamEntity;
import com.mh.user.entity.MeterManageEntity; import com.mh.user.entity.MeterManageEntity;
import com.mh.user.strategy.ProtocolStrategy; import com.mh.user.strategy.ProtocolStrategy;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -40,9 +41,9 @@ public class EleProtocol implements Protocol {
} }
@Override @Override
public String analysisReceiveData(MeterManageEntity meterManageEntity, String receiveData) { public String analysisReceiveData(DeviceCodeParamEntity deviceCodeParamEntity, String receiveData) {
log.info("电表97/07规约协议:工厂解析报文"); log.info("电表97/07规约协议:工厂解析报文");
return eleProtocolStrategy.analysisReceiveData(meterManageEntity, receiveData); return eleProtocolStrategy.analysisReceiveData(deviceCodeParamEntity, receiveData);
} }
} }

5
user-service/src/main/java/com/mh/user/factory/ModbusProtocol.java

@ -1,5 +1,6 @@
package com.mh.user.factory; package com.mh.user.factory;
import com.mh.user.entity.DeviceCodeParamEntity;
import com.mh.user.entity.MeterManageEntity; import com.mh.user.entity.MeterManageEntity;
import com.mh.user.strategy.ProtocolStrategy; import com.mh.user.strategy.ProtocolStrategy;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -40,8 +41,8 @@ public class ModbusProtocol implements Protocol {
} }
@Override @Override
public String analysisReceiveData(MeterManageEntity meterManageEntity, String receiveData) { public String analysisReceiveData(DeviceCodeParamEntity deviceCodeParamEntity, String receiveData) {
log.info("modbus标准协议:工厂解析报文"); log.info("modbus标准协议:工厂解析报文");
return modbusProtocolStrategy.analysisReceiveData(meterManageEntity, receiveData); return modbusProtocolStrategy.analysisReceiveData(deviceCodeParamEntity, receiveData);
} }
} }

5
user-service/src/main/java/com/mh/user/factory/Protocol.java

@ -1,5 +1,6 @@
package com.mh.user.factory; package com.mh.user.factory;
import com.mh.user.entity.DeviceCodeParamEntity;
import com.mh.user.entity.MeterManageEntity; import com.mh.user.entity.MeterManageEntity;
import com.mh.user.strategy.ProtocolStrategy; import com.mh.user.strategy.ProtocolStrategy;
@ -27,10 +28,10 @@ public interface Protocol {
/** /**
* 解析指令 * 解析指令
* @param meterManageEntity * @param deviceCodeParamEntity
* @param receiveData * @param receiveData
* @return * @return
*/ */
String analysisReceiveData(MeterManageEntity meterManageEntity, String receiveData); String analysisReceiveData(DeviceCodeParamEntity deviceCodeParamEntity, String receiveData);
} }

4
user-service/src/main/java/com/mh/user/job/CollectionLoopRunner.java

@ -133,6 +133,10 @@ public class CollectionLoopRunner implements ApplicationRunner {
deviceCodeParamEntity.setProjectId(projectId); deviceCodeParamEntity.setProjectId(projectId);
deviceCodeParamEntity.setCreateTime(new Date()); deviceCodeParamEntity.setCreateTime(new Date());
deviceCodeParamEntity.setMmId(meterManageEntity.getId());
deviceCodeParamEntity.setDataType(meterManageEntity.getDataType());
deviceCodeParamEntity.setProtocolType(meterManageEntity.getProtocolType());
deviceCodeParamEntityList.add(deviceCodeParamEntity.clone()); deviceCodeParamEntityList.add(deviceCodeParamEntity.clone());
} }

19
user-service/src/main/java/com/mh/user/mapper/DeviceCodeParamMapper.java

@ -31,7 +31,10 @@ public interface DeviceCodeParamMapper extends BaseMapper<DeviceCodeParamEntity>
@Result(property ="createTime",column ="create_time"), @Result(property ="createTime",column ="create_time"),
@Result(property ="grade",column ="grade"), @Result(property ="grade",column ="grade"),
@Result(property ="digit",column ="digit"), @Result(property ="digit",column ="digit"),
@Result(property ="projectId",column ="project_id") @Result(property ="projectId",column ="project_id"),
@Result(property ="mmId",column ="mm_id"),
@Result(property ="dataType",column ="data_type"),
@Result(property ="protocolType",column ="protocol_type")
}) })
@Select("select id, " + @Select("select id, " +
"device_addr, " + "device_addr, " +
@ -47,7 +50,10 @@ public interface DeviceCodeParamMapper extends BaseMapper<DeviceCodeParamEntity>
"grade, " + "grade, " +
"project_id, " + "project_id, " +
"digit, " + "digit, " +
"register_name " + "register_name," +
"mm_id," +
"data_type," +
"protocol_type " +
" from device_code_param where data_port=#{gatewayPort} order by device_type ") " from device_code_param where data_port=#{gatewayPort} order by device_type ")
List<DeviceCodeParamEntity> queryCodeParam(@Param("gatewayPort") String gatewayPort); List<DeviceCodeParamEntity> queryCodeParam(@Param("gatewayPort") String gatewayPort);
@ -94,11 +100,16 @@ public interface DeviceCodeParamMapper extends BaseMapper<DeviceCodeParamEntity>
"parity," + "parity," +
"digit," + "digit," +
"grade," + "grade," +
"str_data" + "str_data," +
"mm_id," +
"data_type," +
"protocol_type" +
")" + ")" +
"values " + "values " +
"<foreach collection='deviceCodeParamEntityList' item='item' separator=','>" + "<foreach collection='deviceCodeParamEntityList' item='item' separator=','>" +
"(#{item.deviceAddr},#{item.deviceName},#{item.deviceType},#{item.dataPort},#{item.baudRate},#{item.brand},#{item.funCode},#{item.registerAddr},#{item.registerName},getDate(),#{item.projectId},#{item.parity},#{item.digit},#{item.grade},#{item.strData})" + "(#{item.deviceAddr},#{item.deviceName},#{item.deviceType},#{item.dataPort},#{item.baudRate}," +
"#{item.brand},#{item.funCode},#{item.registerAddr},#{item.registerName},getDate(),#{item.projectId}," +
"#{item.parity},#{item.digit},#{item.grade},#{item.strData},#{item.mmId},#{item.dataType},#{item.protocolType})" +
"</foreach>" + "</foreach>" +
"</script>") "</script>")
void insertDeviceCodeParamList(@Param("deviceCodeParamEntityList") List<DeviceCodeParamEntity> deviceCodeParamEntityList); void insertDeviceCodeParamList(@Param("deviceCodeParamEntityList") List<DeviceCodeParamEntity> deviceCodeParamEntityList);

18
user-service/src/main/java/com/mh/user/mapper/chillers/GatewayManageMapper.java

@ -29,21 +29,21 @@ public interface GatewayManageMapper extends BaseMapper<GatewayManageEntity> {
@Result(column = "gateway_address", property = "gatewayAddress"), @Result(column = "gateway_address", property = "gatewayAddress"),
@Result(column = "data_com", property = "dataCom"), @Result(column = "data_com", property = "dataCom"),
@Result(column = "create_date", property = "createDate"), @Result(column = "create_date", property = "createDate"),
@Result(column = "connect_date", property = "connectDate"), @Result(column = "connect_time", property = "connectDate"),
@Result(column = "internet_card", property = "internetCard"), @Result(column = "internet_card", property = "internetCard"),
@Result(column = "operator", property = "operator"), @Result(column = "operator", property = "operator"),
@Result(column = "gateway_port", property = "gatewayPort"), @Result(column = "port", property = "gatewayPort"),
@Result(column = "grade", property = "grade") @Result(column = "grade", property = "grade")
}) })
List<GatewayManageEntity> queryByOther(@Param("grade") Integer grade, @Param("operator") Integer operator); List<GatewayManageEntity> queryByOther(@Param("grade") Integer grade, @Param("operator") Integer operator);
// 查询全部信息 // 查询全部信息
@ResultMap("rs") @ResultMap("rs")
@Select("select id,gateway_name, gateway_ip, gateway_address, data_com, connect_date, internet_card, operator, gateway_port, grade from gateway_manage ") @Select("select id,gateway_name, gateway_ip, gateway_address, data_com, connect_time, internet_card, operator, port, grade from gateway_manage ")
List<GatewayManageEntity> queryAll(); List<GatewayManageEntity> queryAll();
// 添加网关设备 // 添加网关设备
@Insert("insert into gateway_manage(gateway_name, gateway_ip, gateway_address, data_com, connect_date, grade, internet_card, operator, gateway_port)" + @Insert("insert into gateway_manage(gateway_name, gateway_ip, gateway_address, data_com, connect_time, grade, internet_card, operator, port)" +
" values (#{gatewayName}, #{gatewayIP}, #{gatewayAddress}, #{dataCom}, #{connectDate}, #{grade}, #{internetCard}, #{operator}, #{gatewayPort})") " values (#{gatewayName}, #{gatewayIP}, #{gatewayAddress}, #{dataCom}, #{connectDate}, #{grade}, #{internetCard}, #{operator}, #{gatewayPort})")
void insertGatewayManage(GatewayManageEntity gatewayManageEntity); void insertGatewayManage(GatewayManageEntity gatewayManageEntity);
@ -52,7 +52,7 @@ public interface GatewayManageMapper extends BaseMapper<GatewayManageEntity> {
void deleteGatewayManageByID(@Param("gatewayID") int gatewayID); void deleteGatewayManageByID(@Param("gatewayID") int gatewayID);
// 根据网关设备ID查询网关对应信息 // 根据网关设备ID查询网关对应信息
@Select("select id, gateway_name, gateway_ip, gateway_address, data_com, collection_loop, connect_date, internet_card, operator, port, grade from gateway_manage where id = #{gatewayID}") @Select("select id, gateway_name, gateway_ip, gateway_address, data_com, collection_loop, connect_time, internet_card, operator, port, grade from gateway_manage where id = #{gatewayID}")
GatewayManageEntity queryGatewayByID(@Param("gatewayID") Long gatewayID); GatewayManageEntity queryGatewayByID(@Param("gatewayID") Long gatewayID);
// 查询对应的总数 // 查询对应的总数
@ -60,19 +60,19 @@ public interface GatewayManageMapper extends BaseMapper<GatewayManageEntity> {
int queryByOtherCount(@Param("page") int page, @Param("size") int size, @Param("gatewayID") int gatewayID); int queryByOtherCount(@Param("page") int page, @Param("size") int size, @Param("gatewayID") int gatewayID);
// 根据IP和端口号更新服务器在线时间 // 根据IP和端口号更新服务器在线时间
@Update("update gateway_manage set connect_date = getDate() where gateway_ip=#{IP} and convert(varchar(20),gateway_port) = #{port}") @Update("update gateway_manage set connect_time = getDate() where gateway_ip=#{IP} and convert(varchar(20),port) = #{port}")
void updateGatewayManage(@Param("IP") String IP,@Param("port") String port); void updateGatewayManage(@Param("IP") String IP,@Param("port") String port);
// 根据端口号更新服务器在线时间 // 根据端口号更新服务器在线时间
@Update("update gateway_manage set connect_date = getDate() where convert(varchar(20),gateway_port) = #{port}") @Update("update gateway_manage set connect_time = getDate() where convert(varchar(20),port) = #{port}")
void updateGatewayManage2(@Param("port") String port); void updateGatewayManage2(@Param("port") String port);
// 根据设备类型查询数据库,找出对应的详细信息 // 根据设备类型查询数据库,找出对应的详细信息
@Select("select id, gateway_name, gateway_ip, gateway_address, data_com, connect_date, internet_card, operator, gateway_port, grade from gateway_manage where grade = #{grade}") @Select("select id, gateway_name, gateway_ip, gateway_address, data_com, connect_time, internet_card, operator, port, grade from gateway_manage where grade = #{grade}")
GatewayManageEntity queryGatewayByGrade(@Param("grade") Long grade); GatewayManageEntity queryGatewayByGrade(@Param("grade") Long grade);
//根据端口或者IP或者心跳包查询网关对应的项目名称 //根据端口或者IP或者心跳包查询网关对应的项目名称
@Select("select project_Name from project_info a join gateway_manage b on a.id=b.project_id and b.gateway_port=#{str} ") @Select("select project_Name from project_info a join gateway_manage b on a.id=b.project_id and b.port=#{str} ")
String selectProjectName(@Param("str") String str); String selectProjectName(@Param("str") String str);

269
user-service/src/main/java/com/mh/user/netty/EchoServerHandler.java

@ -14,6 +14,7 @@ import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContext;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
@ -37,7 +38,8 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter {
private int size = 0; private int size = 0;
private String IP; private String IP;
private String port; private String port;
private String receiveStr=""; private String receiveStr = "";
/** /**
* 客户端连接会触发 * 客户端连接会触发
*/ */
@ -57,7 +59,7 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter {
IdleStateEvent event = (IdleStateEvent) obj; IdleStateEvent event = (IdleStateEvent) obj;
if (IdleState.READER_IDLE.equals(event.state())) { //如果读通道处于空闲状态,说明没有接收到心跳命令 if (IdleState.READER_IDLE.equals(event.state())) { //如果读通道处于空闲状态,说明没有接收到心跳命令
log.info("第{}已经10秒没有接收到客户端的信息了", idleCount); log.info("第{}已经10秒没有接收到客户端的信息了", idleCount);
receiveStr =""; receiveStr = "";
num = num + 1; num = num + 1;
if (num > size - 1) { if (num > size - 1) {
num = 0; num = 0;
@ -67,7 +69,7 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter {
// SendOrderUtils.sendAllOrder(deviceCodeParamList.get(num),ctx,num,size); // SendOrderUtils.sendAllOrder(deviceCodeParamList.get(num),ctx,num,size);
} else { } else {
// 继续发送下一个采集指令 // 继续发送下一个采集指令
SendOrderUtils.sendAllOrder(deviceCodeParamList.get(num),ctx,num,size); SendOrderUtils.sendAllOrder(deviceCodeParamList.get(num), ctx, num, size);
} }
} }
} else { } else {
@ -104,29 +106,23 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter {
// 当前批量读取中的最后一条消息 // 当前批量读取中的最后一条消息
@Override @Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception{ public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//心跳包报文: 24 00 60 95 //心跳包报文: 24 00 60 95
receiveStr= receiveStr.toUpperCase();//返回值全部变成大写 receiveStr = receiveStr.toUpperCase();//返回值全部变成大写
//截取去掉FE log.info("channelReadComplete接收到的数据{}, 长度: ===> {}", receiveStr, receiveStr.length());
if (receiveStr.length()>8){
String str1=receiveStr.substring(0,8);
String str2=receiveStr.substring(8);
receiveStr=str1.replace("FE", "")+str2;
}else{
receiveStr = receiveStr.replace("FE", "");
}
log.info("channelReadComplete接收到的数据长度: ===> {}", receiveStr.length());
//心跳包处理 //心跳包处理
if ((receiveStr.length() == 8) && receiveStr.startsWith("24")) { // if ((receiveStr.length() == 8) && receiveStr.startsWith("24")) {
if ((receiveStr.length() == 8) && receiveStr.startsWith("C0A801FE")) {
log.info("接收到心跳包 ===> {}", receiveStr); log.info("接收到心跳包 ===> {}", receiveStr);
idleCount = 1; idleCount = 1;
port=receiveStr.substring(4,8);//心跳包包含网关端口(自己定义返回心跳包) port = receiveStr.substring(4, 8);//心跳包包含网关端口(自己定义返回心跳包)
port = "6001";
// 清空receiveStr // 清空receiveStr
receiveStr = ""; receiveStr = "";
// 更新对应的网关在线情况 // 更新对应的网关在线情况
gatewayManageService.updateGatewayManage2(port); gatewayManageService.updateGatewayManage2(port);
//根据端口或者IP或者心跳包查询网关对应的项目名称 //根据端口或者IP或者心跳包查询网关对应的项目名称
String projectName=gatewayManageService.selectProjectName(port); String projectName = gatewayManageService.selectProjectName(port);
log.info("---------------------{}项目网关上线---------------------", projectName); log.info("---------------------{}项目网关上线---------------------", projectName);
// 生成采集指令 // 生成采集指令
deviceCodeParamList = deviceCodeParamService.queryCodeParam(port); //心跳包包含网关端口(自己定义返回心跳包) deviceCodeParamList = deviceCodeParamService.queryCodeParam(port); //心跳包包含网关端口(自己定义返回心跳包)
@ -134,84 +130,50 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter {
log.info("deviceCodeParam size ===> {}", size); log.info("deviceCodeParam size ===> {}", size);
num = 0; num = 0;
// 发送采集报文 // 发送采集报文
if (size>0) { if (size > 0) {
if (idleCount<2){ if (idleCount < 2) {
Thread.sleep(200); Thread.sleep(200);
SendOrderUtils.sendAllOrder(deviceCodeParamList.get(num),ctx,num,size); SendOrderUtils.sendAllOrder(deviceCodeParamList.get(num), ctx, num, size);
idleCount++; idleCount++;
}else{ } else {
ctx.channel().close(); ctx.channel().close();
} }
}else{ } else {
log.info("gateway not find deviceCodeParam!" ); log.info("gateway not find deviceCodeParam!");
} }
} else if (receiveStr.length() == 36 || receiveStr.length() == 40 || receiveStr.length() == 44 || receiveStr.length() == 50) { } else if (receiveStr.length() == 34 || receiveStr.length() == 36 || receiveStr.length() == 40 || receiveStr.length() == 44 || receiveStr.length() == 50) {
//电表返回数据解析 //电表返回数据解析
idleCount=1; idleCount = 1;
log.info("电表接收===> {},长度:{}", receiveStr, receiveStr.length()); log.info("电表接收===> {},长度:{}", receiveStr, receiveStr.length());
//解析采集的报文,并保存到数据库 //解析采集的报文,并保存到数据库
AnalysisReceiveOrder485 analysisReceiveOrder485 = new AnalysisReceiveOrder485(); nextSendOrder(ctx);
analysisReceiveOrder485.analysisMeterOrder485(receiveStr,deviceCodeParamList.get(num));
//清空receiveStr
receiveStr = "";
//判断发送的下标,如果不等于指令数组大小
num = num + 1;
if (num > size - 1) {
num = 0;
Thread.sleep(200);
// 继续发送下一个采集指令
SendOrderUtils.sendAllOrder(deviceCodeParamList.get(num),ctx,num,size);
log.info("------一轮采集完成,继续下一轮--------");
} else {
// 添加一个状态值,判断是否继续发送指令 update by ljf on 2020-08-07
if (Constant.WEB_FLAG) {
num = 0;
// 关闭连接
receiveStr = null;
ctx.close();
} else {
Thread.sleep(200);
// 继续发送下一个采集指令
SendOrderUtils.sendAllOrder(deviceCodeParamList.get(num),ctx,num,size);
}
}
} else if (receiveStr.length() == 18) { } else if (receiveStr.length() == 18) {
//冷量计返回数据解析 //冷量计返回数据解析
idleCount=1; idleCount = 1;
log.info("冷量计接收==>{},长度:{}", receiveStr, receiveStr.length()); log.info("冷量计接收==>{},长度:{}", receiveStr, receiveStr.length());
// 解析采集的报文,并保存到数据库 nextSendOrder(ctx);
AnalysisReceiveOrder485 analysisReceiveOrder485 = new AnalysisReceiveOrder485(); } else if (receiveStr.length() == 12 || receiveStr.length() == 14) {
analysisReceiveOrder485.analysisCloudOrder485(receiveStr,deviceCodeParamList.get(num) ); //冷水机返回数据解析
idleCount = 1;
log.info("冷水机接收===>{},长度:{}", receiveStr, receiveStr.length());
nextSendOrder(ctx);
} else if (receiveStr.length() > 50 && receiveStr.length() < 100) {
idleCount = 1;
// 清空receiveStr // 清空receiveStr
receiveStr = ""; nextSendOrder(ctx);
// 判断发送的下标,如果不等于指令数组大小
num = num + 1;
if (num > size - 1) {
num = 0;
Thread.sleep(200);
// 继续发送下一个采集指令
SendOrderUtils.sendAllOrder(deviceCodeParamList.get(num),ctx,num,size);
log.info("------一轮采集完成,继续下一轮--------");
} else {
// 添加一个状态值,判断是否继续发送指令 update by ljf on 2020-08-07
if (Constant.WEB_FLAG) {
num = 0;
// 关闭连接
receiveStr = null;
ctx.close();
} else {
Thread.sleep(200);
// 继续发送下一个采集指令
SendOrderUtils.sendAllOrder(deviceCodeParamList.get(num),ctx,num,size);
} }
// else if (receiveStr.length() >= 100 ){
// whiteGateway(ctx);
// }
// else {
// receiveStr = null;
// }
ctx.flush();
} }
}else if (receiveStr.length() == 12 || receiveStr.length() == 14) {
//冷水机返回数据解析 private void nextSendOrder(ChannelHandlerContext ctx) throws InterruptedException {
idleCount=1;
log.info("冷水机接收===>{},长度:{}", receiveStr, receiveStr.length());
// 解析采集的报文,并保存到数据库 // 解析采集的报文,并保存到数据库
AnalysisReceiveOrder485 analysisReceiveOrder485 = new AnalysisReceiveOrder485(); analysisReceiveData(receiveStr, deviceCodeParamList.get(num));
analysisReceiveOrder485.analysisChillerOrder485(receiveStr,deviceCodeParamList.get(num));
// 清空receiveStr // 清空receiveStr
receiveStr = ""; receiveStr = "";
// 判断发送的下标,如果不等于指令数组大小 // 判断发送的下标,如果不等于指令数组大小
@ -220,7 +182,7 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter {
num = 0; num = 0;
Thread.sleep(200); Thread.sleep(200);
// 继续发送下一个采集指令 // 继续发送下一个采集指令
SendOrderUtils.sendAllOrder(deviceCodeParamList.get(num),ctx,num,size); SendOrderUtils.sendAllOrder(deviceCodeParamList.get(num), ctx, num, size);
log.info("------一轮采集完成,继续下一轮--------"); log.info("------一轮采集完成,继续下一轮--------");
} else { } else {
// 添加一个状态值,判断是否继续发送指令 update by ljf on 2020-08-07 // 添加一个状态值,判断是否继续发送指令 update by ljf on 2020-08-07
@ -232,74 +194,47 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter {
} else { } else {
Thread.sleep(200); Thread.sleep(200);
// 继续发送下一个采集指令 // 继续发送下一个采集指令
SendOrderUtils.sendAllOrder(deviceCodeParamList.get(num),ctx,num,size); SendOrderUtils.sendAllOrder(deviceCodeParamList.get(num), ctx, num, size);
} }
} }
} else if (receiveStr.length() > 50 && receiveStr.length() < 100) {
idleCount=1;
// 清空receiveStr
receiveStr = null;
// 判断发送的下标,如果不等于指令数组大小
num = num + 1;
if (num > size - 1) {
num = 0;
Thread.sleep(200);
// 继续发送下一个采集指令
SendOrderUtils.sendAllOrder(deviceCodeParamList.get(num),ctx,num,size);
log.info("------一轮采集完成,继续下一轮--------");
} else {
// 添加一个状态值,判断是否继续发送指令 update by ljf on 2020-08-07
if (Constant.WEB_FLAG) {
num = 0;
// 关闭连接
receiveStr = null;
ctx.close();
} else {
Thread.sleep(200);
// 继续发送下一个采集指令
SendOrderUtils.sendAllOrder(deviceCodeParamList.get(num),ctx,num,size);
}
}
}
// else if (receiveStr.length() >= 100 ){
// whiteGateway(ctx);
// }
else {
receiveStr = null;
} }
ctx.flush();
private void analysisReceiveData(String receiveStr, DeviceCodeParamEntity deviceCodeParamEntity) {
// 解析采集的报文,并保存到数据库
AnalysisReceiveOrder485 analysisReceiveOrder485 = new AnalysisReceiveOrder485();
analysisReceiveOrder485.analysisReceiveOrder485(receiveStr, deviceCodeParamEntity);
} }
private void whiteGateway(ChannelHandlerContext ctx) throws InterruptedException { private void whiteGateway(ChannelHandlerContext ctx) throws InterruptedException {
if (receiveStr.substring(0,2).equalsIgnoreCase("2b") && receiveStr.substring(6,8).equalsIgnoreCase("7b")){ if (receiveStr.substring(0, 2).equalsIgnoreCase("2b") && receiveStr.substring(6, 8).equalsIgnoreCase("7b")) {
receiveStr=receiveStr.substring(6); receiveStr = receiveStr.substring(6);
} }
receiveStr=ExchangeStringUtil.hexStringToString(receiveStr) ; receiveStr = ExchangeStringUtil.hexStringToString(receiveStr);
log.info("白色网关接收===> " + receiveStr); log.info("白色网关接收===> " + receiveStr);
JSONObject jsonObject = JSONObject.parseObject(receiveStr); JSONObject jsonObject = JSONObject.parseObject(receiveStr);
receiveStr=""; receiveStr = "";
port=jsonObject.getString("snr"); //网关ID,从心跳包中获得 port = jsonObject.getString("snr"); //网关ID,从心跳包中获得
IP=jsonObject.getString("ip"); //ip IP = jsonObject.getString("ip"); //ip
String cmd=jsonObject.getString("cmd"); //指令模式dHeartbeat(心跳包),data(主动采集返回),reword String cmd = jsonObject.getString("cmd"); //指令模式dHeartbeat(心跳包),data(主动采集返回),reword
String name=jsonObject.getString("name"); String name = jsonObject.getString("name");
SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date date = new Date(); Date date = new Date();
String time = sdf1.format(date); String time = sdf1.format(date);
if(cmd.equals("dHeartbeat")){ if (cmd.equals("dHeartbeat")) {
JSONObject jsonHeart=new JSONObject(); JSONObject jsonHeart = new JSONObject();
jsonHeart.put("snr",port); jsonHeart.put("snr", port);
jsonHeart.put("cmd","uHeartbeat"); jsonHeart.put("cmd", "uHeartbeat");
jsonHeart.put("recordCheckTime","30"); jsonHeart.put("recordCheckTime", "30");
jsonHeart.put("keepAliveTime","50"); //通讯保持在线间隔,秒80 jsonHeart.put("keepAliveTime", "50"); //通讯保持在线间隔,秒80
jsonHeart.put("resetTime","23:59:59"); jsonHeart.put("resetTime", "23:59:59");
jsonHeart.put("ip",IP); jsonHeart.put("ip", IP);
jsonHeart.put("time",time); jsonHeart.put("time", time);
jsonHeart.put("name",name); jsonHeart.put("name", name);
jsonHeart.put("heartInterval","20");//网关发起心跳包的时间间隔,秒70 jsonHeart.put("heartInterval", "20");//网关发起心跳包的时间间隔,秒70
jsonHeart.put("recordMode","cover"); jsonHeart.put("recordMode", "cover");
String sendStr=jsonHeart.toString(); String sendStr = jsonHeart.toString();
log.info("白色网关回复收到心跳包===>" + sendStr); log.info("白色网关回复收到心跳包===>" + sendStr);
sendStr=ExchangeStringUtil.strTo16(sendStr); sendStr = ExchangeStringUtil.strTo16(sendStr);
ByteBuf buffer = ExchangeStringUtil.getByteBuf(ctx, sendStr); ByteBuf buffer = ExchangeStringUtil.getByteBuf(ctx, sendStr);
Thread.sleep(200); Thread.sleep(200);
ctx.channel().writeAndFlush(buffer); //发送数据 ctx.channel().writeAndFlush(buffer); //发送数据
@ -311,38 +246,38 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter {
log.info("白色网关接收长度===> " + size); log.info("白色网关接收长度===> " + size);
num = 0; num = 0;
// 发送采集报文 // 发送采集报文
if (size>0) { if (size > 0) {
if (idleCount<2){ if (idleCount < 2) {
Thread.sleep(200); Thread.sleep(200);
SendOrderUtils.sendAllOrder2(deviceCodeParamList.get(num), ctx,port,IP,num,size); SendOrderUtils.sendAllOrder2(deviceCodeParamList.get(num), ctx, port, IP, num, size);
idleCount++; idleCount++;
}else{ } else {
log.info("close this channel!"); log.info("close this channel!");
ctx.channel().close(); ctx.channel().close();
} }
}else{ } else {
log.info("white gateway not find deviceCodeParam!" ); log.info("white gateway not find deviceCodeParam!");
} }
}else{ } else {
idleCount=1; idleCount = 1;
String data=jsonObject.getString("data"); String data = jsonObject.getString("data");
String strHex=ExchangeStringUtil.base64ToHex(data); String strHex = ExchangeStringUtil.base64ToHex(data);
//返回值全部变成大写 //返回值全部变成大写
strHex= strHex.toUpperCase(); strHex = strHex.toUpperCase();
//截取去掉FE //截取去掉FE
String dataStr; String dataStr;
if (strHex.length()>8){ if (strHex.length() > 8) {
String str1=strHex.substring(0,8); String str1 = strHex.substring(0, 8);
String str2=strHex.substring(8); String str2 = strHex.substring(8);
dataStr=str1.replace("FE", "")+str2; dataStr = str1.replace("FE", "") + str2;
}else{ } else {
dataStr = strHex.replace("FE", ""); dataStr = strHex.replace("FE", "");
} }
if (dataStr.length() == 36 || dataStr.length() == 40 || dataStr.length() == 44 || dataStr.length() == 50){ if (dataStr.length() == 36 || dataStr.length() == 40 || dataStr.length() == 44 || dataStr.length() == 50) {
log.info("白色网关电表接收===>" + dataStr); log.info("白色网关电表接收===>" + dataStr);
// 解析采集的报文,并保存到数据库 // 解析采集的报文,并保存到数据库
AnalysisReceiveOrder485 analysisReceiveOrder485 = new AnalysisReceiveOrder485(); AnalysisReceiveOrder485 analysisReceiveOrder485 = new AnalysisReceiveOrder485();
analysisReceiveOrder485.analysisMeterOrder485(dataStr,deviceCodeParamList.get(num)); //电表报文解析 analysisReceiveOrder485.analysisMeterOrder485(dataStr, deviceCodeParamList.get(num)); //电表报文解析
// 清空dataStr // 清空dataStr
// 判断发送的下标,如果不等于指令数组大小 // 判断发送的下标,如果不等于指令数组大小
num = num + 1; num = num + 1;
@ -350,7 +285,7 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter {
num = 0; num = 0;
Thread.sleep(200); Thread.sleep(200);
// 继续发送下一个采集指令 // 继续发送下一个采集指令
SendOrderUtils.sendAllOrder2(deviceCodeParamList.get(num), ctx,port,IP,num,size); SendOrderUtils.sendAllOrder2(deviceCodeParamList.get(num), ctx, port, IP, num, size);
log.info("------一轮采集完成,继续下一轮--------"); log.info("------一轮采集完成,继续下一轮--------");
} else { } else {
// 添加一个状态值,判断是否继续发送指令 update by ljf on 2020-08-07 // 添加一个状态值,判断是否继续发送指令 update by ljf on 2020-08-07
@ -362,14 +297,14 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter {
} else { } else {
Thread.sleep(200); Thread.sleep(200);
// 继续发送下一个采集指令 // 继续发送下一个采集指令
SendOrderUtils.sendAllOrder2(deviceCodeParamList.get(num), ctx,port,IP,num,size); SendOrderUtils.sendAllOrder2(deviceCodeParamList.get(num), ctx, port, IP, num, size);
} }
} }
}else if(dataStr.length() == 12 || dataStr.length() == 14){ } else if (dataStr.length() == 12 || dataStr.length() == 14) {
log.info("白色网关冷水机接收===>" + dataStr); log.info("白色网关冷水机接收===>" + dataStr);
// 解析采集的报文,并保存到数据库 // 解析采集的报文,并保存到数据库
AnalysisReceiveOrder485 analysisReceiveOrder485 = new AnalysisReceiveOrder485(); AnalysisReceiveOrder485 analysisReceiveOrder485 = new AnalysisReceiveOrder485();
analysisReceiveOrder485.analysisChillerOrder485(dataStr,deviceCodeParamList.get(0)); analysisReceiveOrder485.analysisChillerOrder485(dataStr, deviceCodeParamList.get(0));
// 清空dataStr // 清空dataStr
dataStr = ""; dataStr = "";
// 判断发送的下标,如果不等于指令数组大小 // 判断发送的下标,如果不等于指令数组大小
@ -378,7 +313,7 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter {
num = 0; num = 0;
Thread.sleep(200); Thread.sleep(200);
// 继续发送下一个采集指令 // 继续发送下一个采集指令
SendOrderUtils.sendAllOrder2(deviceCodeParamList.get(num), ctx,port,IP,num,size); SendOrderUtils.sendAllOrder2(deviceCodeParamList.get(num), ctx, port, IP, num, size);
log.info("------一轮采集完成,继续下一轮--------"); log.info("------一轮采集完成,继续下一轮--------");
} else { } else {
// 添加一个状态值,判断是否继续发送指令 update by ljf on 2020-08-07 // 添加一个状态值,判断是否继续发送指令 update by ljf on 2020-08-07
@ -391,15 +326,15 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter {
} else { } else {
Thread.sleep(200); Thread.sleep(200);
// 继续发送下一个采集指令 // 继续发送下一个采集指令
SendOrderUtils.sendAllOrder2(deviceCodeParamList.get(num), ctx,port,IP,num,size); SendOrderUtils.sendAllOrder2(deviceCodeParamList.get(num), ctx, port, IP, num, size);
} }
} }
}else if(dataStr.length() == 18){ } else if (dataStr.length() == 18) {
// log.info("white gateway cloud receive message ===> " + dataStr); // log.info("white gateway cloud receive message ===> " + dataStr);
log.info("白色网关冷量计接收===> " + dataStr); log.info("白色网关冷量计接收===> " + dataStr);
// 解析采集的报文,并保存到数据库 // 解析采集的报文,并保存到数据库
AnalysisReceiveOrder485 analysisReceiveOrder485 = new AnalysisReceiveOrder485(); //冷量机报文解析 AnalysisReceiveOrder485 analysisReceiveOrder485 = new AnalysisReceiveOrder485(); //冷量机报文解析
analysisReceiveOrder485.analysisCloudOrder485(dataStr,deviceCodeParamList.get(num) ); analysisReceiveOrder485.analysisCloudOrder485(dataStr, deviceCodeParamList.get(num));
// 清空dataStr // 清空dataStr
dataStr = ""; dataStr = "";
// 判断发送的下标,如果不等于指令数组大小 // 判断发送的下标,如果不等于指令数组大小
@ -408,7 +343,7 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter {
num = 0; num = 0;
Thread.sleep(200); Thread.sleep(200);
// 继续发送下一个采集指令 // 继续发送下一个采集指令
SendOrderUtils.sendAllOrder2(deviceCodeParamList.get(num), ctx,port,IP,num,size); SendOrderUtils.sendAllOrder2(deviceCodeParamList.get(num), ctx, port, IP, num, size);
log.info("------一轮采集完成,继续下一轮--------"); log.info("------一轮采集完成,继续下一轮--------");
} else { } else {
// 添加一个状态值,判断是否继续发送指令 update by ljf on 2020-08-07 // 添加一个状态值,判断是否继续发送指令 update by ljf on 2020-08-07
@ -421,10 +356,10 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter {
} else { } else {
Thread.sleep(200); Thread.sleep(200);
// 继续发送下一个采集指令 // 继续发送下一个采集指令
SendOrderUtils.sendAllOrder2(deviceCodeParamList.get(num), ctx,port,IP,num,size); SendOrderUtils.sendAllOrder2(deviceCodeParamList.get(num), ctx, port, IP, num, size);
} }
} }
}else { //if(dataStr.length() > 50) } else { //if(dataStr.length() > 50)
// 清空dataStr // 清空dataStr
dataStr = null; dataStr = null;
// 判断发送的下标,如果不等于指令数组大小 // 判断发送的下标,如果不等于指令数组大小
@ -433,7 +368,7 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter {
num = 0; num = 0;
Thread.sleep(200); Thread.sleep(200);
// 继续发送下一个采集指令 // 继续发送下一个采集指令
SendOrderUtils.sendAllOrder2(deviceCodeParamList.get(num), ctx,port,IP,num,size); SendOrderUtils.sendAllOrder2(deviceCodeParamList.get(num), ctx, port, IP, num, size);
log.info("------一轮采集完成,继续下一轮--------"); log.info("------一轮采集完成,继续下一轮--------");
} else { } else {
// 添加一个状态值,判断是否继续发送指令 update by ljf on 2020-08-07 // 添加一个状态值,判断是否继续发送指令 update by ljf on 2020-08-07
@ -445,7 +380,7 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter {
} else { } else {
Thread.sleep(200); Thread.sleep(200);
// 继续发送下一个采集指令 // 继续发送下一个采集指令
SendOrderUtils.sendAllOrder2(deviceCodeParamList.get(num), ctx,port,IP,num,size); SendOrderUtils.sendAllOrder2(deviceCodeParamList.get(num), ctx, port, IP, num, size);
} }
} }
} }

2
user-service/src/main/java/com/mh/user/service/MeterManageService.java

@ -14,4 +14,6 @@ import java.util.List;
public interface MeterManageService extends BaseService<MeterManageEntity> { public interface MeterManageService extends BaseService<MeterManageEntity> {
List<MeterManageEntity> queryBySystemIdAndProjectId(String systemId, String projectId); List<MeterManageEntity> queryBySystemIdAndProjectId(String systemId, String projectId);
void updateDataById(Long mmId, String analysisData);
} }

11
user-service/src/main/java/com/mh/user/service/impl/MeterManageServiceImpl.java

@ -1,6 +1,7 @@
package com.mh.user.service.impl; package com.mh.user.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo; import com.github.pagehelper.PageInfo;
import com.mh.common.page.MybatisPageHelper; import com.mh.common.page.MybatisPageHelper;
@ -15,6 +16,7 @@ import com.mh.user.utils.ExchangeStringUtil;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.math.BigDecimal;
import java.util.Collections; import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
@ -45,6 +47,15 @@ public class MeterManageServiceImpl implements MeterManageService {
return meterManageMapper.selectList(queryWrapper.orderByDesc("create_time")); return meterManageMapper.selectList(queryWrapper.orderByDesc("create_time"));
} }
@Override
public void updateDataById(Long mmId, String analysisData) {
UpdateWrapper<MeterManageEntity> updateWrapper = new UpdateWrapper<>();
updateWrapper.eq("id", mmId);
updateWrapper.set("cur_value", analysisData);
updateWrapper.set("cur_time", new Date());
meterManageMapper.update(updateWrapper);
}
@Override @Override
public PageResult queryByPage(PageRequest pageRequest) { public PageResult queryByPage(PageRequest pageRequest) {
String systemID = StringUtils.getColumnFilterValue(pageRequest, "systemId"); String systemID = StringUtils.getColumnFilterValue(pageRequest, "systemId");

3
user-service/src/main/java/com/mh/user/strategy/CJ188ProtocolStrategy.java

@ -2,6 +2,7 @@ package com.mh.user.strategy;
import com.mh.common.utils.StringUtils; import com.mh.common.utils.StringUtils;
import com.mh.user.constants.Constant; import com.mh.user.constants.Constant;
import com.mh.user.entity.DeviceCodeParamEntity;
import com.mh.user.entity.MeterManageEntity; import com.mh.user.entity.MeterManageEntity;
import com.mh.user.factory.Protocol; import com.mh.user.factory.Protocol;
import com.mh.user.utils.ExchangeStringUtil; import com.mh.user.utils.ExchangeStringUtil;
@ -52,7 +53,7 @@ public class CJ188ProtocolStrategy implements ProtocolStrategy {
} }
@Override @Override
public String analysisReceiveData(MeterManageEntity meterManageEntity, String receiveData) { public String analysisReceiveData(DeviceCodeParamEntity deviceCodeParamEntity, String receiveData) {
log.info("水表标准协议:策略解析报文"); log.info("水表标准协议:策略解析报文");
String meterId = ""; String meterId = "";
String data = ""; String data = "";

99
user-service/src/main/java/com/mh/user/strategy/EleProtocolStrategy.java

@ -2,10 +2,27 @@ package com.mh.user.strategy;
import com.mh.common.utils.StringUtils; import com.mh.common.utils.StringUtils;
import com.mh.user.constants.Constant; import com.mh.user.constants.Constant;
import com.mh.user.entity.DataResultEntity;
import com.mh.user.entity.DeviceCodeParamEntity;
import com.mh.user.entity.MeterManageEntity; import com.mh.user.entity.MeterManageEntity;
import com.mh.user.factory.Protocol; import com.mh.user.service.DataResultService;
import com.mh.user.service.DeviceCodeParamService;
import com.mh.user.service.MeterManageService;
import com.mh.user.service.ProjectInfoService;
import com.mh.user.service.chillers.ChillersService;
import com.mh.user.service.chillers.OrderMessageService;
import com.mh.user.utils.ExchangeStringUtil; import com.mh.user.utils.ExchangeStringUtil;
import com.mh.user.utils.SpringBeanUtil;
import com.mh.user.utils.ThreadPoolService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.ApplicationContext;
import java.math.BigDecimal;
import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ThreadPoolExecutor;
/** /**
* @author LJF * @author LJF
@ -17,15 +34,23 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j @Slf4j
public class EleProtocolStrategy implements ProtocolStrategy { public class EleProtocolStrategy implements ProtocolStrategy {
private static class SingletonHolder{ // 调用service
ApplicationContext context = SpringBeanUtil.getApplicationContext();
ThreadPoolExecutor threadPoolService = ThreadPoolService.getInstance();
DataResultService dataResultService = context.getBean(DataResultService.class);
ProjectInfoService projectInfoService = context.getBean(ProjectInfoService.class);
private static final SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
private static class SingletonHolder {
private static final EleProtocolStrategy INSTANCE = new EleProtocolStrategy(); private static final EleProtocolStrategy INSTANCE = new EleProtocolStrategy();
} }
private EleProtocolStrategy(){ private EleProtocolStrategy() {
// 防止外部直接实例化 // 防止外部直接实例化
} }
public static EleProtocolStrategy getInstance(){ public static EleProtocolStrategy getInstance() {
return SingletonHolder.INSTANCE; return SingletonHolder.INSTANCE;
} }
@ -68,20 +93,18 @@ public class EleProtocolStrategy implements ProtocolStrategy {
return Constant.FAIL; return Constant.FAIL;
} }
} }
log.info("生成采集电表指令==>表号:{},指令:{}", meterManageEntity.getMtCode(),str); log.info("生成采集电表指令==>表号:{},指令:{}", meterManageEntity.getMtCode(), str);
return str.toUpperCase(); return str.toUpperCase();
} }
@Override @Override
public String analysisReceiveData(MeterManageEntity meterManageEntity, String receiveData) { public String analysisReceiveData(DeviceCodeParamEntity deviceCodeParamEntity, String receiveData) {
log.info("电表97/07规约协议:工厂解析报文"); log.info("电表97/07规约协议:工厂解析报文");
String data = ""; String data = "";
if (receiveData.length() > 8) { if (receiveData.length() == 34 || receiveData.length() == 36 || receiveData.length() == 40 || receiveData.length() == 44 || receiveData.length() == 50) {
String str1 = receiveData.substring(0, 8); String str1 = receiveData.substring(0, 8);
String str2 = receiveData.substring(8); String str2 = receiveData.substring(8);
receiveData = str1.replace("FE", "") + str2; receiveData = str1.replace("FE", "") + str2;
}
if (receiveData.length() == 36 || receiveData.length() == 40 || receiveData.length() == 44 || receiveData.length() == 50) {
String checkStr = receiveData.substring(0, receiveData.length() - 4); //减去校验码 String checkStr = receiveData.substring(0, receiveData.length() - 4); //减去校验码
String checkNum = ExchangeStringUtil.makeChecksum(checkStr); //生成校验码 String checkNum = ExchangeStringUtil.makeChecksum(checkStr); //生成校验码
//返回的校验码与重新生成的校验码进行校验 //返回的校验码与重新生成的校验码进行校验
@ -90,18 +113,7 @@ public class EleProtocolStrategy implements ProtocolStrategy {
String meterId = checkStr.substring(12, 14) + checkStr.substring(10, 12) + checkStr.substring(8, 10) String meterId = checkStr.substring(12, 14) + checkStr.substring(10, 12) + checkStr.substring(8, 10)
+ checkStr.substring(6, 8) + checkStr.substring(4, 6) + checkStr.substring(2, 4); + checkStr.substring(6, 8) + checkStr.substring(4, 6) + checkStr.substring(2, 4);
meterId = String.format("%012d", Long.parseLong(meterId)); meterId = String.format("%012d", Long.parseLong(meterId));
StringBuilder stringBuilder = new StringBuilder(); StringBuilder stringBuilder = getStringBuilder(receiveData, checkStr);
if (receiveData.length() == 36) {
for (int i = 0; i < 4; i++) {
String data1 = checkStr.substring(32 - 2 * (i + 1), 32 - 2 * i);
stringBuilder.append(data1);
}
} else {
for (int i = 0; i < 4; i++) {
String data1 = checkStr.substring(36 - 2 * (i + 1), 36 - 2 * i);
stringBuilder.append(data1);
}
}
data = stringBuilder.toString(); data = stringBuilder.toString();
data = ExchangeStringUtil.cutThree(data); data = ExchangeStringUtil.cutThree(data);
// 0 代表前面补充0,4 代表长度为4,d 代表参数为正数型 // 0 代表前面补充0,4 代表长度为4,d 代表参数为正数型
@ -115,7 +127,52 @@ public class EleProtocolStrategy implements ProtocolStrategy {
if (!StringUtils.isBlank(data)) { if (!StringUtils.isBlank(data)) {
data = String.valueOf(Double.valueOf(data)); //00010.76,去除读数前面带0的情况 data = String.valueOf(Double.valueOf(data)); //00010.76,去除读数前面带0的情况
} }
// 解析入库
analysisMeterOrder485(data, deviceCodeParamEntity);
return data; return data;
} }
private static StringBuilder getStringBuilder(String receiveData, String checkStr) {
StringBuilder stringBuilder = new StringBuilder();
if (receiveData.length() == 36 || receiveData.length() == 34) {
for (int i = 0; i < 4; i++) {
String data1 = checkStr.substring(32 - 2 * (i + 1), 32 - 2 * i);
stringBuilder.append(data1);
}
} else {
for (int i = 0; i < 4; i++) {
String data1 = checkStr.substring(36 - 2 * (i + 1), 36 - 2 * i);
stringBuilder.append(data1);
}
}
return stringBuilder;
}
/**
* 解析电表返回的数据
*
* @param dataStr
*/
public void analysisMeterOrder485(final String dataStr, final DeviceCodeParamEntity deviceCodeParam) {
threadPoolService.execute(() -> {
Date date = new Date();
String dateStr = sdf1.format(date);
try {
DataResultEntity dataResultEntity = new DataResultEntity();
dataResultEntity.setDeviceAddr(deviceCodeParam.getDeviceAddr()); //通讯编号
dataResultEntity.setDeviceType("电表"); //类型
dataResultEntity.setProjectId(deviceCodeParam.getProjectId()); //所属项目
dataResultEntity.setCurValue(new BigDecimal(dataStr)); //当前读数
dataResultEntity.setCurDate(sdf1.parse(dateStr)); //当前日期
dataResultEntity.setGrade(deviceCodeParam.getGrade());
String projectName = projectInfoService.selectName(deviceCodeParam.getProjectId());
log.info("电表==>{},读数==>{},项目名称==>{}", deviceCodeParam.getDeviceAddr(), dataStr, projectName);
dataResultService.saveDataResult(dataResultEntity);
log.info("电表保存数据成功!项目名称:{}", projectName);
} catch (Exception e) {
log.error("保存电表数据失败!", e);
}
});
}
} }

149
user-service/src/main/java/com/mh/user/strategy/ModbusProtocolStrategy.java

@ -2,12 +2,24 @@ package com.mh.user.strategy;
import com.mh.common.utils.StringUtils; import com.mh.common.utils.StringUtils;
import com.mh.user.constants.Constant; import com.mh.user.constants.Constant;
import com.mh.user.entity.DataResultChEntity;
import com.mh.user.entity.DataResultClEntity;
import com.mh.user.entity.DeviceCodeParamEntity;
import com.mh.user.entity.MeterManageEntity; import com.mh.user.entity.MeterManageEntity;
import com.mh.user.service.DataResultService;
import com.mh.user.service.ProjectInfoService;
import com.mh.user.utils.ExchangeStringUtil; import com.mh.user.utils.ExchangeStringUtil;
import com.mh.user.utils.SpringBeanUtil;
import com.mh.user.utils.ThreadPoolService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.math.RoundingMode; import java.math.RoundingMode;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ThreadPoolExecutor;
/** /**
* @author LJF * @author LJF
@ -19,6 +31,14 @@ import java.math.RoundingMode;
@Slf4j @Slf4j
public class ModbusProtocolStrategy implements ProtocolStrategy { public class ModbusProtocolStrategy implements ProtocolStrategy {
// 调用service
ApplicationContext context = SpringBeanUtil.getApplicationContext();
DataResultService dataResultService = context.getBean(DataResultService.class);
ProjectInfoService projectInfoService = context.getBean(ProjectInfoService.class);
ThreadPoolExecutor threadPoolService = ThreadPoolService.getInstance();
private static final SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
private static class SingletonHolder { private static class SingletonHolder {
private static final ModbusProtocolStrategy INSTANCE = new ModbusProtocolStrategy(); private static final ModbusProtocolStrategy INSTANCE = new ModbusProtocolStrategy();
} }
@ -55,11 +75,11 @@ public class ModbusProtocolStrategy implements ProtocolStrategy {
} }
@Override @Override
public String analysisReceiveData(MeterManageEntity meterManageEntity, String receiveData) { public String analysisReceiveData(DeviceCodeParamEntity deviceCodeParamEntity, String receiveData) {
log.info("modbus标准协议:策略解析报文"); log.info("modbus标准协议:策略解析报文");
String checkStr = receiveData.substring(0, receiveData.length() - 4);//检验报文 String checkStr = receiveData.substring(0, receiveData.length() - 4);//检验报文
String checkWord = ExchangeStringUtil.getStrCRC16(checkStr);//生成校验码 String checkWord = ExchangeStringUtil.getStrCRC16(checkStr);//生成校验码
String sValue = null; String sValue = "0";
String rtData = Constant.FAIL; String rtData = Constant.FAIL;
if (!checkWord.equalsIgnoreCase(receiveData.substring(receiveData.length() - 4))) { if (!checkWord.equalsIgnoreCase(receiveData.substring(receiveData.length() - 4))) {
log.info("Modbus报文检验失败: {}", receiveData); log.info("Modbus报文检验失败: {}", receiveData);
@ -67,27 +87,140 @@ public class ModbusProtocolStrategy implements ProtocolStrategy {
} }
// 开始解析: 地址+功能码+数据长度+数据域 // 开始解析: 地址+功能码+数据长度+数据域
// 截取数据长度 // 截取数据长度
String dataLength = receiveData.substring(6, 8); String dataLength = receiveData.substring(4, 6);
int dataLengthInt = Integer.parseInt(dataLength, 16); int dataLengthInt = Integer.parseInt(dataLength, 16);
// 截取数据域 // 截取数据域
String data = receiveData.substring(8, 8 + dataLengthInt * 2); String data = receiveData.substring(6, 6 + dataLengthInt * 2);
// 判断 // 判断
switch (meterManageEntity.getDataType()) { switch (deviceCodeParamEntity.getDataType()) {
case 0: case 0:
case 1:
// 16进制转十进制类型 // 16进制转十进制类型
sValue = ExchangeStringUtil.hexToDec(data); sValue = ExchangeStringUtil.hexToDec(data);
// 保留位数 // 保留位数
sValue = (new BigDecimal(sValue)).divide(new BigDecimal(String.valueOf(meterManageEntity.getDigits() * 10)), 2, RoundingMode.HALF_UP).toString(); sValue = (new BigDecimal(sValue)).divide(new BigDecimal(String.valueOf(deviceCodeParamEntity.getDigit() * 10)), 2, RoundingMode.HALF_UP).toString();
break; break;
case 1: case 2:
// 十六进制字符串转IEEE754浮点型 // 十六进制字符串转IEEE754浮点型
sValue = String.valueOf(ExchangeStringUtil.hexToSingle(data)); sValue = String.valueOf(ExchangeStringUtil.hexToSingle(data));
break; break;
default: default:
break; break;
} }
log.info("解析数据==>表号:{},寄存器地址:{},值:{}", meterManageEntity.getMtNum(), meterManageEntity.getRegisterAddr(), sValue); log.info("解析数据==>表号:{},寄存器地址:{},值:{}", deviceCodeParamEntity.getDeviceAddr(), deviceCodeParamEntity.getRegisterAddr(), sValue);
// 入库数据
// 冷量表
if ("2".equals(deviceCodeParamEntity.getDeviceType())) {
analysisCloudOrder485(sValue, deviceCodeParamEntity);
} else if ("0".equals(deviceCodeParamEntity.getDeviceType())) {
analysisChillerOrder485(sValue, deviceCodeParamEntity);
}
return sValue; return sValue;
} }
/**
* 解析冷水机组返回的数据
*/
public void analysisChillerOrder485(final String data, final DeviceCodeParamEntity deviceCodeParam) {
if (!Constant.CONTROL_WEB_FLAG) {
Date date = new Date();
// 冷水机组的地址
String chillerAddr = deviceCodeParam.getDeviceAddr();
DataResultChEntity dataResultCh = new DataResultChEntity();
// 状态
try {
// 赋值给dataResultCh
initialDataResultCh(deviceCodeParam, dataResultCh, chillerAddr, date, data);
} catch (Exception e) {
log.error("冷水机报错:", e);
}
}
}
/**
* 格式化数据
*
* @param deviceCodeParam
* @param dataResultCh
* @param chillerAddr
* @param date
* @param data
* @throws ParseException
*/
public void initialDataResultCh(DeviceCodeParamEntity deviceCodeParam, DataResultChEntity dataResultCh, String chillerAddr, Date date, String data) throws ParseException {
dataResultCh.setDeviceAddr(chillerAddr);
dataResultCh.setDeviceType(deviceCodeParam.getDeviceType());
dataResultCh.setCurDate(date);
dataResultCh.setCurValue(data);
dataResultCh.setRegisterAddr(deviceCodeParam.getRegisterAddr());
dataResultCh.setRegisterName(deviceCodeParam.getRegisterName());
dataResultCh.setGrade(deviceCodeParam.getGrade());
dataResultCh.setFunCode(deviceCodeParam.getFunCode());
dataResultCh.setProjectId(deviceCodeParam.getProjectId());
String projectName = projectInfoService.selectName(deviceCodeParam.getProjectId());
log.info("冷水机:" + chillerAddr + ",状态:" + data + ",项目名称:" + projectName);
dataResultService.saveDataResultChiller(dataResultCh);
dataResultService.deleteDataResultNow(deviceCodeParam.getDeviceAddr(), deviceCodeParam.getDeviceType(), deviceCodeParam.getRegisterAddr(), deviceCodeParam.getProjectId());
log.info("冷水机保存成功!项目名称:" + projectName);
}
//解析冷量表
public void analysisCloudOrder485(final String data, final DeviceCodeParamEntity deviceCodeParam) {
threadPoolService.execute(() -> {
//创建SimpleDateFormat对象,指定样式 2019-05-13 22:39:30
Date date = new Date();
String dateStr = sdf1.format(date);
String cloudId = deviceCodeParam.getDeviceAddr();
DataResultChEntity dataResultCh = new DataResultChEntity();
DataResultClEntity dataResultCl = new DataResultClEntity();
String registerAddr = deviceCodeParam.getRegisterAddr();
if (ExchangeStringUtil.isInDate(date, "00:00:00", "00:00:30")) {
dateStr = dateStr.substring(0, 17) + "00";
System.out.println("插入时间00" + dateStr);
} else if (ExchangeStringUtil.isInDate(date, "00:00:30", "00:00:59")) {
dateStr = dateStr.substring(0, 17) + "30";
System.out.println("插入时间30" + dateStr);
}
try {
if (registerAddr.equals("0004") || registerAddr.equals("0014")) {
dataResultCh.setDeviceAddr(cloudId);
dataResultCh.setDeviceType(deviceCodeParam.getDeviceType());
dataResultCh.setCurDate(sdf1.parse(dateStr));
dataResultCh.setCurValue(data);
dataResultCh.setRegisterAddr(deviceCodeParam.getRegisterAddr());
dataResultCh.setRegisterName(deviceCodeParam.getRegisterName());
dataResultCh.setGrade(deviceCodeParam.getGrade());
dataResultCh.setProjectId(deviceCodeParam.getProjectId());
dataResultCh.setGrade(deviceCodeParam.getGrade());
String projectName = projectInfoService.selectName(deviceCodeParam.getProjectId());
log.info("冷量计==>{},寄存器地址==>{},读数==>{},项目名称==>{}", cloudId, registerAddr, dataResultCh.getCurValue(), projectName);
dataResultService.saveDataResultCh(dataResultCh);
log.info("冷量计瞬时冷量/流量保存数据库成功!项目名称:{}", projectName);
} else if (registerAddr.equals("0050")) {
dataResultCl.setDeviceAddr(cloudId);
dataResultCl.setDeviceType(deviceCodeParam.getDeviceType());
dataResultCh.setCurDate(sdf1.parse(dateStr));
BigDecimal lData = new BigDecimal(data);
dataResultCl.setCurValue(lData);//字符串转整型
dataResultCl.setRegisterAddr(deviceCodeParam.getRegisterAddr());
dataResultCl.setRegisterName(deviceCodeParam.getRegisterName());
dataResultCl.setGrade(deviceCodeParam.getGrade());
dataResultCl.setProjectId(deviceCodeParam.getProjectId());
String projectName = projectInfoService.selectName(deviceCodeParam.getProjectId());
log.info("冷量计==>{},寄存器地址==>{},累计读数==>{},项目名称==>{}", cloudId, registerAddr, lData, projectName);
if (lData.intValue() < 99999999) {
if (lData.intValue() != 2231365) {
dataResultService.saveDataResultCl(dataResultCl);
}
}
log.info("冷量计累计保存数据成功!项目名称:{}", projectName);
dataResultService.saveDataResultCl_bak(dataResultCl);
}
} catch (Exception e) {
log.error("保存冷量计数据失败!", e);
}
});
}
} }

5
user-service/src/main/java/com/mh/user/strategy/ProtocolStrategy.java

@ -1,5 +1,6 @@
package com.mh.user.strategy; package com.mh.user.strategy;
import com.mh.user.entity.DeviceCodeParamEntity;
import com.mh.user.entity.MeterManageEntity; import com.mh.user.entity.MeterManageEntity;
/** /**
@ -20,9 +21,9 @@ public interface ProtocolStrategy {
/** /**
* 解析指令 * 解析指令
* @param meterManageEntity * @param deviceCodeParamEntity
* @param receiveData * @param receiveData
* @return * @return
*/ */
String analysisReceiveData(MeterManageEntity meterManageEntity, String receiveData); String analysisReceiveData(DeviceCodeParamEntity deviceCodeParamEntity, String receiveData);
} }

24
user-service/src/main/java/com/mh/user/utils/AnalysisReceiveOrder485.java

@ -4,12 +4,17 @@ import com.alibaba.fastjson2.JSON;
import com.mh.user.config.RabbitmqConfig; import com.mh.user.config.RabbitmqConfig;
import com.mh.user.constants.Constant; import com.mh.user.constants.Constant;
import com.mh.user.entity.*; import com.mh.user.entity.*;
import com.mh.user.factory.Protocol;
import com.mh.user.factory.ProtocolFactory;
import com.mh.user.model.QueueParam; import com.mh.user.model.QueueParam;
import com.mh.user.service.DataResultService; import com.mh.user.service.DataResultService;
import com.mh.user.service.DeviceCodeParamService; import com.mh.user.service.DeviceCodeParamService;
import com.mh.user.service.MeterManageService;
import com.mh.user.service.ProjectInfoService; import com.mh.user.service.ProjectInfoService;
import com.mh.user.service.chillers.ChillersService; import com.mh.user.service.chillers.ChillersService;
import com.mh.user.service.chillers.OrderMessageService; import com.mh.user.service.chillers.OrderMessageService;
import com.mh.user.strategy.ProtocolStrategy;
import com.mh.user.strategy.ProtocolStrategyFactory;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContext;
@ -40,10 +45,29 @@ public class AnalysisReceiveOrder485 {
DeviceCodeParamService deviceCodeParamService = context.getBean(DeviceCodeParamService.class); DeviceCodeParamService deviceCodeParamService = context.getBean(DeviceCodeParamService.class);
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class); RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
ProjectInfoService projectInfoService = context.getBean(ProjectInfoService.class); ProjectInfoService projectInfoService = context.getBean(ProjectInfoService.class);
MeterManageService meterManageService = context.getBean(MeterManageService.class);
private final SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); private final SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
private final DecimalFormat df = new DecimalFormat("#.##"); private final DecimalFormat df = new DecimalFormat("#.##");
// 根据工厂和策略模式解析接收到的合法数据
public void analysisReceiveOrder485(final String receiveData, final DeviceCodeParamEntity deviceCodeParam) {
// 创建设备报文
String protocolType = String.valueOf(deviceCodeParam.getProtocolType());
Protocol protocol = ProtocolFactory.matchProtocol(protocolType);
ProtocolStrategy strategy = ProtocolStrategyFactory.matchProtocolStrategy(protocolType);
if (strategy == null) {
return;
}
protocol.setStrategy(strategy);
String analysisData = protocol.analysisReceiveData(deviceCodeParam, receiveData);
log.info("解析后的数据===> {}", analysisData);
// 更新实时状态数据
meterManageService.updateDataById(deviceCodeParam.getMmId(), analysisData);
}
//解析冷量表 //解析冷量表
public void analysisCloudOrder485(final String dataStr1, final DeviceCodeParamEntity deviceCodeParam) { public void analysisCloudOrder485(final String dataStr1, final DeviceCodeParamEntity deviceCodeParam) {
threadPoolService.execute(() -> { threadPoolService.execute(() -> {

Loading…
Cancel
Save