Browse Source

1、添加netty采集生活热水水电表、热泵信息;

dev_mz
mh 1 week ago
parent
commit
1603247df1
  1. 33
      mh-admin/src/main/java/com/mh/MHRunner.java
  2. 2
      mh-admin/src/main/java/com/mh/web/controller/device/OperationController.java
  3. 9
      mh-common/src/main/java/com/mh/common/core/domain/entity/GatewayManage.java
  4. 277
      mh-common/src/main/java/com/mh/common/utils/AnalysisReceiveOrder485.java
  5. 253
      mh-common/src/main/java/com/mh/common/utils/CRC16.java
  6. 1350
      mh-common/src/main/java/com/mh/common/utils/ExchangeStringUtil.java
  7. 53
      mh-common/src/main/java/com/mh/common/utils/SendOrderUtils.java
  8. 2
      mh-framework/src/main/java/com/mh/framework/dealdata/DataProcessService.java
  9. 17
      mh-framework/src/main/java/com/mh/framework/dealdata/impl/DataProcessServiceImpl.java
  10. 57
      mh-framework/src/main/java/com/mh/framework/netty/EchoServer.java
  11. 264
      mh-framework/src/main/java/com/mh/framework/netty/EchoServerHandler.java
  12. 33
      mh-framework/src/main/java/com/mh/framework/netty/ServerChannelInitializer.java
  13. 12
      mh-quartz/src/main/java/com/mh/quartz/task/AHUTask.java
  14. 17
      mh-quartz/src/main/java/com/mh/quartz/task/DealDataTask.java
  15. 6
      mh-quartz/src/main/java/com/mh/quartz/util/AHUPIDControlUtil.java
  16. 5
      mh-system/src/main/java/com/mh/system/mapper/device/CollectionParamsManageMapper.java
  17. 14
      mh-system/src/main/java/com/mh/system/mapper/device/DataProcessMapper.java
  18. 2
      mh-system/src/main/java/com/mh/system/mapper/device/GatewayManageMapper.java
  19. 2
      mh-system/src/main/java/com/mh/system/service/device/ICollectionParamsManageService.java
  20. 2
      mh-system/src/main/java/com/mh/system/service/device/IGatewayManageService.java
  21. 53
      mh-system/src/main/java/com/mh/system/service/device/impl/CollectionParamsManageServiceImpl.java
  22. 6
      mh-system/src/main/java/com/mh/system/service/device/impl/GatewayManageServiceImpl.java

33
mh-admin/src/main/java/com/mh/MHRunner.java

