Compare commits
No commits in common. 'dev_mz' and 'dev' have entirely different histories.
50 changed files with 46 additions and 3996 deletions
@ -1,63 +0,0 @@ |
|||||||
package com.mh.common.core.redis; |
|
||||||
|
|
||||||
import lombok.extern.slf4j.Slf4j; |
|
||||||
import org.springframework.data.redis.core.StringRedisTemplate; |
|
||||||
import org.springframework.data.redis.core.script.RedisScript; |
|
||||||
import org.springframework.stereotype.Component; |
|
||||||
|
|
||||||
import java.util.Collections; |
|
||||||
import java.util.concurrent.TimeUnit; |
|
||||||
|
|
||||||
/** |
|
||||||
* @author LJF |
|
||||||
* @version 1.0 |
|
||||||
* @project EEMCS |
|
||||||
* @description 锁 |
|
||||||
* @date 2025-06-06 16:08:13 |
|
||||||
*/ |
|
||||||
@Slf4j |
|
||||||
@Component |
|
||||||
public class RedisLock { |
|
||||||
|
|
||||||
private final StringRedisTemplate redisTemplate; |
|
||||||
|
|
||||||
public RedisLock(StringRedisTemplate redisTemplate) { |
|
||||||
this.redisTemplate = redisTemplate; |
|
||||||
} |
|
||||||
|
|
||||||
/** |
|
||||||
* 获取锁 |
|
||||||
*/ |
|
||||||
public boolean lock(String key, String requestId, long expireTimeInSeconds) { |
|
||||||
Boolean success = redisTemplate.opsForValue().setIfAbsent(key, requestId, expireTimeInSeconds, TimeUnit.SECONDS); |
|
||||||
return Boolean.TRUE.equals(success); |
|
||||||
} |
|
||||||
|
|
||||||
/** |
|
||||||
* 尝试获取锁(带超时) |
|
||||||
*/ |
|
||||||
public boolean tryLock(String key, String requestId, long expireTime, long timeoutMs) throws InterruptedException { |
|
||||||
long startTime = System.currentTimeMillis(); |
|
||||||
while (System.currentTimeMillis() - startTime < timeoutMs) { |
|
||||||
if (lock(key, requestId, expireTime)) { |
|
||||||
return true; |
|
||||||
} |
|
||||||
Thread.sleep(50); |
|
||||||
} |
|
||||||
return false; |
|
||||||
} |
|
||||||
|
|
||||||
/** |
|
||||||
* 释放锁(使用 Lua 脚本保证原子性) |
|
||||||
*/ |
|
||||||
public void unlock(String key, String requestId) { |
|
||||||
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; |
|
||||||
RedisScript<Long> redisScript = RedisScript.of(script, Long.class); |
|
||||||
|
|
||||||
Long result = redisTemplate.execute(redisScript, Collections.singletonList(key), requestId); |
|
||||||
|
|
||||||
if (result == null || result == 0) { |
|
||||||
log.warn("释放锁失败,可能已被其他线程释放 key={}", key); |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
@ -1,277 +0,0 @@ |
|||||||
package com.mh.common.utils; |
|
||||||
|
|
||||||
import com.mh.common.core.domain.entity.CollectionParamsManage; |
|
||||||
import lombok.extern.slf4j.Slf4j; |
|
||||||
|
|
||||||
import java.math.BigDecimal; |
|
||||||
import java.math.RoundingMode; |
|
||||||
import java.text.DecimalFormat; |
|
||||||
import java.text.ParseException; |
|
||||||
import java.text.SimpleDateFormat; |
|
||||||
import java.util.*; |
|
||||||
|
|
||||||
/** |
|
||||||
* @author ljf |
|
||||||
* @title : |
|
||||||
* @description : 解析485接收的数据 |
|
||||||
* @updateTime 2020-04-23 |
|
||||||
* @throws : |
|
||||||
*/ |
|
||||||
@Slf4j |
|
||||||
public class AnalysisReceiveOrder485 { |
|
||||||
|
|
||||||
// 调用service
|
|
||||||
private final SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); |
|
||||||
private final DecimalFormat df = new DecimalFormat("#.##"); |
|
||||||
|
|
||||||
//解析冷量表
|
|
||||||
public void analysisCloudOrder485(final String dataStr1, final CollectionParamsManage deviceCodeParam) { |
|
||||||
// 去掉空格
|
|
||||||
String dataStr = dataStr1.replace(" ", "").toUpperCase(); |
|
||||||
// 检验报文
|
|
||||||
String checkStr = dataStr.substring(0, dataStr.length() - 4); |
|
||||||
byte[] strOrder = ExchangeStringUtil.hexStrToBinaryStr(checkStr); |
|
||||||
int checkNum = CRC16.CRC16_MODBUS(strOrder); |
|
||||||
String checkWord = ExchangeStringUtil.decToHex(String.valueOf(checkNum)); |
|
||||||
checkWord = checkWord.substring(2, 4) + checkWord.substring(0, 2); |
|
||||||
|
|
||||||
if (checkWord.equalsIgnoreCase(dataStr.substring(dataStr.length() - 4))) { |
|
||||||
//创建SimpleDateFormat对象,指定样式 2019-05-13 22:39:30
|
|
||||||
Date date = new Date(); |
|
||||||
String dateStr = sdf1.format(date); |
|
||||||
; |
|
||||||
//保留两位小数处理
|
|
||||||
DecimalFormat decimalFormat = new DecimalFormat("0.00"); |
|
||||||
// 表号
|
|
||||||
String cloudId = ExchangeStringUtil.hexToDec(dataStr.substring(0, 2)); |
|
||||||
// 读数
|
|
||||||
String data = ""; |
|
||||||
data = dataStr.substring(dataStr.length() - 8, dataStr.length() - 6) |
|
||||||
+ dataStr.substring(dataStr.length() - 6, dataStr.length() - 4) |
|
||||||
+ dataStr.substring(dataStr.length() - 12, dataStr.length() - 10) |
|
||||||
+ dataStr.substring(dataStr.length() - 10, dataStr.length() - 8); |
|
||||||
|
|
||||||
String registerAddr = deviceCodeParam.getRegisterAddr(); |
|
||||||
if (ExchangeStringUtil.isInDate(date, "00:00:00", "00:00:30")) { |
|
||||||
dateStr = dateStr.substring(0, 17) + "00"; |
|
||||||
} else if (ExchangeStringUtil.isInDate(date, "00:00:30", "00:00:59")) { |
|
||||||
dateStr = dateStr.substring(0, 17) + "30"; |
|
||||||
} |
|
||||||
try { |
|
||||||
if (registerAddr.equals("32") || registerAddr.equals("33") || registerAddr.equals("35") || registerAddr.equals("36")) { |
|
||||||
data = decimalFormat.format(Math.abs(ExchangeStringUtil.hexToSingle(data)));//十六进制字符串转IEEE754浮点型
|
|
||||||
log.info("冷量计==>{},寄存器地址==>{},读数==>{}", cloudId, registerAddr, data); |
|
||||||
} else if (registerAddr.equals("31") || registerAddr.equals("34")) { |
|
||||||
long lData = Long.parseLong(ExchangeStringUtil.hexToDec(data)); |
|
||||||
log.info("冷量计==>{},寄存器地址==>{},累计读数==>{}", cloudId, registerAddr, lData); |
|
||||||
} |
|
||||||
} catch (Exception e) { |
|
||||||
log.error("保存冷量计数据失败!", e); |
|
||||||
} |
|
||||||
} else { |
|
||||||
log.info("冷量计校验失败===>{}", dataStr); |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
/** |
|
||||||
* 解析水表返回的数据 |
|
||||||
* |
|
||||||
* @param dataStr1 |
|
||||||
*/ |
|
||||||
public String analysisWaterOrder485(final String dataStr1, final CollectionParamsManage deviceCodeParam) { |
|
||||||
// 去掉空格
|
|
||||||
String dataStr = dataStr1.replace(" ", "").toUpperCase(); |
|
||||||
// 检验报文
|
|
||||||
String checkStr = dataStr.substring(0, dataStr.length() - 4); |
|
||||||
byte[] strOrder = ExchangeStringUtil.hexStrToBinaryStr(checkStr); |
|
||||||
int checkNum = CRC16.CRC16_MODBUS(strOrder); |
|
||||||
String checkWord = ExchangeStringUtil.decToHex(String.valueOf(checkNum)); |
|
||||||
checkWord = checkWord.substring(2, 4) + checkWord.substring(0, 2); |
|
||||||
|
|
||||||
if (checkWord.equalsIgnoreCase(dataStr.substring(dataStr.length() - 4))) { |
|
||||||
//创建SimpleDateFormat对象,指定样式 2019-05-13 22:39:30
|
|
||||||
Date date = new Date(); |
|
||||||
String dateStr = sdf1.format(date); |
|
||||||
; |
|
||||||
//保留两位小数处理
|
|
||||||
DecimalFormat decimalFormat = new DecimalFormat("0.00"); |
|
||||||
// 表号
|
|
||||||
String cloudId = ExchangeStringUtil.hexToDec(dataStr.substring(0, 2)); |
|
||||||
// 读数
|
|
||||||
String data = ""; |
|
||||||
data = dataStr.substring(dataStr.length() - 8, dataStr.length() - 6) |
|
||||||
+ dataStr.substring(dataStr.length() - 6, dataStr.length() - 4) |
|
||||||
+ dataStr.substring(dataStr.length() - 12, dataStr.length() - 10) |
|
||||||
+ dataStr.substring(dataStr.length() - 10, dataStr.length() - 8); |
|
||||||
|
|
||||||
int dataType = deviceCodeParam.getDataType(); |
|
||||||
if (ExchangeStringUtil.isInDate(date, "00:00:00", "00:00:30")) { |
|
||||||
dateStr = dateStr.substring(0, 17) + "00"; |
|
||||||
System.out.println("插入时间00" + dateStr); |
|
||||||
} else if (ExchangeStringUtil.isInDate(date, "00:00:30", "00:00:59")) { |
|
||||||
dateStr = dateStr.substring(0, 17) + "30"; |
|
||||||
System.out.println("插入时间30" + dateStr); |
|
||||||
} |
|
||||||
try { |
|
||||||
if (dataType == 3) { |
|
||||||
data = decimalFormat.format(Math.abs(ExchangeStringUtil.hexToSingle(data)));//十六进制字符串转IEEE754浮点型
|
|
||||||
log.info("水表==>{},寄存器地址==>{},读数==>{}", cloudId, deviceCodeParam.getRegisterAddr(), data); |
|
||||||
} else if (dataType == 2) { |
|
||||||
data = dataStr.substring(dataStr.length() - 12, dataStr.length() - 10) |
|
||||||
+ dataStr.substring(dataStr.length() - 10, dataStr.length() - 8) |
|
||||||
+ dataStr.substring(dataStr.length() - 8, dataStr.length() - 6) |
|
||||||
+ dataStr.substring(dataStr.length() - 6, dataStr.length() - 4); |
|
||||||
data = ExchangeStringUtil.hexToDec(data); |
|
||||||
BigDecimal bigDecimal = new BigDecimal(data); |
|
||||||
bigDecimal = bigDecimal.divide(new BigDecimal((int) Math.pow(10, deviceCodeParam.getDigits()))).setScale(2, RoundingMode.HALF_UP); // 除以1000并保留整数
|
|
||||||
data = bigDecimal.toString(); |
|
||||||
log.info("水表==>{},寄存器地址==>{},累计读数==>{}", cloudId, deviceCodeParam.getRegisterAddr(), data); |
|
||||||
} |
|
||||||
return data; |
|
||||||
} catch (Exception e) { |
|
||||||
log.error("保存水表数据失败!", e); |
|
||||||
} |
|
||||||
} else { |
|
||||||
log.info("水表===>{}", dataStr); |
|
||||||
return ""; |
|
||||||
} |
|
||||||
return ""; |
|
||||||
} |
|
||||||
|
|
||||||
|
|
||||||
/** |
|
||||||
* 解析电表返回的数据 |
|
||||||
* |
|
||||||
* @param dataStr1 |
|
||||||
*/ |
|
||||||
public String analysisMeterOrder485(final String dataStr1, final CollectionParamsManage deviceCodeParam) { |
|
||||||
// 去掉空格
|
|
||||||
String dataStr = dataStr1.replace(" ", "").toUpperCase(); |
|
||||||
// 检验报文
|
|
||||||
String checkStr = dataStr.substring(0, dataStr.length() - 4); |
|
||||||
byte[] strOrder = ExchangeStringUtil.hexStrToBinaryStr(checkStr); |
|
||||||
int checkNum = CRC16.CRC16_MODBUS(strOrder); |
|
||||||
String checkWord = ExchangeStringUtil.decToHex(String.valueOf(checkNum)); |
|
||||||
checkWord = checkWord.substring(2, 4) + checkWord.substring(0, 2); |
|
||||||
|
|
||||||
if (checkWord.equalsIgnoreCase(dataStr.substring(dataStr.length() - 4))) { |
|
||||||
//创建SimpleDateFormat对象,指定样式 2019-05-13 22:39:30
|
|
||||||
Date date = new Date(); |
|
||||||
String dateStr = sdf1.format(date); |
|
||||||
; |
|
||||||
//保留两位小数处理
|
|
||||||
DecimalFormat decimalFormat = new DecimalFormat("0.00"); |
|
||||||
// 表号
|
|
||||||
String cloudId = ExchangeStringUtil.hexToDec(dataStr.substring(0, 2)); |
|
||||||
// 读数
|
|
||||||
String data = ""; |
|
||||||
data = dataStr.substring(dataStr.length() - 8, dataStr.length() - 6) |
|
||||||
+ dataStr.substring(dataStr.length() - 6, dataStr.length() - 4) |
|
||||||
+ dataStr.substring(dataStr.length() - 12, dataStr.length() - 10) |
|
||||||
+ dataStr.substring(dataStr.length() - 10, dataStr.length() - 8); |
|
||||||
|
|
||||||
int dataType = deviceCodeParam.getDataType(); |
|
||||||
if (ExchangeStringUtil.isInDate(date, "00:00:00", "00:00:30")) { |
|
||||||
dateStr = dateStr.substring(0, 17) + "00"; |
|
||||||
System.out.println("插入时间00" + dateStr); |
|
||||||
} else if (ExchangeStringUtil.isInDate(date, "00:00:30", "00:00:59")) { |
|
||||||
dateStr = dateStr.substring(0, 17) + "30"; |
|
||||||
System.out.println("插入时间30" + dateStr); |
|
||||||
} |
|
||||||
try { |
|
||||||
if (dataType == 3) { |
|
||||||
data = decimalFormat.format(Math.abs(ExchangeStringUtil.hexToSingle(data)));//十六进制字符串转IEEE754浮点型
|
|
||||||
log.info("电表==>{},寄存器地址==>{},读数==>{}", cloudId, deviceCodeParam.getRegisterAddr(), data); |
|
||||||
} else if (dataType == 2) { |
|
||||||
data = ExchangeStringUtil.hexToDec(data); |
|
||||||
log.info("电表==>{},寄存器地址==>{},累计读数==>{}", cloudId, deviceCodeParam.getRegisterAddr(), data); |
|
||||||
} |
|
||||||
return data; |
|
||||||
} catch (Exception e) { |
|
||||||
log.error("保存电表数据失败!", e); |
|
||||||
} |
|
||||||
} else { |
|
||||||
log.info("电表===>{}", dataStr); |
|
||||||
return ""; |
|
||||||
} |
|
||||||
return ""; |
|
||||||
} |
|
||||||
|
|
||||||
public static int dValue(String lastDate) throws ParseException { |
|
||||||
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); |
|
||||||
Date lastTime = format.parse(lastDate); |
|
||||||
long min = lastTime.getTime(); |
|
||||||
Calendar calendar = Calendar.getInstance(); |
|
||||||
long min1 = calendar.getTimeInMillis(); |
|
||||||
long subtract = min1 - min; |
|
||||||
// System.out.println("相减值: " + subtract/(1000*60));
|
|
||||||
return (int) subtract / (1000 * 60); |
|
||||||
} |
|
||||||
|
|
||||||
|
|
||||||
// 判断是否存在寄存器地址
|
|
||||||
public Boolean queryRegisterAddr(List<String> stringList, String registerAddr) { |
|
||||||
boolean flag = false; |
|
||||||
for (int i = 0; i < stringList.size(); i++) { |
|
||||||
if (stringList.get(i).equalsIgnoreCase(registerAddr)) { |
|
||||||
flag = true; |
|
||||||
break; |
|
||||||
} |
|
||||||
} |
|
||||||
return flag; |
|
||||||
} |
|
||||||
|
|
||||||
public String analysisHeatPumpOrder485(String receiveStr, CollectionParamsManage deviceCodeParam) { |
|
||||||
// 去掉空格
|
|
||||||
String dataStr = receiveStr.replace(" ", "").toUpperCase(); |
|
||||||
// 检验报文
|
|
||||||
String checkStr = dataStr.substring(0, dataStr.length() - 4); |
|
||||||
byte[] strOrder = ExchangeStringUtil.hexStrToBinaryStr(checkStr); |
|
||||||
int checkNum = CRC16.CRC16_MODBUS(strOrder); |
|
||||||
String checkWord = ExchangeStringUtil.decToHex(String.valueOf(checkNum)); |
|
||||||
checkWord = checkWord.substring(2, 4) + checkWord.substring(0, 2); |
|
||||||
|
|
||||||
if (checkWord.equalsIgnoreCase(dataStr.substring(dataStr.length() - 4))) { |
|
||||||
//创建SimpleDateFormat对象,指定样式 2019-05-13 22:39:30
|
|
||||||
Date date = new Date(); |
|
||||||
String dateStr = sdf1.format(date); |
|
||||||
|
|
||||||
//保留两位小数处理
|
|
||||||
DecimalFormat decimalFormat = new DecimalFormat("0.00"); |
|
||||||
// 表号
|
|
||||||
String cloudId = ExchangeStringUtil.hexToDec(dataStr.substring(0, 2)); |
|
||||||
// 读数
|
|
||||||
String data = ""; |
|
||||||
data = dataStr.substring(dataStr.length() - 8, dataStr.length() - 6) |
|
||||||
+ dataStr.substring(dataStr.length() - 6, dataStr.length() - 4); |
|
||||||
|
|
||||||
int dataType = deviceCodeParam.getDataType(); |
|
||||||
if (ExchangeStringUtil.isInDate(date, "00:00:00", "00:00:30")) { |
|
||||||
dateStr = dateStr.substring(0, 17) + "00"; |
|
||||||
System.out.println("插入时间00" + dateStr); |
|
||||||
} else if (ExchangeStringUtil.isInDate(date, "00:00:30", "00:00:59")) { |
|
||||||
dateStr = dateStr.substring(0, 17) + "30"; |
|
||||||
System.out.println("插入时间30" + dateStr); |
|
||||||
} |
|
||||||
try { |
|
||||||
if (dataType == 3) { |
|
||||||
data = decimalFormat.format(Math.abs(ExchangeStringUtil.hexToSingle(data)));//十六进制字符串转IEEE754浮点型
|
|
||||||
} else if (dataType == 2 && (deviceCodeParam.getParamType().equals("5") |
|
||||||
|| deviceCodeParam.getParamType().equals("2") |
|
||||||
|| deviceCodeParam.getParamType().equals("12") |
|
||||||
|| deviceCodeParam.getParamType().equals("14") |
|
||||||
)) { |
|
||||||
data = ExchangeStringUtil.hexToDec(data); |
|
||||||
} |
|
||||||
log.info("热泵==>{},寄存器地址==>{},读数==>{}", cloudId, deviceCodeParam.getRegisterAddr(), data); |
|
||||||
return data; |
|
||||||
} catch (Exception e) { |
|
||||||
log.error("保存热泵数据失败!", e); |
|
||||||
} |
|
||||||
} else { |
|
||||||
log.info("热泵===>{}", dataStr); |
|
||||||
return ""; |
|
||||||
} |
|
||||||
return ""; |
|
||||||
} |
|
||||||
} |
|
@ -1,253 +0,0 @@ |
|||||||
package com.mh.common.utils; |
|
||||||
|
|
||||||
/** |
|
||||||
* CRC16_CCITT:多项式x16+x12+x5+1(0x1021),初始值0x0000,低位在前,高位在后,结果与0x0000异或 |
|
||||||
* CRC16_CCITT_FALSE:多项式x16+x12+x5+1(0x1021),初始值0xFFFF,低位在后,高位在前,结果与0x0000异或 |
|
||||||
* CRC16_XMODEM:多项式x16+x12+x5+1(0x1021),初始值0x0000,低位在后,高位在前,结果与0x0000异或 |
|
||||||
* CRC16_X25:多项式x16+x12+x5+1(0x1021),初始值0xffff,低位在前,高位在后,结果与0xFFFF异或 |
|
||||||
* CRC16_MODBUS:多项式x16+x15+x2+1(0x8005),初始值0xFFFF,低位在前,高位在后,结果与0x0000异或 |
|
||||||
* CRC16_IBM:多项式x16+x15+x2+1(0x8005),初始值0x0000,低位在前,高位在后,结果与0x0000异或 |
|
||||||
* CRC16_MAXIM:多项式x16+x15+x2+1(0x8005),初始值0x0000,低位在前,高位在后,结果与0xFFFF异或 |
|
||||||
* CRC16_USB:多项式x16+x15+x2+1(0x8005),初始值0xFFFF,低位在前,高位在后,结果与0xFFFF异或 |
|
||||||
* CRC16_DNP:多项式x16+x13+x12+x11+x10+x8+x6+x5+x2+1(0x3D65),初始值0x0000,低位在前,高位在后,结果与0xFFFF异或 |
|
||||||
* <p> |
|
||||||
* (1)、预置1个16位的寄存器为十六进制FFFF(即全为1),称此寄存器为CRC寄存器; |
|
||||||
* (2)、把第一个8位二进制数据(既通讯信息帧的第一个字节)与16位的CRC寄存器的低8位相异或,把结果放于CRC寄存器,高八位数据不变; |
|
||||||
* (3)、把CRC寄存器的内容右移一位(朝低位)用0填补最高位,并检查右移后的移出位; |
|
||||||
* (4)、如果移出位为0:重复第3步(再次右移一位);如果移出位为1,CRC寄存器与多项式A001(1010 0000 0000 0001)进行异或; |
|
||||||
* (5)、重复步骤3和4,直到右移8次,这样整个8位数据全部进行了处理; |
|
||||||
* (6)、重复步骤2到步骤5,进行通讯信息帧下一个字节的处理; |
|
||||||
* (7)、将该通讯信息帧所有字节按上述步骤计算完成后,得到的16位CRC寄存器的高、低字节进行交换; |
|
||||||
* (8)、最后得到的CRC寄存器内容即为:CRC码。 |
|
||||||
* <p> |
|
||||||
* 以上计算步骤中的多项式0xA001是0x8005按位颠倒后的结果。 |
|
||||||
* 0x8408是0x1021按位颠倒后的结果。 |
|
||||||
* 在线校验工具 |
|
||||||
* http://www.ip33.com/crc.html
|
|
||||||
* https://blog.csdn.net/htmlxx/article/details/17369105
|
|
||||||
* <p> |
|
||||||
* Author:Water |
|
||||||
* Time:2018/11/19 0019 15:03 |
|
||||||
*/ |
|
||||||
public class CRC16 { |
|
||||||
|
|
||||||
/** |
|
||||||
* CRC16_CCITT:多项式x16+x12+x5+1(0x1021),初始值0x0000,低位在前,高位在后,结果与0x0000异或 |
|
||||||
* 0x8408是0x1021按位颠倒后的结果。 |
|
||||||
* |
|
||||||
* @param buffer |
|
||||||
* @return |
|
||||||
*/ |
|
||||||
public static int CRC16_CCITT(byte[] buffer) { |
|
||||||
int wCRCin = 0x0000; |
|
||||||
int wCPoly = 0x8408; |
|
||||||
for (byte b : buffer) { |
|
||||||
wCRCin ^= ((int) b & 0x00ff); |
|
||||||
for (int j = 0; j < 8; j++) { |
|
||||||
if ((wCRCin & 0x0001) != 0) { |
|
||||||
wCRCin >>= 1; |
|
||||||
wCRCin ^= wCPoly; |
|
||||||
} else { |
|
||||||
wCRCin >>= 1; |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
// wCRCin=(wCRCin<<8)|(wCRCin>>8);
|
|
||||||
// wCRCin &= 0xffff;
|
|
||||||
return wCRCin ^= 0x0000; |
|
||||||
|
|
||||||
} |
|
||||||
|
|
||||||
/** |
|
||||||
* CRC-CCITT (0xFFFF) |
|
||||||
* CRC16_CCITT_FALSE:多项式x16+x12+x5+1(0x1021),初始值0xFFFF,低位在后,高位在前,结果与0x0000异或 |
|
||||||
* |
|
||||||
* @param buffer |
|
||||||
* @return |
|
||||||
*/ |
|
||||||
public static int CRC16_CCITT_FALSE(byte[] buffer) { |
|
||||||
int wCRCin = 0xffff; |
|
||||||
int wCPoly = 0x1021; |
|
||||||
for (byte b : buffer) { |
|
||||||
for (int i = 0; i < 8; i++) { |
|
||||||
boolean bit = ((b >> (7 - i) & 1) == 1); |
|
||||||
boolean c15 = ((wCRCin >> 15 & 1) == 1); |
|
||||||
wCRCin <<= 1; |
|
||||||
if (c15 ^ bit) |
|
||||||
wCRCin ^= wCPoly; |
|
||||||
} |
|
||||||
} |
|
||||||
wCRCin &= 0xffff; |
|
||||||
return wCRCin ^= 0x0000; |
|
||||||
} |
|
||||||
|
|
||||||
/** |
|
||||||
* CRC-CCITT (XModem) |
|
||||||
* CRC16_XMODEM:多项式x16+x12+x5+1(0x1021),初始值0x0000,低位在后,高位在前,结果与0x0000异或 |
|
||||||
* |
|
||||||
* @param buffer |
|
||||||
* @return |
|
||||||
*/ |
|
||||||
public static int CRC16_XMODEM(byte[] buffer) { |
|
||||||
int wCRCin = 0x0000; // initial value 65535
|
|
||||||
int wCPoly = 0x1021; // 0001 0000 0010 0001 (0, 5, 12)
|
|
||||||
for (byte b : buffer) { |
|
||||||
for (int i = 0; i < 8; i++) { |
|
||||||
boolean bit = ((b >> (7 - i) & 1) == 1); |
|
||||||
boolean c15 = ((wCRCin >> 15 & 1) == 1); |
|
||||||
wCRCin <<= 1; |
|
||||||
if (c15 ^ bit) |
|
||||||
wCRCin ^= wCPoly; |
|
||||||
} |
|
||||||
} |
|
||||||
wCRCin &= 0xffff; |
|
||||||
return wCRCin ^= 0x0000; |
|
||||||
} |
|
||||||
|
|
||||||
|
|
||||||
/** |
|
||||||
* CRC16_X25:多项式x16+x12+x5+1(0x1021),初始值0xffff,低位在前,高位在后,结果与0xFFFF异或 |
|
||||||
* 0x8408是0x1021按位颠倒后的结果。 |
|
||||||
* |
|
||||||
* @param buffer |
|
||||||
* @return |
|
||||||
*/ |
|
||||||
public static int CRC16_X25(byte[] buffer) { |
|
||||||
int wCRCin = 0xffff; |
|
||||||
int wCPoly = 0x8408; |
|
||||||
for (byte b : buffer) { |
|
||||||
wCRCin ^= ((int) b & 0x00ff); |
|
||||||
for (int j = 0; j < 8; j++) { |
|
||||||
if ((wCRCin & 0x0001) != 0) { |
|
||||||
wCRCin >>= 1; |
|
||||||
wCRCin ^= wCPoly; |
|
||||||
} else { |
|
||||||
wCRCin >>= 1; |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
return wCRCin ^= 0xffff; |
|
||||||
} |
|
||||||
|
|
||||||
/** |
|
||||||
* CRC-16 (Modbus) |
|
||||||
* CRC16_MODBUS:多项式x16+x15+x2+1(0x8005),初始值0xFFFF,低位在前,高位在后,结果与0x0000异或 |
|
||||||
* 0xA001是0x8005按位颠倒后的结果 |
|
||||||
* |
|
||||||
* @param buffer |
|
||||||
* @return |
|
||||||
*/ |
|
||||||
public static int CRC16_MODBUS(byte[] buffer) { |
|
||||||
int wCRCin = 0xffff; |
|
||||||
int POLYNOMIAL = 0xa001; |
|
||||||
for (byte b : buffer) { |
|
||||||
wCRCin ^= ((int) b & 0x00ff); |
|
||||||
for (int j = 0; j < 8; j++) { |
|
||||||
if ((wCRCin & 0x0001) != 0) { |
|
||||||
wCRCin >>= 1; |
|
||||||
wCRCin ^= POLYNOMIAL; |
|
||||||
} else { |
|
||||||
wCRCin >>= 1; |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
return wCRCin ^= 0x0000; |
|
||||||
} |
|
||||||
|
|
||||||
/** |
|
||||||
* CRC-16 |
|
||||||
* CRC16_IBM:多项式x16+x15+x2+1(0x8005),初始值0x0000,低位在前,高位在后,结果与0x0000异或 |
|
||||||
* 0xA001是0x8005按位颠倒后的结果 |
|
||||||
* |
|
||||||
* @param buffer |
|
||||||
* @return |
|
||||||
*/ |
|
||||||
public static int CRC16_IBM(byte[] buffer) { |
|
||||||
int wCRCin = 0x0000; |
|
||||||
int wCPoly = 0xa001; |
|
||||||
for (byte b : buffer) { |
|
||||||
wCRCin ^= ((int) b & 0x00ff); |
|
||||||
for (int j = 0; j < 8; j++) { |
|
||||||
if ((wCRCin & 0x0001) != 0) { |
|
||||||
wCRCin >>= 1; |
|
||||||
wCRCin ^= wCPoly; |
|
||||||
} else { |
|
||||||
wCRCin >>= 1; |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
return wCRCin ^= 0x0000; |
|
||||||
} |
|
||||||
|
|
||||||
/** |
|
||||||
* CRC16_MAXIM:多项式x16+x15+x2+1(0x8005),初始值0x0000,低位在前,高位在后,结果与0xFFFF异或 |
|
||||||
* 0xA001是0x8005按位颠倒后的结果 |
|
||||||
* |
|
||||||
* @param buffer |
|
||||||
* @return |
|
||||||
*/ |
|
||||||
public static int CRC16_MAXIM(byte[] buffer) { |
|
||||||
int wCRCin = 0x0000; |
|
||||||
int wCPoly = 0xa001; |
|
||||||
for (byte b : buffer) { |
|
||||||
wCRCin ^= ((int) b & 0x00ff); |
|
||||||
for (int j = 0; j < 8; j++) { |
|
||||||
if ((wCRCin & 0x0001) != 0) { |
|
||||||
wCRCin >>= 1; |
|
||||||
wCRCin ^= wCPoly; |
|
||||||
} else { |
|
||||||
wCRCin >>= 1; |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
return wCRCin ^= 0xffff; |
|
||||||
} |
|
||||||
|
|
||||||
/** |
|
||||||
* CRC16_USB:多项式x16+x15+x2+1(0x8005),初始值0xFFFF,低位在前,高位在后,结果与0xFFFF异或 |
|
||||||
* 0xA001是0x8005按位颠倒后的结果 |
|
||||||
* |
|
||||||
* @param buffer |
|
||||||
* @return |
|
||||||
*/ |
|
||||||
public static int CRC16_USB(byte[] buffer) { |
|
||||||
int wCRCin = 0xFFFF; |
|
||||||
int wCPoly = 0xa001; |
|
||||||
for (byte b : buffer) { |
|
||||||
wCRCin ^= ((int) b & 0x00ff); |
|
||||||
for (int j = 0; j < 8; j++) { |
|
||||||
if ((wCRCin & 0x0001) != 0) { |
|
||||||
wCRCin >>= 1; |
|
||||||
wCRCin ^= wCPoly; |
|
||||||
} else { |
|
||||||
wCRCin >>= 1; |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
return wCRCin ^= 0xffff; |
|
||||||
} |
|
||||||
|
|
||||||
/** |
|
||||||
* CRC16_DNP:多项式x16+x13+x12+x11+x10+x8+x6+x5+x2+1(0x3D65),初始值0x0000,低位在前,高位在后,结果与0xFFFF异或 |
|
||||||
* 0xA6BC是0x3D65按位颠倒后的结果 |
|
||||||
* |
|
||||||
* @param buffer |
|
||||||
* @return |
|
||||||
*/ |
|
||||||
public static int CRC16_DNP(byte[] buffer) { |
|
||||||
int wCRCin = 0x0000; |
|
||||||
int wCPoly = 0xA6BC; |
|
||||||
for (byte b : buffer) { |
|
||||||
wCRCin ^= ((int) b & 0x00ff); |
|
||||||
for (int j = 0; j < 8; j++) { |
|
||||||
if ((wCRCin & 0x0001) != 0) { |
|
||||||
wCRCin >>= 1; |
|
||||||
wCRCin ^= wCPoly; |
|
||||||
} else { |
|
||||||
wCRCin >>= 1; |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
return wCRCin ^= 0xffff; |
|
||||||
} |
|
||||||
} |
|
File diff suppressed because it is too large
Load Diff
@ -1,47 +0,0 @@ |
|||||||
package com.mh.common.utils; |
|
||||||
|
|
||||||
import io.netty.buffer.ByteBuf; |
|
||||||
import io.netty.buffer.Unpooled; |
|
||||||
import io.netty.channel.ChannelHandlerContext; |
|
||||||
|
|
||||||
/** |
|
||||||
* @author LJF |
|
||||||
* @version 1.0 |
|
||||||
* @project EEMCS |
|
||||||
* @description Modbus协议工具类 |
|
||||||
* @date 2025-06-06 14:40:24 |
|
||||||
*/ |
|
||||||
public class ModbusUtils { |
|
||||||
public static String createControlCode(String mtCode, Integer type, String registerAddr, String param) { |
|
||||||
String orderStr; |
|
||||||
mtCode = ExchangeStringUtil.addZeroForNum(mtCode, 2); |
|
||||||
registerAddr = ExchangeStringUtil.addZeroForNum(registerAddr, 4); |
|
||||||
param = ExchangeStringUtil.addZeroForNum(ExchangeStringUtil.decToHex(param), 4); |
|
||||||
orderStr = mtCode + "06" + registerAddr + param; |
|
||||||
byte[] strOrder = ExchangeStringUtil.hexStrToBinaryStr(orderStr); |
|
||||||
int checkNum = CRC16.CRC16_MODBUS(strOrder); |
|
||||||
String checkWord = ExchangeStringUtil.decToHex(String.valueOf(checkNum)); |
|
||||||
checkWord = checkWord.substring(2, 4) + checkWord.substring(0, 2); |
|
||||||
return orderStr + checkWord; |
|
||||||
} |
|
||||||
|
|
||||||
public static ByteBuf getByteBuf(ChannelHandlerContext ctx, String sendStr) { |
|
||||||
// byte类型的数据
|
|
||||||
// String sendStr = "5803004900021914"; // 冷量计
|
|
||||||
// 申请一个数据结构存储信息
|
|
||||||
ByteBuf buffer = ctx.alloc().buffer(); |
|
||||||
// 将信息放入数据结构中
|
|
||||||
buffer.writeBytes(ExchangeStringUtil.hexStrToBinaryStr(sendStr));//对接需要16进制
|
|
||||||
return buffer; |
|
||||||
} |
|
||||||
|
|
||||||
public static ByteBuf createByteBuf(String sendStr) { |
|
||||||
// byte类型的数据
|
|
||||||
// String sendStr = "5803004900021914"; // 冷量计
|
|
||||||
// 申请一个数据结构存储信息
|
|
||||||
ByteBuf buffer = Unpooled.buffer(); |
|
||||||
// 将信息放入数据结构中
|
|
||||||
buffer.writeBytes(ExchangeStringUtil.hexStrToBinaryStr(sendStr));//对接需要16进制
|
|
||||||
return buffer; |
|
||||||
} |
|
||||||
} |
|
@ -1,77 +0,0 @@ |
|||||||
package com.mh.common.utils; |
|
||||||
|
|
||||||
|
|
||||||
import com.google.common.cache.Cache; |
|
||||||
import com.google.common.cache.CacheBuilder; |
|
||||||
import lombok.extern.slf4j.Slf4j; |
|
||||||
import org.apache.commons.lang3.StringUtils; |
|
||||||
|
|
||||||
import java.util.Objects; |
|
||||||
import java.util.concurrent.BlockingQueue; |
|
||||||
import java.util.concurrent.LinkedBlockingQueue; |
|
||||||
import java.util.concurrent.TimeUnit; |
|
||||||
|
|
||||||
/** |
|
||||||
* @author LJF |
|
||||||
* @version 1.0 |
|
||||||
* @project TAD_Server |
|
||||||
* @description 缓存等待数据 |
|
||||||
* @date 2023/7/4 08:45:16 |
|
||||||
*/ |
|
||||||
@Slf4j |
|
||||||
public class NettyTools { |
|
||||||
|
|
||||||
/** |
|
||||||
* 响应消息缓存 |
|
||||||
*/ |
|
||||||
private static final Cache<String, BlockingQueue<String>> responseMsgCache = CacheBuilder.newBuilder() |
|
||||||
.maximumSize(500) |
|
||||||
.expireAfterWrite(1000, TimeUnit.SECONDS) |
|
||||||
.build(); |
|
||||||
|
|
||||||
|
|
||||||
/** |
|
||||||
* 等待响应消息 |
|
||||||
* @param key 消息唯一标识 |
|
||||||
* @return ReceiveDdcMsgVo |
|
||||||
*/ |
|
||||||
public static boolean waitReceiveMsg(String key) { |
|
||||||
|
|
||||||
try { |
|
||||||
//设置超时时间
|
|
||||||
String vo = Objects.requireNonNull(responseMsgCache.getIfPresent(key)) |
|
||||||
.poll(1000 * 10, TimeUnit.MILLISECONDS); |
|
||||||
|
|
||||||
//删除key
|
|
||||||
responseMsgCache.invalidate(key); |
|
||||||
return StringUtils.isNotBlank(vo); |
|
||||||
} catch (Exception e) { |
|
||||||
log.error("获取数据异常,sn={},msg=null",key); |
|
||||||
return false; |
|
||||||
} |
|
||||||
|
|
||||||
} |
|
||||||
|
|
||||||
/** |
|
||||||
* 初始化响应消息的队列 |
|
||||||
* @param key 消息唯一标识 |
|
||||||
*/ |
|
||||||
public static void initReceiveMsg(String key) { |
|
||||||
responseMsgCache.put(key,new LinkedBlockingQueue<String>(1)); |
|
||||||
} |
|
||||||
|
|
||||||
/** |
|
||||||
* 设置响应消息 |
|
||||||
* @param key 消息唯一标识 |
|
||||||
*/ |
|
||||||
public static void setReceiveMsg(String key, String msg) { |
|
||||||
|
|
||||||
if(responseMsgCache.getIfPresent(key) != null){ |
|
||||||
responseMsgCache.getIfPresent(key).add(msg); |
|
||||||
return; |
|
||||||
} |
|
||||||
|
|
||||||
log.warn("sn {}不存在",key); |
|
||||||
} |
|
||||||
|
|
||||||
} |
|
@ -1,53 +0,0 @@ |
|||||||
package com.mh.common.utils; |
|
||||||
|
|
||||||
import com.mh.common.core.domain.entity.CollectionParamsManage; |
|
||||||
import io.netty.buffer.ByteBuf; |
|
||||||
import io.netty.channel.ChannelHandlerContext; |
|
||||||
import lombok.extern.slf4j.Slf4j; |
|
||||||
|
|
||||||
/** |
|
||||||
* @author ljf |
|
||||||
* @title : |
|
||||||
* @description : 发送指令工具类 |
|
||||||
* @updateTime 2021-01-26 |
|
||||||
* @throws : |
|
||||||
*/ |
|
||||||
@Slf4j |
|
||||||
public class SendOrderUtils { |
|
||||||
|
|
||||||
// 发送所有类型采集报文
|
|
||||||
public static void sendAllOrder(CollectionParamsManage paramsManage, ChannelHandlerContext ctx, int num, int size) { |
|
||||||
// 开始创建指令
|
|
||||||
String mtCode = paramsManage.getMtCode(); // 采集编号
|
|
||||||
String funCode = paramsManage.getFuncCode(); // 功能码
|
|
||||||
String registerAddr = paramsManage.getRegisterAddr(); // 寄存器地址
|
|
||||||
String registerNum = String.valueOf(paramsManage.getRegisterSize()); // 寄存器数量
|
|
||||||
// 拼接指令
|
|
||||||
String sendOrderStr = ExchangeStringUtil.addZeroForNum(ExchangeStringUtil.decToHex(mtCode), 2) |
|
||||||
+ ExchangeStringUtil.addZeroForNum(funCode, 2) |
|
||||||
+ ExchangeStringUtil.addZeroForNum(registerAddr, 4) |
|
||||||
+ ExchangeStringUtil.addZeroForNum(registerNum, 4); |
|
||||||
byte[] strOrder = ExchangeStringUtil.hexStrToBinaryStr(sendOrderStr); |
|
||||||
int checkNum = CRC16.CRC16_MODBUS(strOrder); |
|
||||||
String checkWord = ExchangeStringUtil.decToHex(String.valueOf(checkNum)); |
|
||||||
checkWord = checkWord.substring(2, 4) + checkWord.substring(0, 2); |
|
||||||
sendOrderStr = sendOrderStr + checkWord; |
|
||||||
ByteBuf buffer = getByteBuf(ctx, sendOrderStr); |
|
||||||
// 发送数据
|
|
||||||
ctx.channel().writeAndFlush(buffer); |
|
||||||
log.info("sends :" + sendOrderStr + ",num:" + num + ",records:" + size); |
|
||||||
try { |
|
||||||
Thread.sleep(1000); |
|
||||||
} catch (InterruptedException e) { |
|
||||||
log.error("线程休眠异常", e); |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
private static ByteBuf getByteBuf(ChannelHandlerContext ctx, String sendStr) { |
|
||||||
// 申请一个数据结构存储信息
|
|
||||||
ByteBuf buffer = ctx.alloc().buffer(); |
|
||||||
// 将信息放入数据结构中
|
|
||||||
buffer.writeBytes(ExchangeStringUtil.hexStrToBinaryStr(sendStr));//对接需要16进制
|
|
||||||
return buffer; |
|
||||||
} |
|
||||||
} |
|
@ -1,57 +0,0 @@ |
|||||||
package com.mh.framework.netty; |
|
||||||
|
|
||||||
import io.netty.bootstrap.ServerBootstrap; |
|
||||||
import io.netty.channel.ChannelFuture; |
|
||||||
import io.netty.channel.ChannelOption; |
|
||||||
import io.netty.channel.nio.NioEventLoopGroup; |
|
||||||
import io.netty.channel.socket.nio.NioServerSocketChannel; |
|
||||||
import lombok.extern.slf4j.Slf4j; |
|
||||||
|
|
||||||
@Slf4j |
|
||||||
public class EchoServer { |
|
||||||
|
|
||||||
private final int port; |
|
||||||
|
|
||||||
public EchoServer(int port) { |
|
||||||
this.port = port; |
|
||||||
} |
|
||||||
|
|
||||||
public void start() { |
|
||||||
// 创建EventLoopGroup
|
|
||||||
NioEventLoopGroup bossGroup = new NioEventLoopGroup(); |
|
||||||
NioEventLoopGroup workerGroup = new NioEventLoopGroup(); |
|
||||||
|
|
||||||
ServerBootstrap serverBootstrap = new ServerBootstrap(); |
|
||||||
serverBootstrap.group(bossGroup, workerGroup) |
|
||||||
.channel(NioServerSocketChannel.class) |
|
||||||
.localAddress(port) |
|
||||||
.option(ChannelOption.SO_BACKLOG, 1204) |
|
||||||
.childHandler(new ServerChannelInitializer()); |
|
||||||
|
|
||||||
// 异步绑定端口
|
|
||||||
ChannelFuture channelFuture = serverBootstrap.bind(); |
|
||||||
|
|
||||||
// 添加监听器处理绑定结果
|
|
||||||
channelFuture.addListener(future -> { |
|
||||||
if (future.isSuccess()) { |
|
||||||
log.info("服务器启动成功,开始监听端口: {}", port); |
|
||||||
} else { |
|
||||||
log.error("服务器启动失败,端口: {}", port, future.cause()); |
|
||||||
bossGroup.shutdownGracefully(); // 绑定失败立即关闭资源
|
|
||||||
workerGroup.shutdownGracefully(); |
|
||||||
} |
|
||||||
}); |
|
||||||
|
|
||||||
// ❌ 移除 sync() 阻塞调用
|
|
||||||
// channelFuture.channel().closeFuture().sync(); --> 删除这行
|
|
||||||
|
|
||||||
// 可选:添加 JVM 关闭钩子优雅关闭资源
|
|
||||||
Runtime.getRuntime().addShutdownHook(new Thread(() -> { |
|
||||||
log.info("JVM 正在关闭,准备释放 Netty 资源..."); |
|
||||||
bossGroup.shutdownGracefully(); |
|
||||||
workerGroup.shutdownGracefully(); |
|
||||||
log.info("Netty 资源已释放"); |
|
||||||
})); |
|
||||||
} |
|
||||||
|
|
||||||
} |
|
@ -1,336 +0,0 @@ |
|||||||
package com.mh.framework.netty; |
|
||||||
|
|
||||||
import com.alibaba.fastjson2.JSONArray; |
|
||||||
import com.alibaba.fastjson2.JSONObject; |
|
||||||
import com.mh.common.constant.Constants; |
|
||||||
import com.mh.common.core.domain.entity.CollectionParamsManage; |
|
||||||
import com.mh.common.core.domain.entity.SysDictData; |
|
||||||
import com.mh.common.core.redis.RedisCache; |
|
||||||
import com.mh.common.model.request.AdvantechDatas; |
|
||||||
import com.mh.common.model.request.AdvantechReceiver; |
|
||||||
import com.mh.common.utils.*; |
|
||||||
import com.mh.common.utils.spring.SpringUtils; |
|
||||||
import com.mh.framework.netty.session.ServerSession; |
|
||||||
import com.mh.framework.netty.session.SessionMap; |
|
||||||
import com.mh.framework.netty.task.CallbackTask; |
|
||||||
//import com.mh.framework.netty.task.CallbackTaskScheduler;
|
|
||||||
import com.mh.framework.netty.task.CallbackTaskScheduler; |
|
||||||
import com.mh.framework.rabbitmq.producer.SendMsgByTopic; |
|
||||||
import com.mh.system.service.device.ICollectionParamsManageService; |
|
||||||
import com.mh.system.service.device.IGatewayManageService; |
|
||||||
import io.netty.buffer.ByteBuf; |
|
||||||
import io.netty.channel.ChannelHandlerContext; |
|
||||||
import io.netty.channel.ChannelInboundHandlerAdapter; |
|
||||||
import io.netty.handler.timeout.IdleState; |
|
||||||
import io.netty.handler.timeout.IdleStateEvent; |
|
||||||
import io.netty.util.ReferenceCountUtil; |
|
||||||
import lombok.extern.slf4j.Slf4j; |
|
||||||
|
|
||||||
import java.math.BigDecimal; |
|
||||||
import java.util.ArrayList; |
|
||||||
import java.util.Date; |
|
||||||
import java.util.List; |
|
||||||
|
|
||||||
@Slf4j |
|
||||||
public class EchoServerHandler extends ChannelInboundHandlerAdapter { |
|
||||||
|
|
||||||
// 调用service层的接口信息
|
|
||||||
IGatewayManageService gatewayManageService = SpringUtils.getBean(IGatewayManageService.class); |
|
||||||
ICollectionParamsManageService collectionParamsManageService = SpringUtils.getBean(ICollectionParamsManageService.class); |
|
||||||
SendMsgByTopic sendMsgByTopic = SpringUtils.getBean(SendMsgByTopic.class); |
|
||||||
RedisCache redisCache = SpringUtils.getBean(RedisCache.class); |
|
||||||
|
|
||||||
/** |
|
||||||
* 空闲次数 |
|
||||||
*/ |
|
||||||
private int idleCount = 1; |
|
||||||
private int count = 0; |
|
||||||
private List<String> orderList; |
|
||||||
private int num = 0; |
|
||||||
private int size = 0; |
|
||||||
private String IP; |
|
||||||
private String port; |
|
||||||
private String receiveStr = ""; |
|
||||||
private List<CollectionParamsManage> deviceCodeParamList; |
|
||||||
|
|
||||||
/** |
|
||||||
* 客户端连接会触发 |
|
||||||
*/ |
|
||||||
@Override |
|
||||||
public void channelActive(ChannelHandlerContext ctx) throws Exception { |
|
||||||
log.info("Channel active......"); |
|
||||||
} |
|
||||||
|
|
||||||
/** |
|
||||||
* 超时处理 |
|
||||||
* 如果120秒没有接受客户端的心跳,就触发; |
|
||||||
* 如果超过3次,则直接关闭; |
|
||||||
*/ |
|
||||||
@Override |
|
||||||
public void userEventTriggered(ChannelHandlerContext ctx, Object obj) throws Exception { |
|
||||||
if (obj instanceof IdleStateEvent) { |
|
||||||
IdleStateEvent event = (IdleStateEvent) obj; |
|
||||||
if (IdleState.READER_IDLE.equals(event.state())) { //如果读通道处于空闲状态,说明没有接收到心跳命令
|
|
||||||
log.info("第{}已经40秒没有接收到客户端的信息了", idleCount); |
|
||||||
receiveStr = ""; |
|
||||||
num = num + 1; |
|
||||||
if (num > size - 1) { |
|
||||||
num = 0; |
|
||||||
// // 关闭连接
|
|
||||||
// ctx.close();
|
|
||||||
// 继续发送下一个采集指令
|
|
||||||
SendOrderUtils.sendAllOrder(deviceCodeParamList.get(num),ctx,num,size); |
|
||||||
} else { |
|
||||||
// 继续发送下一个采集指令
|
|
||||||
SendOrderUtils.sendAllOrder(deviceCodeParamList.get(num), ctx, num, size); |
|
||||||
} |
|
||||||
} |
|
||||||
} else { |
|
||||||
super.userEventTriggered(ctx, obj); |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
// 对于每一个传入的消息都要被调用
|
|
||||||
@Override |
|
||||||
public void channelRead(ChannelHandlerContext ctx, Object msg) { |
|
||||||
try { |
|
||||||
//接收到服务端发来的数据进行业务处理
|
|
||||||
ByteBuf buf = (ByteBuf) msg; |
|
||||||
byte[] bytes = new byte[buf.readableBytes()]; |
|
||||||
buf.readBytes(bytes);//复制内容到字节数组bytes
|
|
||||||
buf.clear(); |
|
||||||
// 截取IP地址
|
|
||||||
IP = ExchangeStringUtil.getMidString(ctx.channel().remoteAddress() + "", "/", ":"); |
|
||||||
// 截取端口号
|
|
||||||
port = ExchangeStringUtil.getMidString(ctx.channel().remoteAddress() + "", ":", ""); |
|
||||||
if (bytes.length <= 1024) { |
|
||||||
//将接收到的数据转为字符串,此字符串就是客户端发送的字符串
|
|
||||||
receiveStr = receiveStr + ExchangeStringUtil.bytesToHexString(bytes);//将接收到的数据转为字符串,此字符串就是客户端发送的字符串
|
|
||||||
receiveStr = receiveStr.replace("null", ""); //去null
|
|
||||||
receiveStr = receiveStr.replace(" ", ""); //去空格
|
|
||||||
//log.info("channelRead接收到的数据:" + receiveStr + ",length:" + receiveStr.length());
|
|
||||||
} |
|
||||||
} catch (Exception e) { |
|
||||||
log.error("channelRead异常", e); |
|
||||||
} finally { |
|
||||||
ReferenceCountUtil.release(msg); |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
// 当前批量读取中的最后一条消息
|
|
||||||
@Override |
|
||||||
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { |
|
||||||
//心跳包报文: 24 00 60 95
|
|
||||||
receiveStr = receiveStr.toUpperCase();//返回值全部变成大写
|
|
||||||
log.info("channelReadComplete接收到的数据{}, 长度: ===> {}", receiveStr, receiveStr.length()); |
|
||||||
//心跳包处理
|
|
||||||
if ((receiveStr.length() == 8) && receiveStr.startsWith("24")) { |
|
||||||
// if ((receiveStr.length() == 8) && receiveStr.startsWith("C0A801FE")) {
|
|
||||||
log.info("接收到心跳包 ===> {}", receiveStr); |
|
||||||
// 开始进行会话保存
|
|
||||||
dealSession(ctx); |
|
||||||
idleCount = 1; |
|
||||||
port = receiveStr.substring(4, 8);//心跳包包含网关端口(自己定义返回心跳包)
|
|
||||||
// 更新对应的网关在线情况
|
|
||||||
gatewayManageService.updateGatewayStatus(receiveStr); |
|
||||||
//根据端口或者IP或者心跳包查询网关对应的项目名称
|
|
||||||
// 生成采集指令
|
|
||||||
if (!SpringUtils.getBean(RedisCache.class).hasKey(receiveStr)) { |
|
||||||
collectionParamsManageService.createDtuCollectionParams(); |
|
||||||
} |
|
||||||
JSONArray arrayCache = SpringUtils.getBean(RedisCache.class).getCacheObject(receiveStr); |
|
||||||
if (StringUtils.isNotNull(arrayCache)) { |
|
||||||
deviceCodeParamList = arrayCache.toList(CollectionParamsManage.class); |
|
||||||
} |
|
||||||
size = deviceCodeParamList.size(); |
|
||||||
log.info("deviceCodeParam size ===> {}", size); |
|
||||||
// 清空receiveStr
|
|
||||||
receiveStr = ""; |
|
||||||
num = 0; |
|
||||||
// 发送采集报文
|
|
||||||
if (size > 0) { |
|
||||||
if (idleCount < 2) { |
|
||||||
Thread.sleep(200); |
|
||||||
SendOrderUtils.sendAllOrder(deviceCodeParamList.get(num), ctx, num, size); |
|
||||||
idleCount++; |
|
||||||
} else { |
|
||||||
ctx.channel().close(); |
|
||||||
} |
|
||||||
} else { |
|
||||||
log.info("gateway not find deviceCodeParam!"); |
|
||||||
} |
|
||||||
} else if (receiveStr.length() == 18) { |
|
||||||
// 水电表、热泵设置返回数据解析
|
|
||||||
idleCount = 1; |
|
||||||
log.info("水电表、热泵设置接收==>{},长度:{}", receiveStr, receiveStr.length()); |
|
||||||
nextSendOrder(ctx); |
|
||||||
} else if (receiveStr.length() == 12 || receiveStr.length() == 14) { |
|
||||||
// 热泵返回数据解析
|
|
||||||
idleCount = 1; |
|
||||||
log.info("热泵读取接收===>{},长度:{}", receiveStr, receiveStr.length()); |
|
||||||
nextSendOrder(ctx); |
|
||||||
} else if (receiveStr.length() == 16) { |
|
||||||
// 热泵设置指令返回
|
|
||||||
// 判断是否有指令发送
|
|
||||||
if (redisCache.hasKey("order_send") && redisCache.getCacheObject("order_send").equals(receiveStr)) { |
|
||||||
NettyTools.setReceiveMsg("order_wait", receiveStr); |
|
||||||
} |
|
||||||
nextSendOrder(ctx); |
|
||||||
} else if (receiveStr.length() > 50 && receiveStr.length() < 100) { |
|
||||||
idleCount = 1; |
|
||||||
// 清空receiveStr
|
|
||||||
nextSendOrder(ctx); |
|
||||||
} |
|
||||||
ctx.flush(); |
|
||||||
} |
|
||||||
|
|
||||||
private void dealSession(ChannelHandlerContext ctx) { |
|
||||||
// 获取表号
|
|
||||||
String deviceCode =receiveStr; |
|
||||||
String meterNum = deviceCode; |
|
||||||
deviceCode = deviceCode + ctx.channel().remoteAddress(); |
|
||||||
//新的session的创建
|
|
||||||
ServerSession session = new ServerSession(ctx.channel(), deviceCode); |
|
||||||
|
|
||||||
//进行登录逻辑处理,异步进行处理。并且需要知道 处理的结果。 callbacktask就要
|
|
||||||
//派上用场了
|
|
||||||
String finalDeviceCode = deviceCode; |
|
||||||
CallbackTaskScheduler.add(new CallbackTask<Boolean>() { |
|
||||||
@Override |
|
||||||
public Boolean execute() throws Exception { |
|
||||||
//进行 login 逻辑的处理
|
|
||||||
return action(session, finalDeviceCode, ctx); |
|
||||||
} |
|
||||||
//没有异常的话,我们进行处理
|
|
||||||
@Override |
|
||||||
public void onBack(Boolean result) { |
|
||||||
if(result) { |
|
||||||
log.info("设备保存会话: 设备号 = " + session.getSessionId()); |
|
||||||
//ctx.pipeline().remove(LoginRequestHandler.class); //压测需要放开
|
|
||||||
} else { |
|
||||||
log.info("设备刷新会话: 设备号 = " + session.getSessionId()); |
|
||||||
SessionMap.inst().updateSession(finalDeviceCode ,session, meterNum); |
|
||||||
//log.info("设备登录失败: 设备号 = " + session.getSessionId());
|
|
||||||
//ServerSession.closeSession(ctx);
|
|
||||||
// 假如说已经在会话中了,直接断开连接
|
|
||||||
//ctx.close();
|
|
||||||
} |
|
||||||
} |
|
||||||
//有异常的话,我们进行处理
|
|
||||||
@Override |
|
||||||
public void onException(Throwable t) { |
|
||||||
log.info("设备登录异常: 设备号 = " + session.getSessionId()); |
|
||||||
ServerSession.closeSession(ctx); |
|
||||||
} |
|
||||||
}); |
|
||||||
} |
|
||||||
|
|
||||||
private void nextSendOrder(ChannelHandlerContext ctx) throws InterruptedException { |
|
||||||
// 发送指令响应不用解析
|
|
||||||
if (receiveStr.length() != 16) { |
|
||||||
// 解析采集的报文,并保存到数据库
|
|
||||||
analysisReceiveData(receiveStr, deviceCodeParamList.get(num)); |
|
||||||
} |
|
||||||
// 清空receiveStr
|
|
||||||
receiveStr = ""; |
|
||||||
// 判断发送的下标,如果不等于指令数组大小
|
|
||||||
num = num + 1; |
|
||||||
if (num > size - 1) { |
|
||||||
num = 0; |
|
||||||
Thread.sleep(1000); |
|
||||||
// 继续发送下一个采集指令
|
|
||||||
SendOrderUtils.sendAllOrder(deviceCodeParamList.get(num), ctx, num, size); |
|
||||||
log.info("------一轮采集完成,继续下一轮--------"); |
|
||||||
} else { |
|
||||||
// 添加一个状态值,判断是否继续发送指令 update by ljf on 2020-08-07
|
|
||||||
if (Constants.WEB_FLAG) { |
|
||||||
num = 0; |
|
||||||
// 关闭连接
|
|
||||||
receiveStr = null; |
|
||||||
ctx.close(); |
|
||||||
} else { |
|
||||||
Thread.sleep(1000); |
|
||||||
// 继续发送下一个采集指令
|
|
||||||
SendOrderUtils.sendAllOrder(deviceCodeParamList.get(num), ctx, num, size); |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
private void analysisReceiveData(final String receiveStr, final CollectionParamsManage deviceCodeParamEntity) { |
|
||||||
AnalysisReceiveOrder485 analysisReceiveOrder485 = new AnalysisReceiveOrder485(); |
|
||||||
String analysisData = ""; |
|
||||||
switch (deviceCodeParamEntity.getParamType()) { |
|
||||||
case "16" -> |
|
||||||
// 电表
|
|
||||||
analysisData = analysisReceiveOrder485.analysisMeterOrder485(receiveStr, deviceCodeParamEntity); |
|
||||||
case "18" -> |
|
||||||
// 水表
|
|
||||||
analysisData = analysisReceiveOrder485.analysisWaterOrder485(receiveStr, deviceCodeParamEntity); |
|
||||||
case "5" -> |
|
||||||
// 热泵故障报警
|
|
||||||
analysisData = analysisReceiveOrder485.analysisHeatPumpOrder485(receiveStr, deviceCodeParamEntity); |
|
||||||
case "2" -> |
|
||||||
// 热泵启停控制
|
|
||||||
analysisData = analysisReceiveOrder485.analysisHeatPumpOrder485(receiveStr, deviceCodeParamEntity); |
|
||||||
case "12" -> |
|
||||||
// 热泵实际温度
|
|
||||||
analysisData = analysisReceiveOrder485.analysisHeatPumpOrder485(receiveStr, deviceCodeParamEntity); |
|
||||||
case "14" -> |
|
||||||
// 热泵读取温度设置
|
|
||||||
analysisData = analysisReceiveOrder485.analysisHeatPumpOrder485(receiveStr, deviceCodeParamEntity); |
|
||||||
default -> { |
|
||||||
log.info("设备类型错误"); |
|
||||||
return; |
|
||||||
} |
|
||||||
} |
|
||||||
if (analysisData.isEmpty()) { |
|
||||||
log.info("解析数据为空"); |
|
||||||
return; |
|
||||||
} |
|
||||||
// 格式化数据,配置成研华网关 AdvantechReceiver
|
|
||||||
AdvantechReceiver advantechReceiver = new AdvantechReceiver(); |
|
||||||
advantechReceiver.setTs(DateUtils.dateToString(new Date(), Constants.DATE_FORMAT)); |
|
||||||
List<AdvantechDatas> advantechDatas = new ArrayList<>(); |
|
||||||
AdvantechDatas datas = new AdvantechDatas(); |
|
||||||
datas.setValue(new BigDecimal(analysisData)); |
|
||||||
datas.setTag(deviceCodeParamEntity.getOtherName()); |
|
||||||
advantechDatas.add(datas); |
|
||||||
advantechReceiver.setD(advantechDatas); |
|
||||||
sendMsgByTopic.sendToDeviceMQ(JSONObject.toJSONString(advantechReceiver)); |
|
||||||
} |
|
||||||
|
|
||||||
// 异常捕捉
|
|
||||||
@Override |
|
||||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { |
|
||||||
cause.getCause().printStackTrace(); |
|
||||||
log.info("异常捕捉,执行ctx.close" + cause.getCause()); |
|
||||||
ctx.close(); // 关闭该Channel
|
|
||||||
} |
|
||||||
|
|
||||||
// 客户端断开
|
|
||||||
@Override |
|
||||||
public void channelInactive(ChannelHandlerContext ctx) throws Exception { |
|
||||||
ctx.close();// 关闭流
|
|
||||||
log.info("客户端断开,执行ctx.close()......"); |
|
||||||
} |
|
||||||
|
|
||||||
private boolean action(ServerSession session, String deviceCode, ChannelHandlerContext ctx) { |
|
||||||
//user验证
|
|
||||||
boolean isValidUser = checkUser(deviceCode,session); |
|
||||||
session.bind(); |
|
||||||
return true; |
|
||||||
} |
|
||||||
|
|
||||||
private boolean checkUser(String deviceCode,ServerSession session) { |
|
||||||
//当前用户已经登录
|
|
||||||
if(SessionMap.inst().hasLogin(deviceCode)) { |
|
||||||
log.info("设备已经登录: 设备号 = " + deviceCode); |
|
||||||
return false; |
|
||||||
} |
|
||||||
//一般情况下,我们会将 user存储到 DB中,然后对user的用户名和密码进行校验
|
|
||||||
//但是,我们这边没有进行db的集成,所以我们想一个别的办法进行user的校验。在我们的sessionMap进行以下校验
|
|
||||||
//为什么选sessionmap,因为我们user的会话,都是存储到sessionmap中的,sessionmap中只要有这个user的会话,说明就是ok的
|
|
||||||
return true; |
|
||||||
} |
|
||||||
|
|
||||||
} |
|
@ -1,18 +0,0 @@ |
|||||||
package com.mh.framework.netty; |
|
||||||
|
|
||||||
import com.mh.common.core.domain.entity.OrderEntity; |
|
||||||
|
|
||||||
import java.util.List; |
|
||||||
|
|
||||||
/** |
|
||||||
* @author LJF |
|
||||||
* @version 1.0 |
|
||||||
* @project EEMCS |
|
||||||
* @description netty |
|
||||||
* @date 2025-06-06 15:13:06 |
|
||||||
*/ |
|
||||||
public interface INettyService { |
|
||||||
|
|
||||||
boolean sendOrder(List<OrderEntity> changeValues); |
|
||||||
|
|
||||||
} |
|
@ -1,119 +0,0 @@ |
|||||||
package com.mh.framework.netty; |
|
||||||
|
|
||||||
import com.mh.common.core.domain.AjaxResult; |
|
||||||
import com.mh.common.core.domain.entity.CollectionParamsManage; |
|
||||||
import com.mh.common.core.domain.entity.GatewayManage; |
|
||||||
import com.mh.common.core.domain.entity.OrderEntity; |
|
||||||
import com.mh.common.core.redis.RedisCache; |
|
||||||
import com.mh.common.core.redis.RedisLock; |
|
||||||
import com.mh.common.utils.ModbusUtils; |
|
||||||
import com.mh.common.utils.NettyTools; |
|
||||||
import com.mh.common.utils.StringUtils; |
|
||||||
import com.mh.framework.netty.session.ServerSession; |
|
||||||
import com.mh.framework.netty.session.SessionMap; |
|
||||||
import com.mh.system.mapper.device.CollectionParamsManageMapper; |
|
||||||
import com.mh.system.mapper.device.GatewayManageMapper; |
|
||||||
import jakarta.annotation.Resource; |
|
||||||
import lombok.extern.slf4j.Slf4j; |
|
||||||
import org.springframework.stereotype.Service; |
|
||||||
|
|
||||||
import java.util.List; |
|
||||||
import java.util.Map; |
|
||||||
import java.util.Set; |
|
||||||
import java.util.UUID; |
|
||||||
import java.util.concurrent.ConcurrentHashMap; |
|
||||||
import java.util.concurrent.TimeUnit; |
|
||||||
|
|
||||||
/** |
|
||||||
* @author LJF |
|
||||||
* @version 1.0 |
|
||||||
* @project EEMCS |
|
||||||
* @description netty实现类 |
|
||||||
* @date 2025-06-06 15:13:23 |
|
||||||
*/ |
|
||||||
@Slf4j |
|
||||||
@Service |
|
||||||
public class NettyServiceImpl implements INettyService { |
|
||||||
|
|
||||||
@Resource |
|
||||||
private CollectionParamsManageMapper collectionParamsManageMapper; |
|
||||||
|
|
||||||
@Resource |
|
||||||
private GatewayManageMapper gatewayManageMapper; |
|
||||||
|
|
||||||
@Resource |
|
||||||
private RedisCache redisCache; |
|
||||||
|
|
||||||
@Resource |
|
||||||
private RedisLock redisLock; |
|
||||||
|
|
||||||
@Override |
|
||||||
public boolean sendOrder(List<OrderEntity> changeValues) { |
|
||||||
for (OrderEntity changeValue : changeValues) { |
|
||||||
String cpmId = changeValue.getId(); |
|
||||||
CollectionParamsManage collectionParamsManage = collectionParamsManageMapper.selectById(cpmId); |
|
||||||
if (null == collectionParamsManage) { |
|
||||||
return false; |
|
||||||
} |
|
||||||
GatewayManage gatewayManage = gatewayManageMapper.selectById(collectionParamsManage.getGatewayId()); |
|
||||||
if (null == gatewayManage || StringUtils.isEmpty(gatewayManage.getHeartBeat())) { |
|
||||||
return false; |
|
||||||
} |
|
||||||
ConcurrentHashMap<String, ServerSession> map = SessionMap.inst().getMap(); |
|
||||||
Set<Map.Entry<String, ServerSession>> entries = map.entrySet(); |
|
||||||
boolean flag = false; |
|
||||||
String keyVal = null; |
|
||||||
for (Map.Entry<String, ServerSession> entry : entries) { |
|
||||||
String key = entry.getKey(); |
|
||||||
if (key.contains(gatewayManage.getHeartBeat())){ |
|
||||||
flag = true; |
|
||||||
keyVal = key; |
|
||||||
break; |
|
||||||
} |
|
||||||
} |
|
||||||
if (flag) { |
|
||||||
ServerSession serverSession = map.get(keyVal); |
|
||||||
// 目前只有DTU,modbus方式,只创建modbus先
|
|
||||||
String controlCode = ModbusUtils.createControlCode(collectionParamsManage.getMtCode(), |
|
||||||
changeValue.getType(), |
|
||||||
collectionParamsManage.getRegisterAddr(), |
|
||||||
changeValue.getParam()); |
|
||||||
if (StringUtils.isEmpty(controlCode)) { |
|
||||||
log.error("创建控制码失败"); |
|
||||||
return false; |
|
||||||
} |
|
||||||
|
|
||||||
String requestId = UUID.randomUUID().toString(); // 唯一标识当前请求
|
|
||||||
String lockKey = "lock:order_send:" + gatewayManage.getHeartBeat(); // 按网关分锁
|
|
||||||
|
|
||||||
try { |
|
||||||
if (!redisLock.tryLock(lockKey, requestId, 10, 10)) { |
|
||||||
log.warn("获取锁失败,当前操作繁忙"); |
|
||||||
return false; |
|
||||||
} |
|
||||||
// 初始化发送指令
|
|
||||||
NettyTools.initReceiveMsg("order_wait"); |
|
||||||
// 设置缓存,方便在netty中判断发送的指令
|
|
||||||
redisCache.setCacheObject("order_send", controlCode, 10, TimeUnit.SECONDS); |
|
||||||
// 发送指令
|
|
||||||
serverSession.getChannel().writeAndFlush(ModbusUtils.createByteBuf(controlCode)); |
|
||||||
// 等待指令
|
|
||||||
if (NettyTools.waitReceiveMsg("order_wait")) { |
|
||||||
log.error("发送指令成功,心跳包:{}", gatewayManage.getHeartBeat()); |
|
||||||
return true; |
|
||||||
} else { |
|
||||||
log.error("发送指令异常,心跳包:{}", gatewayManage.getHeartBeat()); |
|
||||||
return false; |
|
||||||
} |
|
||||||
} catch (InterruptedException e) { |
|
||||||
log.error("发送指令异常", e); |
|
||||||
} finally { |
|
||||||
redisLock.unlock(lockKey, requestId); |
|
||||||
} |
|
||||||
} |
|
||||||
log.error("当前设备不在线,心跳包:{}",gatewayManage.getHeartBeat()); |
|
||||||
return false; |
|
||||||
} |
|
||||||
return false; |
|
||||||
} |
|
||||||
} |
|
@ -1,33 +0,0 @@ |
|||||||
package com.mh.framework.netty; |
|
||||||
|
|
||||||
import io.netty.channel.ChannelInitializer; |
|
||||||
import io.netty.channel.ChannelPipeline; |
|
||||||
import io.netty.channel.socket.SocketChannel; |
|
||||||
import io.netty.handler.timeout.IdleStateHandler; |
|
||||||
|
|
||||||
import java.util.concurrent.TimeUnit; |
|
||||||
|
|
||||||
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel>{ |
|
||||||
|
|
||||||
@Override |
|
||||||
protected void initChannel(SocketChannel socketChannel) throws Exception { |
|
||||||
ChannelPipeline pipeline = socketChannel.pipeline(); |
|
||||||
|
|
||||||
/* LineBasedFrameDecoder的工作原理是:依次遍历ByteBuf中的可读字节, |
|
||||||
判断看其是否有”\n” 或 “\r\n”, 如果有就以此位置为结束位置。 |
|
||||||
从可读索引到结束位置的区间的字节就组成了一行。 它是以换行符为结束标志的解码器, |
|
||||||
支持携带结束符和不带结束符两种解码方式,同时支持配置单行的最大长度, |
|
||||||
如果读到了最大长度之后仍然没有发现换行符,则抛出异常,同时忽略掉之前读到的异常码流。*/ |
|
||||||
// pipeline.addLast(new LineBasedFrameDecoder(10010));
|
|
||||||
//字符串解码和编码
|
|
||||||
//LineBasedFrameDecoder + StringDecoder 就是一个按行切换的文本解码器。
|
|
||||||
// pipeline.addLast( new StringDecoder());
|
|
||||||
// pipeline.addLast( new StringEncoder());
|
|
||||||
// 设置读写超时操作
|
|
||||||
// 入参说明: 读超时时间、写超时时间、所有类型的超时时间、时间格式
|
|
||||||
pipeline.addLast(new IdleStateHandler(40, 40, 40, TimeUnit.SECONDS)); |
|
||||||
//服务器的逻辑
|
|
||||||
pipeline.addLast("handler", new EchoServerHandler()); |
|
||||||
} |
|
||||||
|
|
||||||
} |
|
@ -1,66 +0,0 @@ |
|||||||
package com.mh.framework.netty.session; |
|
||||||
|
|
||||||
import io.netty.channel.Channel; |
|
||||||
import io.netty.channel.ChannelFuture; |
|
||||||
import io.netty.channel.ChannelFutureListener; |
|
||||||
import io.netty.channel.ChannelHandlerContext; |
|
||||||
import io.netty.util.AttributeKey; |
|
||||||
import lombok.Data; |
|
||||||
import lombok.extern.slf4j.Slf4j; |
|
||||||
|
|
||||||
@Data |
|
||||||
@Slf4j |
|
||||||
public class ServerSession { |
|
||||||
public static final AttributeKey<ServerSession> SESSION_KEY = |
|
||||||
AttributeKey.valueOf("SESSION_KEY"); |
|
||||||
//通道
|
|
||||||
private Channel channel; |
|
||||||
private final String sessionId; |
|
||||||
private boolean isLogin = false; |
|
||||||
|
|
||||||
public ServerSession(Channel channel, String deviceCode){ |
|
||||||
this.channel = channel; |
|
||||||
this.sessionId = deviceCode; |
|
||||||
} |
|
||||||
|
|
||||||
//session需要和通道进行一定的关联,他是在构造函数中关联上的;
|
|
||||||
//session还需要通过sessionkey和channel进行再次的关联;channel.attr方法.set当前的
|
|
||||||
// serverSession
|
|
||||||
//session需要被添加到我们的SessionMap中
|
|
||||||
public void bind(){ |
|
||||||
log.info("server Session 会话进行绑定 :" + channel.remoteAddress()); |
|
||||||
channel.attr(SESSION_KEY).set(this); |
|
||||||
SessionMap.inst().addSession(sessionId, this); |
|
||||||
this.isLogin = true; |
|
||||||
} |
|
||||||
|
|
||||||
//通过channel获取session
|
|
||||||
public static ServerSession getSession(ChannelHandlerContext ctx){ |
|
||||||
Channel channel = ctx.channel(); |
|
||||||
return channel.attr(SESSION_KEY).get(); |
|
||||||
} |
|
||||||
|
|
||||||
//关闭session,新增返回一个meterNum用于纪录设备下线时间2024-05-08
|
|
||||||
public static String closeSession(ChannelHandlerContext ctx){ |
|
||||||
String meterNum = null; |
|
||||||
ServerSession serverSession = ctx.channel().attr(SESSION_KEY).get(); |
|
||||||
if(serverSession != null && serverSession.getSessionId() != null) { |
|
||||||
ChannelFuture future = serverSession.channel.close(); |
|
||||||
future.addListener((ChannelFutureListener) future1 -> { |
|
||||||
if(!future1.isSuccess()) { |
|
||||||
log.info("Channel close error!"); |
|
||||||
} |
|
||||||
}); |
|
||||||
ctx.close(); |
|
||||||
meterNum = serverSession.sessionId; |
|
||||||
SessionMap.inst().removeSession(serverSession.sessionId); |
|
||||||
log.info(ctx.channel().remoteAddress()+" "+serverSession.sessionId + "==>移除会话"); |
|
||||||
} |
|
||||||
return meterNum; |
|
||||||
} |
|
||||||
|
|
||||||
//写消息
|
|
||||||
public void writeAndFlush(Object msg) { |
|
||||||
channel.writeAndFlush(msg); |
|
||||||
} |
|
||||||
} |
|
@ -1,96 +0,0 @@ |
|||||||
package com.mh.framework.netty.session; |
|
||||||
|
|
||||||
import lombok.Data; |
|
||||||
import lombok.extern.slf4j.Slf4j; |
|
||||||
|
|
||||||
import java.util.Iterator; |
|
||||||
import java.util.List; |
|
||||||
import java.util.Map; |
|
||||||
import java.util.concurrent.ConcurrentHashMap; |
|
||||||
import java.util.stream.Collectors; |
|
||||||
|
|
||||||
@Data |
|
||||||
@Slf4j |
|
||||||
public class SessionMap { |
|
||||||
|
|
||||||
private ThreadLocal<Boolean> sceneThreadLocal = new ThreadLocal<>(); |
|
||||||
|
|
||||||
//用单例模式进行sessionMap的创建
|
|
||||||
private SessionMap(){} |
|
||||||
|
|
||||||
private static SessionMap singleInstance = new SessionMap(); |
|
||||||
|
|
||||||
public static SessionMap inst() { |
|
||||||
return singleInstance; |
|
||||||
} |
|
||||||
|
|
||||||
//进行会话的保存
|
|
||||||
//key 我们使用 sessionId;value 需要是 serverSession
|
|
||||||
private ConcurrentHashMap<String, ServerSession> map = new ConcurrentHashMap<>(256); |
|
||||||
//添加session
|
|
||||||
public void addSession(String sessionId, ServerSession s) { |
|
||||||
map.put(sessionId, s); |
|
||||||
log.info("IP地址:"+s.getChannel().remoteAddress()+" "+ sessionId + " 表具上线,总共表具:" + map.size()); |
|
||||||
} |
|
||||||
|
|
||||||
//删除session
|
|
||||||
public void removeSession(String sessionId) { |
|
||||||
if(map.containsKey(sessionId)) { |
|
||||||
ServerSession s = map.get(sessionId); |
|
||||||
map.remove(sessionId); |
|
||||||
log.info("设备id下线:{},在线设备:{}", s.getSessionId(), map.size() ); |
|
||||||
} |
|
||||||
return; |
|
||||||
} |
|
||||||
|
|
||||||
public boolean hasLogin(String sessionId) { |
|
||||||
Iterator<Map.Entry<String, ServerSession>> iterator = map.entrySet().iterator(); |
|
||||||
while(iterator.hasNext()) { |
|
||||||
Map.Entry<String, ServerSession> next = iterator.next(); |
|
||||||
if(sessionId != null && sessionId.equalsIgnoreCase(next.getValue().getSessionId())) { |
|
||||||
return true ; |
|
||||||
} |
|
||||||
} |
|
||||||
return false; |
|
||||||
} |
|
||||||
|
|
||||||
//如果在线,肯定有sessionMap里保存的 serverSession
|
|
||||||
//如果不在线,serverSession也没有。用这个来判断是否在线
|
|
||||||
public List<ServerSession> getSessionBy(String sessionId) { |
|
||||||
return map.values().stream(). |
|
||||||
filter(s -> s.getSessionId().equals(sessionId)). |
|
||||||
collect(Collectors.toList()); |
|
||||||
} |
|
||||||
|
|
||||||
public boolean getScene() { |
|
||||||
return sceneThreadLocal.get(); |
|
||||||
} |
|
||||||
|
|
||||||
public void initScene(Boolean status) { |
|
||||||
if (sceneThreadLocal == null) { |
|
||||||
log.info("======创建ThreadLocal======"); |
|
||||||
sceneThreadLocal = new ThreadLocal<>(); |
|
||||||
} |
|
||||||
log.info("设置状态==>" + status); |
|
||||||
sceneThreadLocal.set(status); |
|
||||||
} |
|
||||||
|
|
||||||
public void clearScene() { |
|
||||||
initScene(null); |
|
||||||
sceneThreadLocal.remove(); |
|
||||||
} |
|
||||||
|
|
||||||
public void updateSession(String sessionId, ServerSession session, String meterNum) { |
|
||||||
Iterator<Map.Entry<String, ServerSession>> iterator = map.entrySet().iterator(); |
|
||||||
while(iterator.hasNext()) { |
|
||||||
Map.Entry<String, ServerSession> next = iterator.next(); |
|
||||||
if (next.getKey().contains(meterNum)){ |
|
||||||
iterator.remove(); |
|
||||||
} |
|
||||||
if(sessionId != null && sessionId.equalsIgnoreCase(next.getValue().getSessionId())) { |
|
||||||
next.setValue(session); |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
} |
|
@ -1,20 +0,0 @@ |
|||||||
package com.mh.framework.netty.task; |
|
||||||
|
|
||||||
/** |
|
||||||
* @author LJF |
|
||||||
* @version 1.0 |
|
||||||
* @project TAD_Server |
|
||||||
* @description 回调任务 |
|
||||||
* @date 2023/7/3 15:34:11 |
|
||||||
*/ |
|
||||||
public interface CallbackTask<T> { |
|
||||||
T execute() throws Exception; |
|
||||||
|
|
||||||
/** |
|
||||||
* // 执行没有 异常的情况下的 返回值
|
|
||||||
* @param t |
|
||||||
*/ |
|
||||||
void onBack(T t); |
|
||||||
|
|
||||||
void onException(Throwable t); |
|
||||||
} |
|
@ -1,78 +0,0 @@ |
|||||||
package com.mh.framework.netty.task; |
|
||||||
|
|
||||||
import com.google.common.util.concurrent.*; |
|
||||||
|
|
||||||
import java.util.concurrent.Callable; |
|
||||||
import java.util.concurrent.ConcurrentLinkedQueue; |
|
||||||
import java.util.concurrent.ExecutorService; |
|
||||||
import java.util.concurrent.Executors; |
|
||||||
|
|
||||||
/** |
|
||||||
* @author LJF |
|
||||||
* @version 1.0 |
|
||||||
* @project TAD_Server |
|
||||||
* @description 回调任务 |
|
||||||
* @date 2023/7/3 15:34:11 |
|
||||||
*/ |
|
||||||
public class CallbackTaskScheduler extends Thread { |
|
||||||
private ConcurrentLinkedQueue<CallbackTask> executeTaskQueue = |
|
||||||
new ConcurrentLinkedQueue<>(); |
|
||||||
private long sleepTime = 1000 * 10; |
|
||||||
private final ExecutorService pool = Executors.newCachedThreadPool(); |
|
||||||
ListeningExecutorService lpool = MoreExecutors.listeningDecorator(pool); |
|
||||||
private static CallbackTaskScheduler inst = new CallbackTaskScheduler(); |
|
||||||
private CallbackTaskScheduler() { |
|
||||||
this.start(); |
|
||||||
} |
|
||||||
//add task
|
|
||||||
public static <T> void add(CallbackTask<T> executeTask) { |
|
||||||
inst.executeTaskQueue.add(executeTask); |
|
||||||
} |
|
||||||
|
|
||||||
@Override |
|
||||||
public void run() { |
|
||||||
while (true) { |
|
||||||
handleTask(); |
|
||||||
//为了避免频繁连接服务器,但是当前连接服务器过长导致失败
|
|
||||||
//threadSleep(sleepTime);
|
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
private void threadSleep(long sleepTime) { |
|
||||||
try { |
|
||||||
Thread.sleep(sleepTime); |
|
||||||
}catch (Exception e) { |
|
||||||
e.printStackTrace(); |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
//任务执行
|
|
||||||
private void handleTask() { |
|
||||||
CallbackTask executeTask = null; |
|
||||||
while (executeTaskQueue.peek() != null) { |
|
||||||
executeTask = executeTaskQueue.poll(); |
|
||||||
handleTask(executeTask); |
|
||||||
} |
|
||||||
} |
|
||||||
private <T> void handleTask(CallbackTask<T> executeTask) { |
|
||||||
ListenableFuture<T> future = lpool.submit(new Callable<T>() { |
|
||||||
public T call() throws Exception { |
|
||||||
return executeTask.execute(); |
|
||||||
} |
|
||||||
}); |
|
||||||
Futures.addCallback(future, new FutureCallback<T>() { |
|
||||||
@Override |
|
||||||
public void onSuccess(T t) { |
|
||||||
executeTask.onBack(t); |
|
||||||
} |
|
||||||
|
|
||||||
@Override |
|
||||||
public void onFailure(Throwable throwable) { |
|
||||||
executeTask.onException(throwable); |
|
||||||
} |
|
||||||
|
|
||||||
|
|
||||||
}, pool); |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
@ -1,6 +0,0 @@ |
|||||||
package com.mh.framework.netty.task; |
|
||||||
|
|
||||||
//不需要知道异步线程的 返回值
|
|
||||||
public interface ExecuteTask { |
|
||||||
void execute(); |
|
||||||
} |
|
@ -1,67 +0,0 @@ |
|||||||
package com.mh.framework.netty.task; |
|
||||||
|
|
||||||
import java.util.concurrent.ConcurrentLinkedQueue; |
|
||||||
import java.util.concurrent.ExecutorService; |
|
||||||
import java.util.concurrent.Executors; |
|
||||||
|
|
||||||
/** |
|
||||||
* @author LJF |
|
||||||
* @version 1.0 |
|
||||||
* @project TAD_Server |
|
||||||
* @description 任务定时 |
|
||||||
* @date 2023/7/3 15:34:11 |
|
||||||
*/ |
|
||||||
public class FutureTaskScheduler extends Thread{ |
|
||||||
private ConcurrentLinkedQueue<ExecuteTask> executeTaskQueue = |
|
||||||
new ConcurrentLinkedQueue<>(); |
|
||||||
private long sleepTime = 200; |
|
||||||
private ExecutorService pool = Executors.newFixedThreadPool(10); |
|
||||||
private static FutureTaskScheduler inst = new FutureTaskScheduler(); |
|
||||||
public FutureTaskScheduler() { |
|
||||||
this.start(); |
|
||||||
} |
|
||||||
//任务添加
|
|
||||||
public static void add(ExecuteTask executeTask) { |
|
||||||
inst.executeTaskQueue.add(executeTask); |
|
||||||
} |
|
||||||
|
|
||||||
@Override |
|
||||||
public void run() { |
|
||||||
while (true) { |
|
||||||
handleTask(); |
|
||||||
//threadSleep(sleepTime);
|
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
private void threadSleep(long sleepTime) { |
|
||||||
try { |
|
||||||
Thread.sleep(sleepTime); |
|
||||||
} catch (InterruptedException e) { |
|
||||||
e.printStackTrace(); |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
//执行任务
|
|
||||||
private void handleTask() { |
|
||||||
ExecuteTask executeTask; |
|
||||||
while (executeTaskQueue.peek() != null) { |
|
||||||
executeTask = executeTaskQueue.poll(); |
|
||||||
handleTask(executeTask); |
|
||||||
} |
|
||||||
//刷新心跳时间
|
|
||||||
} |
|
||||||
private void handleTask(ExecuteTask executeTask) { |
|
||||||
pool.execute(new ExecuteRunnable(executeTask)); |
|
||||||
} |
|
||||||
|
|
||||||
class ExecuteRunnable implements Runnable { |
|
||||||
ExecuteTask executeTask; |
|
||||||
public ExecuteRunnable(ExecuteTask executeTask) { |
|
||||||
this.executeTask = executeTask; |
|
||||||
} |
|
||||||
@Override |
|
||||||
public void run() { |
|
||||||
executeTask.execute(); |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
@ -1,11 +0,0 @@ |
|||||||
package com.mh.quartz.domain; |
|
||||||
|
|
||||||
/** |
|
||||||
* @Classname FuzzyLevel |
|
||||||
* Todo: |
|
||||||
* @Date 2025-05-31 14:19 |
|
||||||
* @Created by LJF |
|
||||||
*/ |
|
||||||
public enum FuzzyLevel { |
|
||||||
NB, NM, NS, ZO, PS, PM, PB; // 极小,较小,小,零,稍大,较大,极大
|
|
||||||
} |
|
@ -1,52 +0,0 @@ |
|||||||
package com.mh.quartz.domain; |
|
||||||
|
|
||||||
/** |
|
||||||
* @author LJF |
|
||||||
* @version 1.0 |
|
||||||
* @project EEMCS |
|
||||||
* @description PID参数 |
|
||||||
* @date 2025-05-30 13:51:22 |
|
||||||
*/ |
|
||||||
public class PIDParams { |
|
||||||
|
|
||||||
private volatile double kp; // 比例系数
|
|
||||||
private volatile double ki; // 积分系数
|
|
||||||
private volatile double kd; // 微分系数
|
|
||||||
|
|
||||||
public PIDParams(double kp, double ki, double kd) { |
|
||||||
this.kp = kp; |
|
||||||
this.ki = ki; |
|
||||||
this.kd = kd; |
|
||||||
} |
|
||||||
|
|
||||||
// 动态更新PID参数
|
|
||||||
public void updateParams(double kp, double ki, double kd) { |
|
||||||
this.kp = kp; |
|
||||||
this.ki = ki; |
|
||||||
this.kd = kd; |
|
||||||
} |
|
||||||
|
|
||||||
public double getKp() { |
|
||||||
return kp; |
|
||||||
} |
|
||||||
|
|
||||||
public void setKp(double kp) { |
|
||||||
this.kp = kp; |
|
||||||
} |
|
||||||
|
|
||||||
public double getKi() { |
|
||||||
return ki; |
|
||||||
} |
|
||||||
|
|
||||||
public void setKi(double ki) { |
|
||||||
this.ki = ki; |
|
||||||
} |
|
||||||
|
|
||||||
public double getKd() { |
|
||||||
return kd; |
|
||||||
} |
|
||||||
|
|
||||||
public void setKd(double kd) { |
|
||||||
this.kd = kd; |
|
||||||
} |
|
||||||
} |
|
@ -1,257 +0,0 @@ |
|||||||
package com.mh.quartz.task; |
|
||||||
|
|
||||||
import com.mh.common.config.MHConfig; |
|
||||||
import com.mh.common.core.domain.entity.CollectionParamsManage; |
|
||||||
import com.mh.common.core.domain.entity.CpmSpaceRelation; |
|
||||||
import com.mh.common.core.domain.entity.OrderEntity; |
|
||||||
import com.mh.common.core.domain.entity.PolicyManage; |
|
||||||
import com.mh.common.utils.DateUtils; |
|
||||||
import com.mh.framework.mqtt.service.IMqttGatewayService; |
|
||||||
import com.mh.quartz.util.AHUPIDControlUtil; |
|
||||||
import com.mh.quartz.util.FuzzyPIDControlUtil; |
|
||||||
import com.mh.system.service.device.ICollectionParamsManageService; |
|
||||||
import com.mh.system.service.operation.IOperationDeviceService; |
|
||||||
import com.mh.system.service.policy.IPolicyManageService; |
|
||||||
import com.mh.system.service.space.ICpmSpaceRelationService; |
|
||||||
import lombok.extern.slf4j.Slf4j; |
|
||||||
import org.springframework.beans.factory.annotation.Autowired; |
|
||||||
import org.springframework.beans.factory.annotation.Value; |
|
||||||
import org.springframework.stereotype.Component; |
|
||||||
|
|
||||||
import java.math.BigDecimal; |
|
||||||
import java.time.LocalTime; |
|
||||||
import java.util.*; |
|
||||||
import java.util.stream.Collectors; |
|
||||||
|
|
||||||
/** |
|
||||||
* @author LJF |
|
||||||
* @version 1.0 |
|
||||||
* @project EEMCS |
|
||||||
* @description 风柜系统任务 |
|
||||||
* @date 2025-05-30 08:36:43 |
|
||||||
*/ |
|
||||||
@Slf4j |
|
||||||
@Component("ahuTask") |
|
||||||
public class AHUTask { |
|
||||||
|
|
||||||
@Value("${control.topic}") |
|
||||||
String controlTopic; |
|
||||||
|
|
||||||
@Autowired |
|
||||||
private MHConfig mhConfig; |
|
||||||
|
|
||||||
private final ICollectionParamsManageService collectionParamsManageService; |
|
||||||
|
|
||||||
private final IPolicyManageService policyManageService; |
|
||||||
|
|
||||||
private final ICpmSpaceRelationService cpmSpaceRelationService; |
|
||||||
|
|
||||||
private final IOperationDeviceService iOperationService; |
|
||||||
|
|
||||||
private final IMqttGatewayService iMqttGatewayService; |
|
||||||
|
|
||||||
// 在 AHUTask 类中添加一个 PID 控制器成员变量
|
|
||||||
private final Map<String, FuzzyPIDControlUtil> pidControllers = new HashMap<>(); |
|
||||||
|
|
||||||
@Autowired |
|
||||||
public AHUTask(ICollectionParamsManageService collectionParamsManageService, IPolicyManageService policyManageService, ICpmSpaceRelationService cpmSpaceRelationService, IOperationDeviceService iOperationService, IMqttGatewayService iMqttGatewayService) { |
|
||||||
this.collectionParamsManageService = collectionParamsManageService; |
|
||||||
this.policyManageService = policyManageService; |
|
||||||
this.cpmSpaceRelationService = cpmSpaceRelationService; |
|
||||||
this.iOperationService = iOperationService; |
|
||||||
this.iMqttGatewayService = iMqttGatewayService; |
|
||||||
} |
|
||||||
|
|
||||||
|
|
||||||
public void sendOrderToMqtt(List<OrderEntity> changeValues) { |
|
||||||
try { |
|
||||||
String sendOrder = iOperationService.operationDevice(changeValues); |
|
||||||
String name = mhConfig.getName(); |
|
||||||
// 获取mqtt操作队列(后期通过mqtt队列配置发送主题)
|
|
||||||
log.info("发送主题:{},消息:{}", name + "/" + controlTopic, sendOrder); |
|
||||||
iMqttGatewayService.publish(name + "/" + controlTopic, sendOrder, 1); |
|
||||||
} catch (Exception e) { |
|
||||||
log.error("设备操作失败", e); |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
public void adjustWaterValve(String kp, String ki, String kd) { |
|
||||||
// 西餐走廊2、宴会走廊需要调整PID参数,其他的ddc自己已经处理好
|
|
||||||
String[] deviceLedgerIds = new String[]{"ddc0083b3a898d85f3a1205a2d82071e100", "ddc0133b3a898d85f3a1205a2d82071e100"}; |
|
||||||
for (String deviceLedgerId : deviceLedgerIds) { |
|
||||||
// 获取西餐走廊2的启停控制
|
|
||||||
HashMap<String, Object> queryMap = new HashMap<>(); |
|
||||||
queryMap.put("systemType", "2"); |
|
||||||
queryMap.put("deviceLedgerId", deviceLedgerId); |
|
||||||
queryMap.put("isUse", 0); |
|
||||||
// 得出 systemType =2 的数据
|
|
||||||
List<CollectionParamsManage> collectionParamsManages = collectionParamsManageService.selectListByParams(queryMap); |
|
||||||
|
|
||||||
// 过滤得出启停状态
|
|
||||||
Optional<CollectionParamsManage> first = collectionParamsManages.stream().filter(item -> item.getCurValue().intValue() == 1 && item.getParamType().equals("2")).findFirst(); |
|
||||||
if (first.isEmpty()) { |
|
||||||
continue; |
|
||||||
} |
|
||||||
|
|
||||||
// 过滤获取回风温度设置值
|
|
||||||
Optional<CollectionParamsManage> second = collectionParamsManages |
|
||||||
.stream() |
|
||||||
.filter(item -> item.getOtherName().contains("回风温度") |
|
||||||
&& item.getParamType().equals("14")).findFirst(); |
|
||||||
if (second.isEmpty()) { |
|
||||||
continue; |
|
||||||
} |
|
||||||
// 得出回风温度设置值
|
|
||||||
double backTempSet = second.get().getCurValue().doubleValue(); |
|
||||||
|
|
||||||
// 设定目标温度(夏季制冷24℃)
|
|
||||||
// ✅ 如果没有该设备的控制器,则创建一个新的并保存起来
|
|
||||||
FuzzyPIDControlUtil controller = pidControllers.computeIfAbsent(deviceLedgerId, k -> new FuzzyPIDControlUtil(kp, ki, kd)); |
|
||||||
|
|
||||||
log.info("开始模糊PID控制循环,查看对象是否有变化:{}", controller); |
|
||||||
|
|
||||||
// 过滤获取当前回风温度
|
|
||||||
Optional<CollectionParamsManage> third = collectionParamsManages |
|
||||||
.stream() |
|
||||||
.filter(item -> item.getOtherName().contains("回风温度") |
|
||||||
&& item.getParamType().equals("12")).findFirst(); |
|
||||||
if (third.isEmpty()) { |
|
||||||
continue; |
|
||||||
} |
|
||||||
// 得出当前回风温度
|
|
||||||
double temp = third.get().getCurValue().doubleValue(); |
|
||||||
|
|
||||||
// 2. 计算水阀开度(时间间隔1秒)
|
|
||||||
double valveOpening1 = controller.calculate(backTempSet, temp, 1); |
|
||||||
int valveOpening = new BigDecimal(valveOpening1).intValue(); |
|
||||||
|
|
||||||
// 过滤获取水阀调节参数
|
|
||||||
Optional<CollectionParamsManage> fourth = collectionParamsManages |
|
||||||
.stream() |
|
||||||
.filter(item -> item.getOtherName().contains("水阀调节") |
|
||||||
&& item.getParamType().equals("3")).findFirst(); |
|
||||||
if (fourth.isEmpty()) { |
|
||||||
continue; |
|
||||||
} |
|
||||||
// 得出水阀调节参数
|
|
||||||
CollectionParamsManage collectionParamsManage = fourth.get(); |
|
||||||
// 发送控制指令
|
|
||||||
if (valveOpening > 0 && valveOpening <= 100) { |
|
||||||
// 开启
|
|
||||||
List<OrderEntity> changeValues = new ArrayList<>(); |
|
||||||
changeValues.add(new OrderEntity(collectionParamsManage.getId(), String.valueOf(valveOpening), Integer.parseInt(collectionParamsManage.getParamType()), collectionParamsManage.getOtherName())); |
|
||||||
sendOrderToMqtt(changeValues); |
|
||||||
// 3. 应用水阀开度(实际应用发送给执行机构)
|
|
||||||
log.info("回风温度: {} ℃ | 水阀开度: {} % ", |
|
||||||
temp, valveOpening); |
|
||||||
} |
|
||||||
|
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
public void startOrStopAHU() { |
|
||||||
// 扫描启动了定时开关机的风机,根据当前时间判断是否需要启动或停止
|
|
||||||
// systemType 2: 风柜系统
|
|
||||||
HashMap<String, Object> queryMap = new HashMap<>(); |
|
||||||
queryMap.put("systemType", "2"); |
|
||||||
// 得出 systemType =2 的数据
|
|
||||||
List<CollectionParamsManage> collectionParamsManages = collectionParamsManageService.selectListByParams(queryMap); |
|
||||||
// 判断当前时间是星期几
|
|
||||||
String dayOfWeekValue = DateUtils.dayOfWeekValue(); |
|
||||||
// 过滤otherName包含dayOfWeekValue,paramType=29, curValue=1的数据,代表已经启用定时开关机的功能
|
|
||||||
List<CollectionParamsManage> needStartOrStopDataList = collectionParamsManages |
|
||||||
.stream() |
|
||||||
.filter(item -> item.getOtherName().contains(dayOfWeekValue) |
|
||||||
&& item.getParamType().equals("29") |
|
||||||
&& item.getCurValue().intValue() == 1) |
|
||||||
.toList(); |
|
||||||
// 查询得出对应的houseId
|
|
||||||
List<PolicyManage> policyManageList = policyManageService.selectListByCpmIds(needStartOrStopDataList); |
|
||||||
|
|
||||||
// 开始:根据houseId查询出对应的风机启停id
|
|
||||||
List<CpmSpaceRelation> cpmSpaceRelationList = cpmSpaceRelationService.selectListByHouseId(policyManageList); |
|
||||||
// collectionParamsManages过滤出能够开启风机的点位,paramType=2,isUse=0
|
|
||||||
List<CollectionParamsManage> startDeviceList = collectionParamsManages |
|
||||||
.stream() |
|
||||||
.filter(item -> item.getParamType().equals("2") |
|
||||||
&& item.getIsUse() == 0) |
|
||||||
.toList(); |
|
||||||
// 结束:根据houseId查询出对应的风机启停id
|
|
||||||
|
|
||||||
// 在拼接出启用定时开关机的启动时间、关闭时间
|
|
||||||
Map<String, List<PolicyManage>> groupedByHouseId = policyManageList.stream() |
|
||||||
.collect(Collectors.groupingBy( |
|
||||||
PolicyManage::getHouseId, |
|
||||||
Collectors.toList() |
|
||||||
)); |
|
||||||
// groupedByHouseId for 循环遍历
|
|
||||||
for (Map.Entry<String, List<PolicyManage>> entry : groupedByHouseId.entrySet()) { |
|
||||||
// 得出houseId
|
|
||||||
String houseId = entry.getKey(); |
|
||||||
// 得出policyManageList
|
|
||||||
List<PolicyManage> timeList = entry.getValue(); |
|
||||||
int startHour = 0; |
|
||||||
int startMinute = 0; |
|
||||||
int endHour = 0; |
|
||||||
int endMinute = 0; |
|
||||||
for (PolicyManage policyManage : timeList) { |
|
||||||
if (policyManage.getPointName().equals("开_时")) { |
|
||||||
startHour = collectionParamsManageService.selectCollectionParamsManageById(policyManage.getCpmId()).getCurValue().intValue(); |
|
||||||
} |
|
||||||
if (policyManage.getPointName().equals("开_分")) { |
|
||||||
startMinute = collectionParamsManageService.selectCollectionParamsManageById(policyManage.getCpmId()).getCurValue().intValue(); |
|
||||||
} |
|
||||||
if (policyManage.getPointName().equals("关_时")) { |
|
||||||
endHour = collectionParamsManageService.selectCollectionParamsManageById(policyManage.getCpmId()).getCurValue().intValue(); |
|
||||||
} |
|
||||||
if (policyManage.getPointName().equals("关_分")) { |
|
||||||
endMinute = collectionParamsManageService.selectCollectionParamsManageById(policyManage.getCpmId()).getCurValue().intValue(); |
|
||||||
} |
|
||||||
} |
|
||||||
LocalTime nowTime = LocalTime.now(); |
|
||||||
LocalTime startTime = LocalTime.of(startHour, startMinute); |
|
||||||
LocalTime endTime = LocalTime.of(endHour, endMinute); |
|
||||||
// collectionParamsManages过滤出能够开启风机的点位,paramType=2,isUse=0
|
|
||||||
Set<String> validCpmIds = cpmSpaceRelationList.stream() |
|
||||||
.filter(item -> item.getHouseId().equals(houseId)) |
|
||||||
.map(CpmSpaceRelation::getCpmId) |
|
||||||
.collect(Collectors.toSet()); |
|
||||||
List<CollectionParamsManage> startDataList = startDeviceList |
|
||||||
.stream() |
|
||||||
.filter(item -> validCpmIds.contains(item.getId())) |
|
||||||
.toList(); |
|
||||||
// 判断当前风机是否在开启状态了
|
|
||||||
if (null == startDataList || startDataList.size() == 0) { |
|
||||||
return; |
|
||||||
} |
|
||||||
CollectionParamsManage first = startDataList.getFirst(); |
|
||||||
// 判断当前时间是否在开启时间范围内
|
|
||||||
if (DateUtils.isBetween(nowTime, startTime, endTime)) { |
|
||||||
// 判断当前风机是否在开启状态了
|
|
||||||
if (first.getCurValue().intValue() == 1) { |
|
||||||
// 当前风机在开启状态,不需要启动
|
|
||||||
log.info("当前风机在开启状态,不需要启动"); |
|
||||||
} else { |
|
||||||
// 当前风机不在开启状态,需要启动
|
|
||||||
log.info("当前风机不在开启状态,需要启动"); |
|
||||||
List<OrderEntity> changeValues = new ArrayList<>(); |
|
||||||
changeValues.add(new OrderEntity(first.getId(), "1", Integer.parseInt(first.getParamType()), first.getOtherName())); |
|
||||||
sendOrderToMqtt(changeValues); |
|
||||||
} |
|
||||||
; |
|
||||||
} else { |
|
||||||
// 判断当前风机是否在关闭状态了
|
|
||||||
if (first.getCurValue().intValue() == 0) { |
|
||||||
// 当前风机在关闭状态,不需要停止
|
|
||||||
log.info("当前风机在关闭状态,不需要停止"); |
|
||||||
} else { |
|
||||||
// 当前风机不在关闭状态,需要停止
|
|
||||||
log.info("当前风机不在关闭状态,需要停止"); |
|
||||||
List<OrderEntity> changeValues = new ArrayList<>(); |
|
||||||
changeValues.add(new OrderEntity(first.getId(), "0", Integer.parseInt(first.getParamType()), first.getOtherName())); |
|
||||||
sendOrderToMqtt(changeValues); |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
@ -1,84 +0,0 @@ |
|||||||
package com.mh.quartz.util; |
|
||||||
|
|
||||||
import com.mh.quartz.domain.FuzzyLevel; |
|
||||||
|
|
||||||
/** |
|
||||||
* @author LJF |
|
||||||
* @version 1.0 |
|
||||||
* @project EEMCS |
|
||||||
* @description 风柜系统PID调节工具类 |
|
||||||
* @date 2025-05-30 13:47:40 |
|
||||||
*/ |
|
||||||
public class AHUPIDControlUtil { |
|
||||||
|
|
||||||
private double kp = 2.0, ki = 0.1, kd = 0.0; |
|
||||||
private double integral = 0; |
|
||||||
private double previousError = 0; |
|
||||||
private static final double MAX_INTEGRAL = 100; |
|
||||||
private static final double MIN_INTEGRAL = -100; |
|
||||||
|
|
||||||
// 模糊增益修正比例
|
|
||||||
private double deltaKpScale = 0.5; |
|
||||||
private double deltaKiScale = 0.01; |
|
||||||
private double deltaKdScale = 0.01; |
|
||||||
|
|
||||||
public double compute(double setTemp, double currentTemp, double deltaTime) { |
|
||||||
double error = currentTemp - setTemp; |
|
||||||
double dError = (error - previousError) / deltaTime; |
|
||||||
|
|
||||||
// 模糊映射
|
|
||||||
FuzzyLevel eLevel = toFuzzyLevel(error); |
|
||||||
FuzzyLevel ecLevel = toFuzzyLevel(dError); |
|
||||||
|
|
||||||
// 获取PID参数调整
|
|
||||||
FuzzyLevel kpAdjust = FuzzyRuleBase.getKpAdjust(eLevel, ecLevel); |
|
||||||
FuzzyLevel kiAdjust = kpAdjust; // 简化处理
|
|
||||||
FuzzyLevel kdAdjust = kpAdjust; |
|
||||||
|
|
||||||
kp += fuzzyDeltaToValue(kpAdjust, deltaKpScale); |
|
||||||
ki += fuzzyDeltaToValue(kiAdjust, deltaKiScale); |
|
||||||
kd += fuzzyDeltaToValue(kdAdjust, deltaKdScale); |
|
||||||
|
|
||||||
// 限幅
|
|
||||||
kp = Math.max(0, Math.min(kp, 10)); |
|
||||||
ki = Math.max(0, Math.min(ki, 1)); |
|
||||||
kd = Math.max(0, Math.min(kd, 1)); |
|
||||||
|
|
||||||
// PID 计算
|
|
||||||
// 积分项限幅
|
|
||||||
integral += error * deltaTime; |
|
||||||
integral = Math.max(MIN_INTEGRAL, Math.min(MAX_INTEGRAL, integral)); |
|
||||||
double output = kp * error + ki * integral + kd * dError; |
|
||||||
System.out.println("计算输出值:" + output + ",误差:" + error + ",误差变化:" + dError); |
|
||||||
previousError = error; |
|
||||||
|
|
||||||
// 输出冷冻水阀开度,限制在0~100%
|
|
||||||
return Math.max(0, Math.min(100, Math.abs(output))); |
|
||||||
} |
|
||||||
|
|
||||||
// 将数值误差映射为模糊等级
|
|
||||||
public static FuzzyLevel toFuzzyLevel(double value) { |
|
||||||
if (value <= -3) return FuzzyLevel.NB; |
|
||||||
else if (value <= -2) return FuzzyLevel.NM; |
|
||||||
else if (value <= -1) return FuzzyLevel.NS; |
|
||||||
else if (value <= 1) return FuzzyLevel.ZO; |
|
||||||
else if (value <= 2) return FuzzyLevel.PS; |
|
||||||
else if (value <= 3) return FuzzyLevel.PM; |
|
||||||
else return FuzzyLevel.PB; |
|
||||||
} |
|
||||||
|
|
||||||
// 将模糊等级转为实际数值调整量
|
|
||||||
public static double fuzzyDeltaToValue(FuzzyLevel level, double scale) { |
|
||||||
switch (level) { |
|
||||||
case NB: return -3 * scale; |
|
||||||
case NM: return -2 * scale; |
|
||||||
case NS: return -1 * scale; |
|
||||||
case ZO: return 0; |
|
||||||
case PS: return 1 * scale; |
|
||||||
case PM: return 2 * scale; |
|
||||||
case PB: return 3 * scale; |
|
||||||
default: return 0; |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
} |
|
@ -1,152 +0,0 @@ |
|||||||
package com.mh.quartz.util; |
|
||||||
|
|
||||||
/** |
|
||||||
* @author LJF |
|
||||||
* @version 1.0 |
|
||||||
* @project EEMCS |
|
||||||
* @description 模糊PID控制算法 |
|
||||||
* @date 2025-06-04 09:47:16 |
|
||||||
*/ |
|
||||||
public class FuzzyPIDControlUtil { |
|
||||||
// PID参数
|
|
||||||
private final double kp; // 比例增益
|
|
||||||
private final double ki; // 积分增益
|
|
||||||
private final double kd; // 微分增益
|
|
||||||
|
|
||||||
// 控制器状态
|
|
||||||
private double prevError; |
|
||||||
private double integral; |
|
||||||
private double derivative; |
|
||||||
|
|
||||||
// 模糊规则参数
|
|
||||||
private final double[] errorLevels = {-6, -3, -1, 0, 1, 3, 6}; // 温度误差级别(℃)
|
|
||||||
private final double[] dErrorLevels = {-3, -1, 0, 1, 3}; // 误差变化率级别(℃/min)
|
|
||||||
private final double[] kpAdjust = {1.5, 2.0, 2.5, 3.0, 4.0}; // Kp调整因子 (增强)
|
|
||||||
private final double[] kiAdjust = {0.3, 0.7, 1.0, 1.3, 1.7}; // Ki调整因子
|
|
||||||
|
|
||||||
// 阀门限制
|
|
||||||
private static final double MIN_VALVE = 0.0; // 最小开度(0%)
|
|
||||||
private static final double MAX_VALVE = 100.0; // 最大开度(100%)
|
|
||||||
|
|
||||||
public FuzzyPIDControlUtil(String kp, String ki, String kd) { |
|
||||||
this.kp = Double.parseDouble(kp); |
|
||||||
this.ki = Double.parseDouble(ki); |
|
||||||
this.kd = Double.parseDouble(kd); |
|
||||||
this.prevError = 0; |
|
||||||
this.integral = 0; |
|
||||||
this.derivative = 0; |
|
||||||
} |
|
||||||
|
|
||||||
// 模糊推理计算PID参数调整因子
|
|
||||||
private double[] fuzzyInference(double error, double dError) { |
|
||||||
// 模糊化:计算误差和误差变化率的隶属度
|
|
||||||
double[] errorMembership = calculateMembership(error, errorLevels); |
|
||||||
double[] dErrorMembership = calculateMembership(dError, dErrorLevels); |
|
||||||
|
|
||||||
// 模糊规则库
|
|
||||||
double kpAdjustSum = 0.0; |
|
||||||
double kiAdjustSum = 0.0; |
|
||||||
double weightSum = 0.0; |
|
||||||
|
|
||||||
// 应用模糊规则 (增强大误差时的响应)
|
|
||||||
for (int i = 0; i < errorMembership.length; i++) { |
|
||||||
for (int j = 0; j < dErrorMembership.length; j++) { |
|
||||||
double weight = errorMembership[i] * dErrorMembership[j]; |
|
||||||
if (weight > 0) { |
|
||||||
// 增强大误差时的响应
|
|
||||||
int kpIndex; |
|
||||||
if (Math.abs(error) > 3) { // 大误差
|
|
||||||
kpIndex = Math.min(Math.max(i + j, 0), kpAdjust.length - 1); |
|
||||||
} else { |
|
||||||
kpIndex = Math.min(Math.max(i + j - 1, 0), kpAdjust.length - 1); |
|
||||||
} |
|
||||||
|
|
||||||
// Ki调整:小误差时增强积分作用
|
|
||||||
int kiIndex; |
|
||||||
if (Math.abs(error) < 1) { // 小误差
|
|
||||||
kiIndex = Math.min(Math.max(3 + j, 0), kiAdjust.length - 1); |
|
||||||
} else { |
|
||||||
kiIndex = Math.min(Math.max(2 + j, 0), kiAdjust.length - 1); |
|
||||||
} |
|
||||||
|
|
||||||
kpAdjustSum += weight * kpAdjust[kpIndex]; |
|
||||||
kiAdjustSum += weight * kiAdjust[kiIndex]; |
|
||||||
weightSum += weight; |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
// 反模糊化 (加权平均)
|
|
||||||
double kpFactor = weightSum > 0 ? kpAdjustSum / weightSum : 1.0; |
|
||||||
double kiFactor = weightSum > 0 ? kiAdjustSum / weightSum : 1.0; |
|
||||||
|
|
||||||
return new double[]{kpFactor, kiFactor, 1.0}; // Kd不调整
|
|
||||||
} |
|
||||||
|
|
||||||
// 计算隶属度 (三角隶属函数)
|
|
||||||
private double[] calculateMembership(double value, double[] levels) { |
|
||||||
double[] membership = new double[levels.length]; |
|
||||||
|
|
||||||
for (int i = 0; i < levels.length; i++) { |
|
||||||
if (i == 0) { |
|
||||||
membership[i] = (value <= levels[i]) ? 1.0 : |
|
||||||
(value < levels[i+1]) ? (levels[i+1] - value) / (levels[i+1] - levels[i]) : 0.0; |
|
||||||
} else if (i == levels.length - 1) { |
|
||||||
membership[i] = (value >= levels[i]) ? 1.0 : |
|
||||||
(value > levels[i-1]) ? (value - levels[i-1]) / (levels[i] - levels[i-1]) : 0.0; |
|
||||||
} else { |
|
||||||
if (value >= levels[i-1] && value <= levels[i]) { |
|
||||||
membership[i] = (value - levels[i-1]) / (levels[i] - levels[i-1]); |
|
||||||
} else if (value >= levels[i] && value <= levels[i+1]) { |
|
||||||
membership[i] = (levels[i+1] - value) / (levels[i+1] - levels[i]); |
|
||||||
} else { |
|
||||||
membership[i] = 0.0; |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
return membership; |
|
||||||
} |
|
||||||
|
|
||||||
// 计算控制输出 (阀门开度) - 修复了符号问题
|
|
||||||
public double calculate(double setpoint, double currentValue, double dt) { |
|
||||||
// 计算误差项 - 修复:当前值高于设定值需要冷却,误差应为正
|
|
||||||
double error = currentValue - setpoint; |
|
||||||
|
|
||||||
// 计算微分项 (基于误差变化率)
|
|
||||||
if (dt > 0) { |
|
||||||
derivative = (error - prevError) / dt; |
|
||||||
} |
|
||||||
|
|
||||||
// 模糊调整PID参数
|
|
||||||
double[] adjustments = fuzzyInference(error, derivative); |
|
||||||
double adjKp = kp * adjustments[0]; |
|
||||||
double adjKi = ki * adjustments[1]; |
|
||||||
double adjKd = kd * adjustments[2]; |
|
||||||
|
|
||||||
// 计算积分项 (带抗饱和)
|
|
||||||
integral += error * dt; |
|
||||||
|
|
||||||
// 抗饱和限制
|
|
||||||
double maxIntegral = MAX_VALVE / (adjKi + 1e-5); |
|
||||||
if (Math.abs(integral) > maxIntegral) { |
|
||||||
integral = Math.signum(integral) * maxIntegral; |
|
||||||
} |
|
||||||
|
|
||||||
// PID计算 - 修复:误差为正时需要正输出打开阀门
|
|
||||||
double output = adjKp * error + adjKi * integral + adjKd * derivative; |
|
||||||
|
|
||||||
// 保存误差用于下次计算
|
|
||||||
prevError = error; |
|
||||||
|
|
||||||
// 阀门开度限制
|
|
||||||
return Math.min(Math.max(output, MIN_VALVE), MAX_VALVE); |
|
||||||
} |
|
||||||
|
|
||||||
// 重置控制器状态
|
|
||||||
public void reset() { |
|
||||||
integral = 0; |
|
||||||
prevError = 0; |
|
||||||
derivative = 0; |
|
||||||
} |
|
||||||
} |
|
@ -1,29 +0,0 @@ |
|||||||
package com.mh.quartz.util; |
|
||||||
|
|
||||||
import com.mh.quartz.domain.FuzzyLevel; |
|
||||||
|
|
||||||
/** |
|
||||||
* @Classname FuzzyRuleBase |
|
||||||
* Todo: |
|
||||||
* @Date 2025-05-31 14:20 |
|
||||||
* @Created by LJF |
|
||||||
*/ |
|
||||||
public class FuzzyRuleBase { |
|
||||||
|
|
||||||
// ΔKp 规则:根据误差 e 与误差变化 ec 得到修正量
|
|
||||||
private static final FuzzyLevel[][] kpRuleTable = { |
|
||||||
{FuzzyLevel.PB, FuzzyLevel.PM, FuzzyLevel.PS, FuzzyLevel.ZO, FuzzyLevel.NS, FuzzyLevel.NM, FuzzyLevel.NB}, |
|
||||||
{FuzzyLevel.PM, FuzzyLevel.PS, FuzzyLevel.ZO, FuzzyLevel.NS, FuzzyLevel.NM, FuzzyLevel.NB, FuzzyLevel.NB}, |
|
||||||
{FuzzyLevel.PS, FuzzyLevel.ZO, FuzzyLevel.NS, FuzzyLevel.NM, FuzzyLevel.NB, FuzzyLevel.NB, FuzzyLevel.NB}, |
|
||||||
{FuzzyLevel.ZO, FuzzyLevel.ZO, FuzzyLevel.ZO, FuzzyLevel.ZO, FuzzyLevel.ZO, FuzzyLevel.ZO, FuzzyLevel.ZO}, |
|
||||||
{FuzzyLevel.NS, FuzzyLevel.ZO, FuzzyLevel.PS, FuzzyLevel.PM, FuzzyLevel.PB, FuzzyLevel.PB, FuzzyLevel.PB}, |
|
||||||
{FuzzyLevel.NM, FuzzyLevel.NS, FuzzyLevel.ZO, FuzzyLevel.PS, FuzzyLevel.PM, FuzzyLevel.PB, FuzzyLevel.PB}, |
|
||||||
{FuzzyLevel.NB, FuzzyLevel.NB, FuzzyLevel.NS, FuzzyLevel.ZO, FuzzyLevel.PS, FuzzyLevel.PM, FuzzyLevel.PB} |
|
||||||
}; |
|
||||||
|
|
||||||
public static FuzzyLevel getKpAdjust(FuzzyLevel e, FuzzyLevel ec) { |
|
||||||
return kpRuleTable[e.ordinal()][ec.ordinal()]; |
|
||||||
} |
|
||||||
|
|
||||||
// 可扩展为不同规则表:getKiAdjust(), getKdAdjust()
|
|
||||||
} |
|
Loading…
Reference in new issue