@ -1,14 +1,19 @@
package com.mh; package com.mh;
import com.mh.common.core.domain.entity.GatewayManage;
import com.mh.common.core.domain.entity.MqttSubscription; import com.mh.common.core.domain.entity.MqttSubscription;
import com.mh.common.utils.StringUtils; import com.mh.common.utils.StringUtils;
import com.mh.framework.mqtt.service.IMqttTopicService; import com.mh.framework.mqtt.service.IMqttTopicService;
import com.mh.framework.netty.EchoServer;
import com.mh.system.service.device.ICollectionParamsManageService;
import com.mh.system.service.device.IGatewayManageService;
import com.mh.system.service.mqtt.IMqttSubscriptionService; import com.mh.system.service.mqtt.IMqttSubscriptionService;
import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner; import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.List; import java.util.List;
import java.util.stream.Collectors;
/** /**
* @author LJF * @author LJF
@ -24,15 +29,41 @@ public class MHRunner implements ApplicationRunner {
private final IMqttTopicService iMqttTopicService; private final IMqttTopicService iMqttTopicService;
public MHRunner(IMqttSubscriptionService iMqttSubscriptionService, IMqttTopicService iMqttTopicService) { private final ICollectionParamsManageService collectionParamsManageService;
private final IGatewayManageService gatewayManageService;
public MHRunner(IMqttSubscriptionService iMqttSubscriptionService, IMqttTopicService iMqttTopicService, ICollectionParamsManageService collectionParamsManageService, IGatewayManageService gatewayManageService) {
this.iMqttSubscriptionService = iMqttSubscriptionService; this.iMqttSubscriptionService = iMqttSubscriptionService;
this.iMqttTopicService = iMqttTopicService; this.iMqttTopicService = iMqttTopicService;
this.collectionParamsManageService = collectionParamsManageService;
this.gatewayManageService = gatewayManageService;
} }
@Override @Override
public void run(ApplicationArguments args) throws Exception { public void run(ApplicationArguments args) throws Exception {
// 初始化mqtt订阅记录 // 初始化mqtt订阅记录
initializeMqttSubscription(); initializeMqttSubscription();
// 生成DTU采集参数
createDtuCollectionParams();
// 启动netty服务端
startNettyServer();
}
private void startNettyServer() {
List<GatewayManage> gatewayManages = gatewayManageService.selectGwManageList(new GatewayManage());
if (gatewayManages != null && !gatewayManages.isEmpty()) {
// 根据端口号分组
gatewayManages.stream().collect(Collectors.groupingBy(GatewayManage::getPort)).forEach((k, v) -> {
// 启动网关
GatewayManage gatewayManage = v.getFirst();
new Thread(() -> new EchoServer(gatewayManage.getPort()).start()).start();
});
}
}
private void createDtuCollectionParams() {
collectionParamsManageService.createDtuCollectionParams();
} }
/** /**

2
mh-admin/src/main/java/com/mh/web/controller/device/OperationController.java

@ -99,7 +99,7 @@ public class OperationController extends BaseController {
String name = mhConfig.getName(); String name = mhConfig.getName();
// 获取mqtt操作队列(后期通过mqtt队列配置发送主题) // 获取mqtt操作队列(后期通过mqtt队列配置发送主题)
log.info("发送主题:{},消息:{}", name + "/"+ controlTopic, sendOrder); log.info("发送主题:{},消息:{}", name + "/"+ controlTopic, sendOrder);
// iMqttGatewayService.publish(name + "/"+ controlTopic, sendOrder, 1); iMqttGatewayService.publish(name + "/"+ controlTopic, sendOrder, 1);
} catch (Exception e) { } catch (Exception e) {
log.error("设备操作失败", e); log.error("设备操作失败", e);
return AjaxResult.error(); return AjaxResult.error();

9
mh-common/src/main/java/com/mh/common/core/domain/entity/GatewayManage.java

@ -35,6 +35,7 @@ public class GatewayManage extends BaseEntity {
private int communicationType; // 通讯类型 private int communicationType; // 通讯类型
private int grade; // 标志位(连接状态) 0:正常;1:不在线;2:异常 private int grade; // 标志位(连接状态) 0:正常;1:不在线;2:异常
private Integer status; // (连接状态) 0:正常;1:不在线;2:异常 private Integer status; // (连接状态) 0:正常;1:不在线;2:异常
private String heartBeat; // 心跳包
@JsonIgnore @JsonIgnore
@TableField(exist = false) @TableField(exist = false)
@ -47,6 +48,14 @@ public class GatewayManage extends BaseEntity {
@JsonInclude(JsonInclude.Include.NON_EMPTY) @JsonInclude(JsonInclude.Include.NON_EMPTY)
private Map<String, Object> params; private Map<String, Object> params;
public String getHeartBeat() {
return heartBeat;
}
public void setHeartBeat(String heartBeat) {
this.heartBeat = heartBeat;
}
public Integer getStatus() { public Integer getStatus() {
return status; return status;
} }

277
mh-common/src/main/java/com/mh/common/utils/AnalysisReceiveOrder485.java

@ -0,0 +1,277 @@
package com.mh.common.utils;
import com.mh.common.core.domain.entity.CollectionParamsManage;
import lombok.extern.slf4j.Slf4j;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.text.DecimalFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
/**
* @author ljf
* @title
* @description 解析485接收的数据
* @updateTime 2020-04-23
* @throws
*/
@Slf4j
public class AnalysisReceiveOrder485 {
// 调用service
private final SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
private final DecimalFormat df = new DecimalFormat("#.##");
//解析冷量表
public void analysisCloudOrder485(final String dataStr1, final CollectionParamsManage deviceCodeParam) {
// 去掉空格
String dataStr = dataStr1.replace(" ", "").toUpperCase();
// 检验报文
String checkStr = dataStr.substring(0, dataStr.length() - 4);
byte[] strOrder = ExchangeStringUtil.hexStrToBinaryStr(checkStr);
int checkNum = CRC16.CRC16_MODBUS(strOrder);
String checkWord = ExchangeStringUtil.decToHex(String.valueOf(checkNum));
checkWord = checkWord.substring(2, 4) + checkWord.substring(0, 2);
if (checkWord.equalsIgnoreCase(dataStr.substring(dataStr.length() - 4))) {
//创建SimpleDateFormat对象,指定样式 2019-05-13 22:39:30
Date date = new Date();
String dateStr = sdf1.format(date);
;
//保留两位小数处理
DecimalFormat decimalFormat = new DecimalFormat("0.00");
// 表号
String cloudId = ExchangeStringUtil.hexToDec(dataStr.substring(0, 2));
// 读数
String data = "";
data = dataStr.substring(dataStr.length() - 8, dataStr.length() - 6)
+ dataStr.substring(dataStr.length() - 6, dataStr.length() - 4)
+ dataStr.substring(dataStr.length() - 12, dataStr.length() - 10)
+ dataStr.substring(dataStr.length() - 10, dataStr.length() - 8);
String registerAddr = deviceCodeParam.getRegisterAddr();
if (ExchangeStringUtil.isInDate(date, "00:00:00", "00:00:30")) {
dateStr = dateStr.substring(0, 17) + "00";
} else if (ExchangeStringUtil.isInDate(date, "00:00:30", "00:00:59")) {
dateStr = dateStr.substring(0, 17) + "30";
}
try {
if (registerAddr.equals("32") || registerAddr.equals("33") || registerAddr.equals("35") || registerAddr.equals("36")) {
data = decimalFormat.format(Math.abs(ExchangeStringUtil.hexToSingle(data)));//十六进制字符串转IEEE754浮点型
log.info("冷量计==>{},寄存器地址==>{},读数==>{}", cloudId, registerAddr, data);
} else if (registerAddr.equals("31") || registerAddr.equals("34")) {
long lData = Long.parseLong(ExchangeStringUtil.hexToDec(data));
log.info("冷量计==>{},寄存器地址==>{},累计读数==>{}", cloudId, registerAddr, lData);
}
} catch (Exception e) {
log.error("保存冷量计数据失败!", e);
}
} else {
log.info("冷量计校验失败===>{}", dataStr);
}
}
/**
* 解析水表返回的数据
*
* @param dataStr1
*/
public String analysisWaterOrder485(final String dataStr1, final CollectionParamsManage deviceCodeParam) {
// 去掉空格
String dataStr = dataStr1.replace(" ", "").toUpperCase();
// 检验报文
String checkStr = dataStr.substring(0, dataStr.length() - 4);
byte[] strOrder = ExchangeStringUtil.hexStrToBinaryStr(checkStr);
int checkNum = CRC16.CRC16_MODBUS(strOrder);
String checkWord = ExchangeStringUtil.decToHex(String.valueOf(checkNum));
checkWord = checkWord.substring(2, 4) + checkWord.substring(0, 2);
if (checkWord.equalsIgnoreCase(dataStr.substring(dataStr.length() - 4))) {
//创建SimpleDateFormat对象,指定样式 2019-05-13 22:39:30
Date date = new Date();
String dateStr = sdf1.format(date);
;
//保留两位小数处理
DecimalFormat decimalFormat = new DecimalFormat("0.00");
// 表号
String cloudId = ExchangeStringUtil.hexToDec(dataStr.substring(0, 2));
// 读数
String data = "";
data = dataStr.substring(dataStr.length() - 8, dataStr.length() - 6)
+ dataStr.substring(dataStr.length() - 6, dataStr.length() - 4)
+ dataStr.substring(dataStr.length() - 12, dataStr.length() - 10)
+ dataStr.substring(dataStr.length() - 10, dataStr.length() - 8);
int dataType = deviceCodeParam.getDataType();
if (ExchangeStringUtil.isInDate(date, "00:00:00", "00:00:30")) {
dateStr = dateStr.substring(0, 17) + "00";
System.out.println("插入时间00" + dateStr);
} else if (ExchangeStringUtil.isInDate(date, "00:00:30", "00:00:59")) {
dateStr = dateStr.substring(0, 17) + "30";
System.out.println("插入时间30" + dateStr);
}
try {
if (dataType == 3) {
data = decimalFormat.format(Math.abs(ExchangeStringUtil.hexToSingle(data)));//十六进制字符串转IEEE754浮点型
log.info("水表==>{},寄存器地址==>{},读数==>{}", cloudId, deviceCodeParam.getRegisterAddr(), data);
} else if (dataType == 2) {
data = dataStr.substring(dataStr.length() - 12, dataStr.length() - 10)
+ dataStr.substring(dataStr.length() - 10, dataStr.length() - 8)
+ dataStr.substring(dataStr.length() - 8, dataStr.length() - 6)
+ dataStr.substring(dataStr.length() - 6, dataStr.length() - 4);
data = ExchangeStringUtil.hexToDec(data);
BigDecimal bigDecimal = new BigDecimal(data);
bigDecimal = bigDecimal.divide(new BigDecimal((int) Math.pow(10, deviceCodeParam.getDigits()))).setScale(2, RoundingMode.HALF_UP); // 除以1000并保留整数
data = bigDecimal.toString();
log.info("水表==>{},寄存器地址==>{},累计读数==>{}", cloudId, deviceCodeParam.getRegisterAddr(), data);
}
return data;
} catch (Exception e) {
log.error("保存水表数据失败!", e);
}
} else {
log.info("水表===>{}", dataStr);
return "";
}
return "";
}
/**
* 解析电表返回的数据
*
* @param dataStr1
*/
public String analysisMeterOrder485(final String dataStr1, final CollectionParamsManage deviceCodeParam) {
// 去掉空格
String dataStr = dataStr1.replace(" ", "").toUpperCase();
// 检验报文
String checkStr = dataStr.substring(0, dataStr.length() - 4);
byte[] strOrder = ExchangeStringUtil.hexStrToBinaryStr(checkStr);
int checkNum = CRC16.CRC16_MODBUS(strOrder);
String checkWord = ExchangeStringUtil.decToHex(String.valueOf(checkNum));
checkWord = checkWord.substring(2, 4) + checkWord.substring(0, 2);
if (checkWord.equalsIgnoreCase(dataStr.substring(dataStr.length() - 4))) {
//创建SimpleDateFormat对象,指定样式 2019-05-13 22:39:30
Date date = new Date();
String dateStr = sdf1.format(date);
;
//保留两位小数处理
DecimalFormat decimalFormat = new DecimalFormat("0.00");
// 表号
String cloudId = ExchangeStringUtil.hexToDec(dataStr.substring(0, 2));
// 读数
String data = "";
data = dataStr.substring(dataStr.length() - 8, dataStr.length() - 6)
+ dataStr.substring(dataStr.length() - 6, dataStr.length() - 4)
+ dataStr.substring(dataStr.length() - 12, dataStr.length() - 10)
+ dataStr.substring(dataStr.length() - 10, dataStr.length() - 8);
int dataType = deviceCodeParam.getDataType();
if (ExchangeStringUtil.isInDate(date, "00:00:00", "00:00:30")) {
dateStr = dateStr.substring(0, 17) + "00";
System.out.println("插入时间00" + dateStr);
} else if (ExchangeStringUtil.isInDate(date, "00:00:30", "00:00:59")) {
dateStr = dateStr.substring(0, 17) + "30";
System.out.println("插入时间30" + dateStr);
}
try {
if (dataType == 3) {
data = decimalFormat.format(Math.abs(ExchangeStringUtil.hexToSingle(data)));//十六进制字符串转IEEE754浮点型
log.info("电表==>{},寄存器地址==>{},读数==>{}", cloudId, deviceCodeParam.getRegisterAddr(), data);
} else if (dataType == 2) {
data = ExchangeStringUtil.hexToDec(data);
log.info("电表==>{},寄存器地址==>{},累计读数==>{}", cloudId, deviceCodeParam.getRegisterAddr(), data);
}
return data;
} catch (Exception e) {
log.error("保存电表数据失败!", e);
}
} else {
log.info("电表===>{}", dataStr);
return "";
}
return "";
}
public static int dValue(String lastDate) throws ParseException {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date lastTime = format.parse(lastDate);
long min = lastTime.getTime();
Calendar calendar = Calendar.getInstance();
long min1 = calendar.getTimeInMillis();
long subtract = min1 - min;
// System.out.println("相减值: " + subtract/(1000*60));
return (int) subtract / (1000 * 60);
}
// 判断是否存在寄存器地址
public Boolean queryRegisterAddr(List<String> stringList, String registerAddr) {
boolean flag = false;
for (int i = 0; i < stringList.size(); i++) {
if (stringList.get(i).equalsIgnoreCase(registerAddr)) {
flag = true;
break;
}
}
return flag;
}
public String analysisHeatPumpOrder485(String receiveStr, CollectionParamsManage deviceCodeParam) {
// 去掉空格
String dataStr = receiveStr.replace(" ", "").toUpperCase();
// 检验报文
String checkStr = dataStr.substring(0, dataStr.length() - 4);
byte[] strOrder = ExchangeStringUtil.hexStrToBinaryStr(checkStr);
int checkNum = CRC16.CRC16_MODBUS(strOrder);
String checkWord = ExchangeStringUtil.decToHex(String.valueOf(checkNum));
checkWord = checkWord.substring(2, 4) + checkWord.substring(0, 2);
if (checkWord.equalsIgnoreCase(dataStr.substring(dataStr.length() - 4))) {
//创建SimpleDateFormat对象,指定样式 2019-05-13 22:39:30
Date date = new Date();
String dateStr = sdf1.format(date);
//保留两位小数处理
DecimalFormat decimalFormat = new DecimalFormat("0.00");
// 表号
String cloudId = ExchangeStringUtil.hexToDec(dataStr.substring(0, 2));
// 读数
String data = "";
data = dataStr.substring(dataStr.length() - 8, dataStr.length() - 6)
+ dataStr.substring(dataStr.length() - 6, dataStr.length() - 4);
int dataType = deviceCodeParam.getDataType();
if (ExchangeStringUtil.isInDate(date, "00:00:00", "00:00:30")) {
dateStr = dateStr.substring(0, 17) + "00";
System.out.println("插入时间00" + dateStr);
} else if (ExchangeStringUtil.isInDate(date, "00:00:30", "00:00:59")) {
dateStr = dateStr.substring(0, 17) + "30";
System.out.println("插入时间30" + dateStr);
}
try {
if (dataType == 3) {
data = decimalFormat.format(Math.abs(ExchangeStringUtil.hexToSingle(data)));//十六进制字符串转IEEE754浮点型
} else if (dataType == 2 && (deviceCodeParam.getParamType().equals("5")
|| deviceCodeParam.getParamType().equals("2")
|| deviceCodeParam.getParamType().equals("12")
|| deviceCodeParam.getParamType().equals("14")
)) {
data = ExchangeStringUtil.hexToDec(data);
}
log.info("热泵==>{},寄存器地址==>{},读数==>{}", cloudId, deviceCodeParam.getRegisterAddr(), data);
return data;
} catch (Exception e) {
log.error("保存热泵数据失败!", e);
}
} else {
log.info("热泵===>{}", dataStr);
return "";
}
return "";
}
}

253
mh-common/src/main/java/com/mh/common/utils/CRC16.java

@ -0,0 +1,253 @@
package com.mh.common.utils;
/**
* CRC16_CCITT多项式x16+x12+x5+10x1021初始值0x0000低位在前高位在后结果与0x0000异或
* CRC16_CCITT_FALSE多项式x16+x12+x5+10x1021初始值0xFFFF低位在后高位在前结果与0x0000异或
* CRC16_XMODEM多项式x16+x12+x5+10x1021初始值0x0000低位在后高位在前结果与0x0000异或
* CRC16_X25多项式x16+x12+x5+10x1021初始值0xffff低位在前高位在后结果与0xFFFF异或
* CRC16_MODBUS多项式x16+x15+x2+10x8005初始值0xFFFF低位在前高位在后结果与0x0000异或
* CRC16_IBM多项式x16+x15+x2+10x8005初始值0x0000低位在前高位在后结果与0x0000异或
* CRC16_MAXIM多项式x16+x15+x2+10x8005初始值0x0000低位在前高位在后结果与0xFFFF异或
* CRC16_USB多项式x16+x15+x2+10x8005初始值0xFFFF低位在前高位在后结果与0xFFFF异或
* CRC16_DNP多项式x16+x13+x12+x11+x10+x8+x6+x5+x2+10x3D65初始值0x0000低位在前高位在后结果与0xFFFF异或
* <p>
* 1预置1个16位的寄存器为十六进制FFFF即全为1称此寄存器为CRC寄存器
* 2把第一个8位二进制数据既通讯信息帧的第一个字节与16位的CRC寄存器的低8位相异或把结果放于CRC寄存器高八位数据不变
* 3把CRC寄存器的内容右移一位朝低位用0填补最高位并检查右移后的移出位
* 4如果移出位为0重复第3步再次右移一位如果移出位为1CRC寄存器与多项式A0011010 0000 0000 0001进行异或
* 5重复步骤3和4直到右移8次这样整个8位数据全部进行了处理
* 6重复步骤2到步骤5进行通讯信息帧下一个字节的处理
* 7将该通讯信息帧所有字节按上述步骤计算完成后得到的16位CRC寄存器的高低字节进行交换
* 8最后得到的CRC寄存器内容即为CRC码
* <p>
* 以上计算步骤中的多项式0xA001是0x8005按位颠倒后的结果
* 0x8408是0x1021按位颠倒后的结果
* 在线校验工具
* http://www.ip33.com/crc.html
* https://blog.csdn.net/htmlxx/article/details/17369105
* <p>
* Author:Water
* Time:2018/11/19 0019 15:03
*/
public class CRC16 {
/**
* CRC16_CCITT多项式x16+x12+x5+10x1021初始值0x0000低位在前高位在后结果与0x0000异或
* 0x8408是0x1021按位颠倒后的结果
*
* @param buffer
* @return
*/
public static int CRC16_CCITT(byte[] buffer) {
int wCRCin = 0x0000;
int wCPoly = 0x8408;
for (byte b : buffer) {
wCRCin ^= ((int) b & 0x00ff);
for (int j = 0; j < 8; j++) {
if ((wCRCin & 0x0001) != 0) {
wCRCin >>= 1;
wCRCin ^= wCPoly;
} else {
wCRCin >>= 1;
}
}
}
// wCRCin=(wCRCin<<8)|(wCRCin>>8);
// wCRCin &= 0xffff;
return wCRCin ^= 0x0000;
}
/**
* CRC-CCITT (0xFFFF)
* CRC16_CCITT_FALSE多项式x16+x12+x5+10x1021初始值0xFFFF低位在后高位在前结果与0x0000异或
*
* @param buffer
* @return
*/
public static int CRC16_CCITT_FALSE(byte[] buffer) {
int wCRCin = 0xffff;
int wCPoly = 0x1021;
for (byte b : buffer) {
for (int i = 0; i < 8; i++) {
boolean bit = ((b >> (7 - i) & 1) == 1);
boolean c15 = ((wCRCin >> 15 & 1) == 1);
wCRCin <<= 1;
if (c15 ^ bit)
wCRCin ^= wCPoly;
}
}
wCRCin &= 0xffff;
return wCRCin ^= 0x0000;
}
/**
* CRC-CCITT (XModem)
* CRC16_XMODEM多项式x16+x12+x5+10x1021初始值0x0000低位在后高位在前结果与0x0000异或
*
* @param buffer
* @return
*/
public static int CRC16_XMODEM(byte[] buffer) {
int wCRCin = 0x0000; // initial value 65535
int wCPoly = 0x1021; // 0001 0000 0010 0001 (0, 5, 12)
for (byte b : buffer) {
for (int i = 0; i < 8; i++) {
boolean bit = ((b >> (7 - i) & 1) == 1);
boolean c15 = ((wCRCin >> 15 & 1) == 1);
wCRCin <<= 1;
if (c15 ^ bit)
wCRCin ^= wCPoly;
}
}
wCRCin &= 0xffff;
return wCRCin ^= 0x0000;
}
/**
* CRC16_X25多项式x16+x12+x5+10x1021初始值0xffff低位在前高位在后结果与0xFFFF异或
* 0x8408是0x1021按位颠倒后的结果
*
* @param buffer
* @return
*/
public static int CRC16_X25(byte[] buffer) {
int wCRCin = 0xffff;
int wCPoly = 0x8408;
for (byte b : buffer) {
wCRCin ^= ((int) b & 0x00ff);
for (int j = 0; j < 8; j++) {
if ((wCRCin & 0x0001) != 0) {
wCRCin >>= 1;
wCRCin ^= wCPoly;
} else {
wCRCin >>= 1;
}
}
}
return wCRCin ^= 0xffff;
}
/**
* CRC-16 (Modbus)
* CRC16_MODBUS多项式x16+x15+x2+10x8005初始值0xFFFF低位在前高位在后结果与0x0000异或
* 0xA001是0x8005按位颠倒后的结果
*
* @param buffer
* @return
*/
public static int CRC16_MODBUS(byte[] buffer) {
int wCRCin = 0xffff;
int POLYNOMIAL = 0xa001;
for (byte b : buffer) {
wCRCin ^= ((int) b & 0x00ff);
for (int j = 0; j < 8; j++) {
if ((wCRCin & 0x0001) != 0) {
wCRCin >>= 1;
wCRCin ^= POLYNOMIAL;
} else {
wCRCin >>= 1;
}
}
}
return wCRCin ^= 0x0000;
}
/**
* CRC-16
* CRC16_IBM多项式x16+x15+x2+10x8005初始值0x0000低位在前高位在后结果与0x0000异或
* 0xA001是0x8005按位颠倒后的结果
*
* @param buffer
* @return
*/
public static int CRC16_IBM(byte[] buffer) {
int wCRCin = 0x0000;
int wCPoly = 0xa001;
for (byte b : buffer) {
wCRCin ^= ((int) b & 0x00ff);
for (int j = 0; j < 8; j++) {
if ((wCRCin & 0x0001) != 0) {
wCRCin >>= 1;
wCRCin ^= wCPoly;
} else {
wCRCin >>= 1;
}
}
}
return wCRCin ^= 0x0000;
}
/**
* CRC16_MAXIM多项式x16+x15+x2+10x8005初始值0x0000低位在前高位在后结果与0xFFFF异或
* 0xA001是0x8005按位颠倒后的结果
*
* @param buffer
* @return
*/
public static int CRC16_MAXIM(byte[] buffer) {
int wCRCin = 0x0000;
int wCPoly = 0xa001;
for (byte b : buffer) {
wCRCin ^= ((int) b & 0x00ff);
for (int j = 0; j < 8; j++) {
if ((wCRCin & 0x0001) != 0) {
wCRCin >>= 1;
wCRCin ^= wCPoly;
} else {
wCRCin >>= 1;
}
}
}
return wCRCin ^= 0xffff;
}
/**
* CRC16_USB多项式x16+x15+x2+10x8005初始值0xFFFF低位在前高位在后结果与0xFFFF异或
* 0xA001是0x8005按位颠倒后的结果
*
* @param buffer
* @return
*/
public static int CRC16_USB(byte[] buffer) {
int wCRCin = 0xFFFF;
int wCPoly = 0xa001;
for (byte b : buffer) {
wCRCin ^= ((int) b & 0x00ff);
for (int j = 0; j < 8; j++) {
if ((wCRCin & 0x0001) != 0) {
wCRCin >>= 1;
wCRCin ^= wCPoly;
} else {
wCRCin >>= 1;
}
}
}
return wCRCin ^= 0xffff;
}
/**
* CRC16_DNP多项式x16+x13+x12+x11+x10+x8+x6+x5+x2+10x3D65初始值0x0000低位在前高位在后结果与0xFFFF异或
* 0xA6BC是0x3D65按位颠倒后的结果
*
* @param buffer
* @return
*/
public static int CRC16_DNP(byte[] buffer) {
int wCRCin = 0x0000;
int wCPoly = 0xA6BC;
for (byte b : buffer) {
wCRCin ^= ((int) b & 0x00ff);
for (int j = 0; j < 8; j++) {
if ((wCRCin & 0x0001) != 0) {
wCRCin >>= 1;
wCRCin ^= wCPoly;
} else {
wCRCin >>= 1;
}
}
}
return wCRCin ^= 0xffff;
}
}

1350
mh-common/src/main/java/com/mh/common/utils/ExchangeStringUtil.java

File diff suppressed because it is too large Load Diff

53
mh-common/src/main/java/com/mh/common/utils/SendOrderUtils.java

@ -0,0 +1,53 @@
package com.mh.common.utils;
import com.mh.common.core.domain.entity.CollectionParamsManage;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
/**
* @author ljf
* @title
* @description 发送指令工具类
* @updateTime 2021-01-26
* @throws
*/
@Slf4j
public class SendOrderUtils {
// 发送所有类型采集报文
public static void sendAllOrder(CollectionParamsManage paramsManage, ChannelHandlerContext ctx, int num, int size) {
// 开始创建指令
String mtCode = paramsManage.getMtCode(); // 采集编号
String funCode = paramsManage.getFuncCode(); // 功能码
String registerAddr = paramsManage.getRegisterAddr(); // 寄存器地址
String registerNum = String.valueOf(paramsManage.getRegisterSize()); // 寄存器数量
// 拼接指令
String sendOrderStr = ExchangeStringUtil.addZeroForNum(ExchangeStringUtil.decToHex(mtCode), 2)
+ ExchangeStringUtil.addZeroForNum(funCode, 2)
+ ExchangeStringUtil.addZeroForNum(registerAddr, 4)
+ ExchangeStringUtil.addZeroForNum(registerNum, 4);
byte[] strOrder = ExchangeStringUtil.hexStrToBinaryStr(sendOrderStr);
int checkNum = CRC16.CRC16_MODBUS(strOrder);
String checkWord = ExchangeStringUtil.decToHex(String.valueOf(checkNum));
checkWord = checkWord.substring(2, 4) + checkWord.substring(0, 2);
sendOrderStr = sendOrderStr + checkWord;
ByteBuf buffer = getByteBuf(ctx, sendOrderStr);
// 发送数据
ctx.channel().writeAndFlush(buffer);
log.info("sends :" + sendOrderStr + ",num:" + num + ",records:" + size);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
log.error("线程休眠异常", e);
}
}
private static ByteBuf getByteBuf(ChannelHandlerContext ctx, String sendStr) {
// 申请一个数据结构存储信息
ByteBuf buffer = ctx.alloc().buffer();
// 将信息放入数据结构中
buffer.writeBytes(ExchangeStringUtil.hexStrToBinaryStr(sendStr));//对接需要16进制
return buffer;
}
}

2
mh-framework/src/main/java/com/mh/framework/dealdata/DataProcessService.java

@ -37,7 +37,7 @@ public interface DataProcessService {
* @param deviceNum * @param deviceNum
* @return * @return
*/ */
String queryInitValue(String deviceNum); String queryInitValue(String deviceNum, String mtCode);
/** /**
* 查询上一次采集数据时间等参数 * 查询上一次采集数据时间等参数

17
mh-framework/src/main/java/com/mh/framework/dealdata/impl/DataProcessServiceImpl.java

@ -151,6 +151,11 @@ public class DataProcessServiceImpl implements DataProcessService {
} }
String timeString = data.getTs(); String timeString = data.getTs();
String formattedTime = "";
// 判断是否存在TimeZone
if (!timeString.contains("T")) {
formattedTime = timeString;
} else {
OffsetDateTime utcDateTime; OffsetDateTime utcDateTime;
try { try {
// 尝试多种常见的时间格式 // 尝试多种常见的时间格式
@ -173,9 +178,11 @@ public class DataProcessServiceImpl implements DataProcessService {
); );
// 3. 格式化为目标字符串 // 3. 格式化为目标字符串
String formattedTime = chinaDateTime.format( formattedTime = chinaDateTime.format(
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss") DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
); );
}
String dString = data.getD().toString(); String dString = data.getD().toString();
// 替换掉inf // 替换掉inf
if (dString.contains("inf")) { if (dString.contains("inf")) {
@ -227,8 +234,12 @@ public class DataProcessServiceImpl implements DataProcessService {
} }
@Override @Override
public String queryInitValue(String deviceNum) { public String queryInitValue(String deviceNum, String mtCode) {
return dataProcessMapper.queryInitValue(deviceNum); if (StringUtils.isEmpty(mtCode)) {
return dataProcessMapper.queryInitValue1(deviceNum);
} else {
return dataProcessMapper.queryInitValue(deviceNum, mtCode);
}
} }
@Override @Override

57
mh-framework/src/main/java/com/mh/framework/netty/EchoServer.java

@ -0,0 +1,57 @@
package com.mh.framework.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class EchoServer {
private final int port;
public EchoServer(int port) {
this.port = port;
}
public void start() {
// 创建EventLoopGroup
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.localAddress(port)
.option(ChannelOption.SO_BACKLOG, 1204)
.childHandler(new ServerChannelInitializer());
// 异步绑定端口
ChannelFuture channelFuture = serverBootstrap.bind();
// 添加监听器处理绑定结果
channelFuture.addListener(future -> {
if (future.isSuccess()) {
log.info("服务器启动成功,开始监听端口: {}", port);
} else {
log.error("服务器启动失败,端口: {}", port, future.cause());
bossGroup.shutdownGracefully(); // 绑定失败立即关闭资源
workerGroup.shutdownGracefully();
}
});
// ❌ 移除 sync() 阻塞调用
// channelFuture.channel().closeFuture().sync(); --> 删除这行
// 可选:添加 JVM 关闭钩子优雅关闭资源
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
log.info("JVM 正在关闭,准备释放 Netty 资源...");
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
log.info("Netty 资源已释放");
}));
}
}

264
mh-framework/src/main/java/com/mh/framework/netty/EchoServerHandler.java

@ -0,0 +1,264 @@
package com.mh.framework.netty;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.mh.common.constant.Constants;
import com.mh.common.core.domain.entity.CollectionParamsManage;
import com.mh.common.core.domain.entity.SysDictData;
import com.mh.common.core.redis.RedisCache;
import com.mh.common.model.request.AdvantechDatas;
import com.mh.common.model.request.AdvantechReceiver;
import com.mh.common.utils.*;
import com.mh.common.utils.spring.SpringUtils;
import com.mh.framework.rabbitmq.producer.SendMsgByTopic;
import com.mh.system.service.device.ICollectionParamsManageService;
import com.mh.system.service.device.IGatewayManageService;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
@Slf4j
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
// 调用service层的接口信息
IGatewayManageService gatewayManageService = SpringUtils.getBean(IGatewayManageService.class);
ICollectionParamsManageService collectionParamsManageService = SpringUtils.getBean(ICollectionParamsManageService.class);
SendMsgByTopic sendMsgByTopic = SpringUtils.getBean(SendMsgByTopic.class);
/**
* 空闲次数
*/
private int idleCount = 1;
private int count = 0;
private List<String> orderList;
private int num = 0;
private int size = 0;
private String IP;
private String port;
private String receiveStr = "";
private List<CollectionParamsManage> deviceCodeParamList;
/**
* 客户端连接会触发
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("Channel active......");
}
/**
* 超时处理
* 如果120秒没有接受客户端的心跳就触发;
* 如果超过3次则直接关闭;
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object obj) throws Exception {
if (obj instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) obj;
if (IdleState.READER_IDLE.equals(event.state())) { //如果读通道处于空闲状态,说明没有接收到心跳命令
log.info("第{}已经40秒没有接收到客户端的信息了", idleCount);
receiveStr = "";
num = num + 1;
if (num > size - 1) {
num = 0;
// // 关闭连接
// ctx.close();
// 继续发送下一个采集指令
SendOrderUtils.sendAllOrder(deviceCodeParamList.get(num),ctx,num,size);
} else {
// 继续发送下一个采集指令
SendOrderUtils.sendAllOrder(deviceCodeParamList.get(num), ctx, num, size);
}
}
} else {
super.userEventTriggered(ctx, obj);
}
}
// 对于每一个传入的消息都要被调用
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
//接收到服务端发来的数据进行业务处理
ByteBuf buf = (ByteBuf) msg;
byte[] bytes = new byte[buf.readableBytes()];
buf.readBytes(bytes);//复制内容到字节数组bytes
buf.clear();
// 截取IP地址
IP = ExchangeStringUtil.getMidString(ctx.channel().remoteAddress() + "", "/", ":");
// 截取端口号
port = ExchangeStringUtil.getMidString(ctx.channel().remoteAddress() + "", ":", "");
if (bytes.length <= 1024) {
//将接收到的数据转为字符串,此字符串就是客户端发送的字符串
receiveStr = receiveStr + ExchangeStringUtil.bytesToHexString(bytes);//将接收到的数据转为字符串,此字符串就是客户端发送的字符串
receiveStr = receiveStr.replace("null", ""); //去null
receiveStr = receiveStr.replace(" ", ""); //去空格
//log.info("channelRead接收到的数据:" + receiveStr + ",length:" + receiveStr.length());
}
} catch (Exception e) {
log.error("channelRead异常", e);
} finally {
ReferenceCountUtil.release(msg);
}
}
// 当前批量读取中的最后一条消息
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//心跳包报文: 24 00 60 95
receiveStr = receiveStr.toUpperCase();//返回值全部变成大写
log.info("channelReadComplete接收到的数据{}, 长度: ===> {}", receiveStr, receiveStr.length());
//心跳包处理
if ((receiveStr.length() == 8) && receiveStr.startsWith("24")) {
// if ((receiveStr.length() == 8) && receiveStr.startsWith("C0A801FE")) {
log.info("接收到心跳包 ===> {}", receiveStr);
idleCount = 1;
port = receiveStr.substring(4, 8);//心跳包包含网关端口(自己定义返回心跳包)
// 更新对应的网关在线情况
gatewayManageService.updateGatewayStatus(receiveStr);
//根据端口或者IP或者心跳包查询网关对应的项目名称
// 生成采集指令
if (!SpringUtils.getBean(RedisCache.class).hasKey(receiveStr)) {
collectionParamsManageService.createDtuCollectionParams();
}
JSONArray arrayCache = SpringUtils.getBean(RedisCache.class).getCacheObject(receiveStr);
if (StringUtils.isNotNull(arrayCache)) {
deviceCodeParamList = arrayCache.toList(CollectionParamsManage.class);
}
size = deviceCodeParamList.size();
log.info("deviceCodeParam size ===> {}", size);
// 清空receiveStr
receiveStr = "";
num = 0;
// 发送采集报文
if (size > 0) {
if (idleCount < 2) {
Thread.sleep(200);
SendOrderUtils.sendAllOrder(deviceCodeParamList.get(num), ctx, num, size);
idleCount++;
} else {
ctx.channel().close();
}
} else {
log.info("gateway not find deviceCodeParam!");
}
} else if (receiveStr.length() == 34 || receiveStr.length() == 36 || receiveStr.length() == 40 || receiveStr.length() == 44 || receiveStr.length() == 50) {
//电表返回数据解析
idleCount = 1;
log.info("电表接收===> {},长度:{}", receiveStr, receiveStr.length());
//解析采集的报文,并保存到数据库
nextSendOrder(ctx);
} else if (receiveStr.length() == 18) {
//冷量计返回数据解析
idleCount = 1;
log.info("水电表、热泵设置接收==>{},长度:{}", receiveStr, receiveStr.length());
nextSendOrder(ctx);
} else if (receiveStr.length() == 12 || receiveStr.length() == 14) {
//冷水机返回数据解析
idleCount = 1;
log.info("热泵读取接收===>{},长度:{}", receiveStr, receiveStr.length());
nextSendOrder(ctx);
} else if (receiveStr.length() > 50 && receiveStr.length() < 100) {
idleCount = 1;
// 清空receiveStr
nextSendOrder(ctx);
}
ctx.flush();
}
private void nextSendOrder(ChannelHandlerContext ctx) throws InterruptedException {
// 解析采集的报文,并保存到数据库
analysisReceiveData(receiveStr, deviceCodeParamList.get(num));
// 清空receiveStr
receiveStr = "";
// 判断发送的下标,如果不等于指令数组大小
num = num + 1;
if (num > size - 1) {
num = 0;
Thread.sleep(1000);
// 继续发送下一个采集指令
SendOrderUtils.sendAllOrder(deviceCodeParamList.get(num), ctx, num, size);
log.info("------一轮采集完成,继续下一轮--------");
} else {
// 添加一个状态值,判断是否继续发送指令 update by ljf on 2020-08-07
if (Constants.WEB_FLAG) {
num = 0;
// 关闭连接
receiveStr = null;
ctx.close();
} else {
Thread.sleep(1000);
// 继续发送下一个采集指令
SendOrderUtils.sendAllOrder(deviceCodeParamList.get(num), ctx, num, size);
}
}
}
private void analysisReceiveData(final String receiveStr, final CollectionParamsManage deviceCodeParamEntity) {
AnalysisReceiveOrder485 analysisReceiveOrder485 = new AnalysisReceiveOrder485();
String analysisData = "";
switch (deviceCodeParamEntity.getParamType()) {
case "16" ->
// 电表
analysisData = analysisReceiveOrder485.analysisMeterOrder485(receiveStr, deviceCodeParamEntity);
case "18" ->
// 水表
analysisData = analysisReceiveOrder485.analysisWaterOrder485(receiveStr, deviceCodeParamEntity);
case "5" ->
// 热泵故障报警
analysisData = analysisReceiveOrder485.analysisHeatPumpOrder485(receiveStr, deviceCodeParamEntity);
case "2" ->
// 热泵启停控制
analysisData = analysisReceiveOrder485.analysisHeatPumpOrder485(receiveStr, deviceCodeParamEntity);
case "12" ->
// 热泵实际温度
analysisData = analysisReceiveOrder485.analysisHeatPumpOrder485(receiveStr, deviceCodeParamEntity);
case "14" ->
// 热泵读取温度设置
analysisData = analysisReceiveOrder485.analysisHeatPumpOrder485(receiveStr, deviceCodeParamEntity);
default -> {
log.info("设备类型错误");
return;
}
}
if (analysisData.isEmpty()) {
log.info("解析数据为空");
return;
}
// 格式化数据,配置成研华网关 AdvantechReceiver
AdvantechReceiver advantechReceiver = new AdvantechReceiver();
advantechReceiver.setTs(DateUtils.dateToString(new Date(), Constants.DATE_FORMAT));
List<AdvantechDatas> advantechDatas = new ArrayList<>();
AdvantechDatas datas = new AdvantechDatas();
datas.setValue(new BigDecimal(analysisData));
datas.setTag(deviceCodeParamEntity.getOtherName());
advantechDatas.add(datas);
advantechReceiver.setD(advantechDatas);
sendMsgByTopic.sendToDeviceMQ(JSONObject.toJSONString(advantechReceiver));
}
// 异常捕捉
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.getCause().printStackTrace();
log.info("异常捕捉,执行ctx.close" + cause.getCause());
ctx.close(); // 关闭该Channel
}
// 客户端断开
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ctx.close();// 关闭流
log.info("客户端断开,执行ctx.close()......");
}
}

33
mh-framework/src/main/java/com/mh/framework/netty/ServerChannelInitializer.java

@ -0,0 +1,33 @@
package com.mh.framework.netty;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel>{
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
/* LineBasedFrameDecoder的工作原理是依次遍历ByteBuf中的可读字节
判断看其是否有\n \r\n 如果有就以此位置为结束位置
从可读索引到结束位置的区间的字节就组成了一行 它是以换行符为结束标志的解码器
支持携带结束符和不带结束符两种解码方式同时支持配置单行的最大长度
如果读到了最大长度之后仍然没有发现换行符则抛出异常同时忽略掉之前读到的异常码流*/
// pipeline.addLast(new LineBasedFrameDecoder(10010));
//字符串解码和编码
//LineBasedFrameDecoder + StringDecoder 就是一个按行切换的文本解码器。
// pipeline.addLast( new StringDecoder());
// pipeline.addLast( new StringEncoder());
// 设置读写超时操作
// 入参说明: 读超时时间、写超时时间、所有类型的超时时间、时间格式
pipeline.addLast(new IdleStateHandler(40, 40, 40, TimeUnit.SECONDS));
//服务器的逻辑
pipeline.addLast("handler", new EchoServerHandler());
}
}

12
mh-quartz/src/main/java/com/mh/quartz/task/AHUTask.java

@ -1,15 +1,12 @@
package com.mh.quartz.task; package com.mh.quartz.task;
import com.mh.common.config.MHConfig; import com.mh.common.config.MHConfig;
import com.mh.common.core.domain.AjaxResult;
import com.mh.common.core.domain.dto.DeviceMonitorDTO;
import com.mh.common.core.domain.entity.CollectionParamsManage; import com.mh.common.core.domain.entity.CollectionParamsManage;
import com.mh.common.core.domain.entity.CpmSpaceRelation; import com.mh.common.core.domain.entity.CpmSpaceRelation;
import com.mh.common.core.domain.entity.OrderEntity; import com.mh.common.core.domain.entity.OrderEntity;
import com.mh.common.core.domain.entity.PolicyManage; import com.mh.common.core.domain.entity.PolicyManage;
import com.mh.common.utils.DateUtils; import com.mh.common.utils.DateUtils;
import com.mh.framework.mqtt.service.IMqttGatewayService; import com.mh.framework.mqtt.service.IMqttGatewayService;
import com.mh.quartz.domain.PIDParams;
import com.mh.quartz.util.AHUPIDControlUtil; import com.mh.quartz.util.AHUPIDControlUtil;
import com.mh.system.service.device.ICollectionParamsManageService; import com.mh.system.service.device.ICollectionParamsManageService;
import com.mh.system.service.operation.IOperationDeviceService; import com.mh.system.service.operation.IOperationDeviceService;
@ -52,6 +49,8 @@ public class AHUTask {
private final IMqttGatewayService iMqttGatewayService; private final IMqttGatewayService iMqttGatewayService;
// 在 AHUTask 类中添加一个 PID 控制器成员变量
private final Map<String, AHUPIDControlUtil> pidControllers = new HashMap<>();
@Autowired @Autowired
public AHUTask(ICollectionParamsManageService collectionParamsManageService, IPolicyManageService policyManageService, ICpmSpaceRelationService cpmSpaceRelationService, IOperationDeviceService iOperationService, IMqttGatewayService iMqttGatewayService) { public AHUTask(ICollectionParamsManageService collectionParamsManageService, IPolicyManageService policyManageService, ICpmSpaceRelationService cpmSpaceRelationService, IOperationDeviceService iOperationService, IMqttGatewayService iMqttGatewayService) {
@ -105,9 +104,10 @@ public class AHUTask {
double backTempSet = second.get().getCurValue().doubleValue(); double backTempSet = second.get().getCurValue().doubleValue();
// 设定目标温度(夏季制冷24℃) // 设定目标温度(夏季制冷24℃)
AHUPIDControlUtil controller = new AHUPIDControlUtil(); // ✅ 如果没有该设备的控制器,则创建一个新的并保存起来
AHUPIDControlUtil controller = pidControllers.computeIfAbsent(deviceLedgerId, k -> new AHUPIDControlUtil());
System.out.println("开始模糊PID控制循环:"); log.info("开始模糊PID控制循环,查看对象是否有变化:{}", controller);
// 过滤获取当前回风温度 // 过滤获取当前回风温度
Optional<CollectionParamsManage> third = collectionParamsManages Optional<CollectionParamsManage> third = collectionParamsManages
@ -121,7 +121,7 @@ public class AHUTask {
double temp = third.get().getCurValue().doubleValue(); double temp = third.get().getCurValue().doubleValue();
// 2. 计算水阀开度(时间间隔1秒) // 2. 计算水阀开度(时间间隔1秒)
double valveOpening1 = controller.compute(backTempSet, temp, 60.0); double valveOpening1 = controller.compute(backTempSet, temp, 5);
int valveOpening = new BigDecimal(valveOpening1).intValue(); int valveOpening = new BigDecimal(valveOpening1).intValue();
// 过滤获取水阀调节参数 // 过滤获取水阀调节参数

17
mh-quartz/src/main/java/com/mh/quartz/task/DealDataTask.java

@ -6,6 +6,7 @@ import com.mh.common.core.domain.entity.DeviceReport;
import com.mh.common.core.redis.RedisCache; import com.mh.common.core.redis.RedisCache;
import com.mh.common.enums.ComputeEnum; import com.mh.common.enums.ComputeEnum;
import com.mh.common.utils.DateUtils; import com.mh.common.utils.DateUtils;
import com.mh.common.utils.StringUtils;
import com.mh.framework.dealdata.DataProcessService; import com.mh.framework.dealdata.DataProcessService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -114,7 +115,10 @@ public class DealDataTask {
int ratio = entity.getMtRatio(); int ratio = entity.getMtRatio();
if (ObjectUtils.isEmpty(lastData) || ObjectUtils.isEmpty(lastData.getLastValue())) { if (ObjectUtils.isEmpty(lastData) || ObjectUtils.isEmpty(lastData.getLastValue())) {
//从device_manage取出初始值 //从device_manage取出初始值
String initValue = dataProcessService.queryInitValue(entity.getMtNum()); String initValue = dataProcessService.queryInitValue(entity.getMtNum(), entity.getMtCode());
if (StringUtils.isEmpty(initValue)) {
initValue = "0";
}
DeviceReport firstEntity = new DeviceReport(); DeviceReport firstEntity = new DeviceReport();
firstEntity.setLastValue(initValue); firstEntity.setLastValue(initValue);
firstEntity.setLastTime(entity.getCurTime()); firstEntity.setLastTime(entity.getCurTime());
@ -126,7 +130,12 @@ public class DealDataTask {
firstEntity.setRegisterAddr(entity.getRegisterAddr()); firstEntity.setRegisterAddr(entity.getRegisterAddr());
firstEntity.setDeviceType(deviceType); firstEntity.setDeviceType(deviceType);
firstEntity.setRatio(ratio); firstEntity.setRatio(ratio);
double usedValue = entity.getCurValue().doubleValue() - Double.parseDouble(initValue); double usedValue = 0;
try {
usedValue = entity.getCurValue().doubleValue() - Double.parseDouble(initValue);
} catch (NumberFormatException e) {
log.error("数值格式解析异常:{}", e);
}
firstEntity.setUsedValue(String.valueOf(usedValue)); firstEntity.setUsedValue(String.valueOf(usedValue));
//区分瞬时值 //区分瞬时值
if ((deviceGrade >= 100 && deviceGrade < 200) || (deviceGrade >= 1200 && deviceGrade < 1300) ) { if ((deviceGrade >= 100 && deviceGrade < 200) || (deviceGrade >= 1200 && deviceGrade < 1300) ) {
@ -315,7 +324,7 @@ public class DealDataTask {
DeviceReport hourEntity = dataProcessService.queryLastValue(key, tableType); DeviceReport hourEntity = dataProcessService.queryLastValue(key, tableType);
if (ObjectUtils.isEmpty(hourEntity)) { if (ObjectUtils.isEmpty(hourEntity)) {
//查询设备信息初始值 //查询设备信息初始值
lastValue = dataProcessService.queryInitValue(key); lastValue = dataProcessService.queryInitValue(key, null);
} else { } else {
lastValue = hourEntity.getLastValue(); lastValue = hourEntity.getLastValue();
lastDate = hourEntity.getLastTime(); lastDate = hourEntity.getLastTime();
@ -493,7 +502,7 @@ public class DealDataTask {
DeviceReport entity = deviceList.get(i); DeviceReport entity = deviceList.get(i);
DeviceReport lastEntity = dataProcessService.queryLastValue(deviceNum, "month"); DeviceReport lastEntity = dataProcessService.queryLastValue(deviceNum, "month");
if (ObjectUtils.isEmpty(lastEntity)) { if (ObjectUtils.isEmpty(lastEntity)) {
lastValue = dataProcessService.queryInitValue(deviceNum); lastValue = dataProcessService.queryInitValue(deviceNum, null);
lastTime = entity.getCurTime(); lastTime = entity.getCurTime();
} else { } else {
lastValue = lastEntity.getLastValue(); lastValue = lastEntity.getLastValue();

6
mh-quartz/src/main/java/com/mh/quartz/util/AHUPIDControlUtil.java

@ -15,6 +15,8 @@ public class AHUPIDControlUtil {
private double kp = 2.0, ki = 0.1, kd = 0.0; private double kp = 2.0, ki = 0.1, kd = 0.0;
private double integral = 0; private double integral = 0;
private double previousError = 0; private double previousError = 0;
private static final double MAX_INTEGRAL = 100;
private static final double MIN_INTEGRAL = -100;
// 模糊增益修正比例 // 模糊增益修正比例
private double deltaKpScale = 0.5; private double deltaKpScale = 0.5;
@ -44,9 +46,11 @@ public class AHUPIDControlUtil {
kd = Math.max(0, Math.min(kd, 1)); kd = Math.max(0, Math.min(kd, 1));
// PID 计算 // PID 计算
// 积分项限幅
integral += error * deltaTime; integral += error * deltaTime;
integral = Math.max(MIN_INTEGRAL, Math.min(MAX_INTEGRAL, integral));
double output = kp * error + ki * integral + kd * dError; double output = kp * error + ki * integral + kd * dError;
// System.out.println("计算输出值:" + output + ",误差:" + error + ",误差变化:" + dError); System.out.println("计算输出值:" + output + ",误差:" + error + ",误差变化:" + dError);
previousError = error; previousError = error;
// 输出冷冻水阀开度,限制在0~100% // 输出冷冻水阀开度,限制在0~100%

5
mh-system/src/main/java/com/mh/system/mapper/device/CollectionParamsManageMapper.java

@ -339,4 +339,9 @@ public interface CollectionParamsManageMapper extends BaseMapper<CollectionParam
" and device_ledger_id is not null " + " and device_ledger_id is not null " +
" group by device_ledger_id;") " group by device_ledger_id;")
List<String> OffLine(); List<String> OffLine();
@Select("select * from collection_params_manage cpm " +
" where cpm.system_type = '1' and is_use = 0 " +
" and (param_type= '16' or param_type='18' or param_type = '5' or param_type = '2' or param_type = '12' or param_type = '14')")
List<CollectionParamsManage> createOrderList();
} }

14
mh-system/src/main/java/com/mh/system/mapper/device/DataProcessMapper.java

@ -170,8 +170,18 @@ public interface DataProcessMapper {
* @param deviceNum * @param deviceNum
* @return * @return
*/ */
@Select("select mt_init_value from collection_params_manage where mt_num = #{deviceNum} and is_use = '0'") @Select("select mt_init_value from collection_params_manage where mt_num = #{deviceNum} " +
String queryInitValue(@Param("deviceNum") String deviceNum); " and mt_code = #{mtCode} and is_use = '0' limit 1")
String queryInitValue(@Param("deviceNum") String deviceNum, @Param("mtCode") String mtCode);
/**
* 查询初始数据
*
* @param deviceNum
* @return
*/
@Select("select mt_init_value from collection_params_manage where mt_num = #{deviceNum} and is_use = '0' limit 1")
String queryInitValue1(@Param("deviceNum") String deviceNum);
@Select("select * from ${tableName} where register_id = #{registerId} and cur_time >= CURRENT_TIMESTAMP - INTERVAL '24 hours' ") @Select("select * from ${tableName} where register_id = #{registerId} and cur_time >= CURRENT_TIMESTAMP - INTERVAL '24 hours' ")
List<ChillersEntity> selectLineDataByCodeAndRegisterAddr(@Param("tableName") String tableName, List<ChillersEntity> selectLineDataByCodeAndRegisterAddr(@Param("tableName") String tableName,

2
mh-system/src/main/java/com/mh/system/mapper/device/GatewayManageMapper.java

@ -19,4 +19,6 @@ public interface GatewayManageMapper extends BaseMapper<GatewayManage> {
@Update("update gateway_manage set status = 0, connect_time = CURRENT_TIMESTAMP where id = #{gatewayId}") @Update("update gateway_manage set status = 0, connect_time = CURRENT_TIMESTAMP where id = #{gatewayId}")
void updateOnlineStatus(@Param("gatewayId") String gatewayId); void updateOnlineStatus(@Param("gatewayId") String gatewayId);
@Update("update gateway_manage set status = 0, connect_time = CURRENT_TIMESTAMP where heart_beat = #{heartBeat}")
void updateOnlineStatusByHeartBeat(@Param("heartBeat") String heartBeat);
} }

2
mh-system/src/main/java/com/mh/system/service/device/ICollectionParamsManageService.java

@ -79,4 +79,6 @@ public interface ICollectionParamsManageService {
List<DeviceMonitorDTO> selectMonitorListBySystemTypeAndHouseId(String systemType, String houseId); List<DeviceMonitorDTO> selectMonitorListBySystemTypeAndHouseId(String systemType, String houseId);
List<CollectionParamsManage> selectListByParams(HashMap<String, Object> queryMap); List<CollectionParamsManage> selectListByParams(HashMap<String, Object> queryMap);
void createDtuCollectionParams();
} }

2
mh-system/src/main/java/com/mh/system/service/device/IGatewayManageService.java

@ -22,4 +22,6 @@ public interface IGatewayManageService {
int updateGateway(GatewayManage gatewayManage); int updateGateway(GatewayManage gatewayManage);
int deleteGatewayByIds(String[] gatewayIds); int deleteGatewayByIds(String[] gatewayIds);
void updateGatewayStatus(String heartBeat);
} }

53
mh-system/src/main/java/com/mh/system/service/device/impl/CollectionParamsManageServiceImpl.java

@ -1,5 +1,6 @@
package com.mh.system.service.device.impl; package com.mh.system.service.device.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.mh.common.core.domain.ColumnFilter; import com.mh.common.core.domain.ColumnFilter;
import com.mh.common.core.domain.dto.DeviceMonitorDTO; import com.mh.common.core.domain.dto.DeviceMonitorDTO;
@ -9,8 +10,12 @@ import com.mh.common.core.domain.dto.PumpInfoDTO;
import com.mh.common.core.domain.entity.ChillersEntity; import com.mh.common.core.domain.entity.ChillersEntity;
import com.mh.common.core.domain.entity.CollectionParamsManage; import com.mh.common.core.domain.entity.CollectionParamsManage;
import com.mh.common.core.domain.entity.DeviceLedger; import com.mh.common.core.domain.entity.DeviceLedger;
import com.mh.common.core.domain.entity.GatewayManage;
import com.mh.common.core.domain.vo.*; import com.mh.common.core.domain.vo.*;
import com.mh.common.core.redis.RedisCache;
import com.mh.common.utils.CRC16;
import com.mh.common.utils.DateUtils; import com.mh.common.utils.DateUtils;
import com.mh.common.utils.ExchangeStringUtil;
import com.mh.common.utils.StringUtils; import com.mh.common.utils.StringUtils;
import com.mh.system.mapper.device.CollectionParamsManageMapper; import com.mh.system.mapper.device.CollectionParamsManageMapper;
import com.mh.system.mapper.device.DataProcessMapper; import com.mh.system.mapper.device.DataProcessMapper;
@ -27,6 +32,7 @@ import java.time.LocalDate;
import java.time.ZoneId; import java.time.ZoneId;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.util.*; import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
@ -52,6 +58,53 @@ public class CollectionParamsManageServiceImpl implements ICollectionParamsManag
@Resource @Resource
private DataProcessMapper dataProcessMapper; private DataProcessMapper dataProcessMapper;
@Resource
private RedisCache redisCache;
@Override
public void createDtuCollectionParams() {
List<CollectionParamsManage> paramsManages = collectionParamsManageMapper.createOrderList();
// 根据网关类型分组
Map<String, List<CollectionParamsManage>> map = paramsManages.stream().collect(Collectors.groupingBy(CollectionParamsManage::getGatewayId));
// 查询全部的网关
List<GatewayManage> gatewayManages = gatewayManageMapper.selectList(new QueryWrapper<GatewayManage>());
if (gatewayManages.isEmpty()) {
return;
}
for (GatewayManage gatewayManage : gatewayManages) {
String gatewayId = gatewayManage.getId();
String heartBeat = gatewayManage.getHeartBeat();
if (map.containsKey(gatewayId)) {
List<CollectionParamsManage> paramsManages1 = map.get(gatewayId);
// 删除缓存数据
if (redisCache.hasKey(heartBeat)) {
redisCache.deleteObject(heartBeat);
}
// List<String> sendOrder = new ArrayList<>();
// for (CollectionParamsManage paramsManage : paramsManages1) {
// // 开始创建指令
// String mtCode = paramsManage.getMtCode(); // 采集编号
// String funCode = paramsManage.getMtCode(); // 功能码
// String registerAddr = paramsManage.getRegisterAddr(); // 寄存器地址
// String registerNum = String.valueOf(paramsManage.getRegisterSize()); // 寄存器数量
// // 拼接指令
// String sendOrderStr = ExchangeStringUtil.addZeroForNum(ExchangeStringUtil.intToHexString(Integer.parseInt(mtCode), true), 2)
// + ExchangeStringUtil.addZeroForNum(funCode, 2)
// + ExchangeStringUtil.addZeroForNum(registerAddr, 4)
// + ExchangeStringUtil.addZeroForNum(registerNum, 4);
// byte[] strOrder = ExchangeStringUtil.hexStrToBinaryStr(sendOrderStr);
// int checkNum = CRC16.CRC16_MODBUS(strOrder);
// String checkWord = ExchangeStringUtil.decToHex(String.valueOf(checkNum));
// checkWord = checkWord.substring(2, 4) + checkWord.substring(0, 2);
// sendOrderStr = sendOrderStr + checkWord;
// sendOrder.add(sendOrderStr);
// }
redisCache.setCacheObject(heartBeat, paramsManages1);
redisCache.expire(heartBeat, 30, TimeUnit.MINUTES);
}
}
}
@Override @Override
public List<CollectionParamsManage> selectListByParams(HashMap<String, Object> queryMap) { public List<CollectionParamsManage> selectListByParams(HashMap<String, Object> queryMap) {
if (queryMap == null || queryMap.isEmpty()) { if (queryMap == null || queryMap.isEmpty()) {

6
mh-system/src/main/java/com/mh/system/service/device/impl/GatewayManageServiceImpl.java

@ -9,6 +9,7 @@ import jakarta.annotation.Resource;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.util.List; import java.util.List;
import java.util.stream.Collectors;
/** /**
* @author LJF * @author LJF
@ -23,6 +24,11 @@ public class GatewayManageServiceImpl implements IGatewayManageService {
@Resource @Resource
private GatewayManageMapper gatewayManageMapper; private GatewayManageMapper gatewayManageMapper;
@Override
public void updateGatewayStatus(String heartBeat) {
gatewayManageMapper.updateOnlineStatusByHeartBeat(heartBeat);
}
@Override @Override
public List<GatewayManage> selectGwManageList(GatewayManage gatewayManage) { public List<GatewayManage> selectGwManageList(GatewayManage gatewayManage) {
QueryWrapper<GatewayManage> queryWrapper = new QueryWrapper<>(); QueryWrapper<GatewayManage> queryWrapper = new QueryWrapper<>();

Loading…
Cancel
Save