Compare commits
4 Commits
Author | SHA1 | Date |
---|---|---|
|
36d60e0b79 | 3 days ago |
|
1603247df1 | 6 days ago |
|
b22ba182dc | 1 week ago |
|
d268edfce2 | 1 week ago |
50 changed files with 3996 additions and 46 deletions
@ -0,0 +1,63 @@
|
||||
package com.mh.common.core.redis; |
||||
|
||||
import lombok.extern.slf4j.Slf4j; |
||||
import org.springframework.data.redis.core.StringRedisTemplate; |
||||
import org.springframework.data.redis.core.script.RedisScript; |
||||
import org.springframework.stereotype.Component; |
||||
|
||||
import java.util.Collections; |
||||
import java.util.concurrent.TimeUnit; |
||||
|
||||
/** |
||||
* @author LJF |
||||
* @version 1.0 |
||||
* @project EEMCS |
||||
* @description 锁 |
||||
* @date 2025-06-06 16:08:13 |
||||
*/ |
||||
@Slf4j |
||||
@Component |
||||
public class RedisLock { |
||||
|
||||
private final StringRedisTemplate redisTemplate; |
||||
|
||||
public RedisLock(StringRedisTemplate redisTemplate) { |
||||
this.redisTemplate = redisTemplate; |
||||
} |
||||
|
||||
/** |
||||
* 获取锁 |
||||
*/ |
||||
public boolean lock(String key, String requestId, long expireTimeInSeconds) { |
||||
Boolean success = redisTemplate.opsForValue().setIfAbsent(key, requestId, expireTimeInSeconds, TimeUnit.SECONDS); |
||||
return Boolean.TRUE.equals(success); |
||||
} |
||||
|
||||
/** |
||||
* 尝试获取锁(带超时) |
||||
*/ |
||||
public boolean tryLock(String key, String requestId, long expireTime, long timeoutMs) throws InterruptedException { |
||||
long startTime = System.currentTimeMillis(); |
||||
while (System.currentTimeMillis() - startTime < timeoutMs) { |
||||
if (lock(key, requestId, expireTime)) { |
||||
return true; |
||||
} |
||||
Thread.sleep(50); |
||||
} |
||||
return false; |
||||
} |
||||
|
||||
/** |
||||
* 释放锁(使用 Lua 脚本保证原子性) |
||||
*/ |
||||
public void unlock(String key, String requestId) { |
||||
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; |
||||
RedisScript<Long> redisScript = RedisScript.of(script, Long.class); |
||||
|
||||
Long result = redisTemplate.execute(redisScript, Collections.singletonList(key), requestId); |
||||
|
||||
if (result == null || result == 0) { |
||||
log.warn("释放锁失败,可能已被其他线程释放 key={}", key); |
||||
} |
||||
} |
||||
} |
@ -0,0 +1,277 @@
|
||||
package com.mh.common.utils; |
||||
|
||||
import com.mh.common.core.domain.entity.CollectionParamsManage; |
||||
import lombok.extern.slf4j.Slf4j; |
||||
|
||||
import java.math.BigDecimal; |
||||
import java.math.RoundingMode; |
||||
import java.text.DecimalFormat; |
||||
import java.text.ParseException; |
||||
import java.text.SimpleDateFormat; |
||||
import java.util.*; |
||||
|
||||
/** |
||||
* @author ljf |
||||
* @title : |
||||
* @description : 解析485接收的数据 |
||||
* @updateTime 2020-04-23 |
||||
* @throws : |
||||
*/ |
||||
@Slf4j |
||||
public class AnalysisReceiveOrder485 { |
||||
|
||||
// 调用service
|
||||
private final SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); |
||||
private final DecimalFormat df = new DecimalFormat("#.##"); |
||||
|
||||
//解析冷量表
|
||||
public void analysisCloudOrder485(final String dataStr1, final CollectionParamsManage deviceCodeParam) { |
||||
// 去掉空格
|
||||
String dataStr = dataStr1.replace(" ", "").toUpperCase(); |
||||
// 检验报文
|
||||
String checkStr = dataStr.substring(0, dataStr.length() - 4); |
||||
byte[] strOrder = ExchangeStringUtil.hexStrToBinaryStr(checkStr); |
||||
int checkNum = CRC16.CRC16_MODBUS(strOrder); |
||||
String checkWord = ExchangeStringUtil.decToHex(String.valueOf(checkNum)); |
||||
checkWord = checkWord.substring(2, 4) + checkWord.substring(0, 2); |
||||
|
||||
if (checkWord.equalsIgnoreCase(dataStr.substring(dataStr.length() - 4))) { |
||||
//创建SimpleDateFormat对象,指定样式 2019-05-13 22:39:30
|
||||
Date date = new Date(); |
||||
String dateStr = sdf1.format(date); |
||||
; |
||||
//保留两位小数处理
|
||||
DecimalFormat decimalFormat = new DecimalFormat("0.00"); |
||||
// 表号
|
||||
String cloudId = ExchangeStringUtil.hexToDec(dataStr.substring(0, 2)); |
||||
// 读数
|
||||
String data = ""; |
||||
data = dataStr.substring(dataStr.length() - 8, dataStr.length() - 6) |
||||
+ dataStr.substring(dataStr.length() - 6, dataStr.length() - 4) |
||||
+ dataStr.substring(dataStr.length() - 12, dataStr.length() - 10) |
||||
+ dataStr.substring(dataStr.length() - 10, dataStr.length() - 8); |
||||
|
||||
String registerAddr = deviceCodeParam.getRegisterAddr(); |
||||
if (ExchangeStringUtil.isInDate(date, "00:00:00", "00:00:30")) { |
||||
dateStr = dateStr.substring(0, 17) + "00"; |
||||
} else if (ExchangeStringUtil.isInDate(date, "00:00:30", "00:00:59")) { |
||||
dateStr = dateStr.substring(0, 17) + "30"; |
||||
} |
||||
try { |
||||
if (registerAddr.equals("32") || registerAddr.equals("33") || registerAddr.equals("35") || registerAddr.equals("36")) { |
||||
data = decimalFormat.format(Math.abs(ExchangeStringUtil.hexToSingle(data)));//十六进制字符串转IEEE754浮点型
|
||||
log.info("冷量计==>{},寄存器地址==>{},读数==>{}", cloudId, registerAddr, data); |
||||
} else if (registerAddr.equals("31") || registerAddr.equals("34")) { |
||||
long lData = Long.parseLong(ExchangeStringUtil.hexToDec(data)); |
||||
log.info("冷量计==>{},寄存器地址==>{},累计读数==>{}", cloudId, registerAddr, lData); |
||||
} |
||||
} catch (Exception e) { |
||||
log.error("保存冷量计数据失败!", e); |
||||
} |
||||
} else { |
||||
log.info("冷量计校验失败===>{}", dataStr); |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* 解析水表返回的数据 |
||||
* |
||||
* @param dataStr1 |
||||
*/ |
||||
public String analysisWaterOrder485(final String dataStr1, final CollectionParamsManage deviceCodeParam) { |
||||
// 去掉空格
|
||||
String dataStr = dataStr1.replace(" ", "").toUpperCase(); |
||||
// 检验报文
|
||||
String checkStr = dataStr.substring(0, dataStr.length() - 4); |
||||
byte[] strOrder = ExchangeStringUtil.hexStrToBinaryStr(checkStr); |
||||
int checkNum = CRC16.CRC16_MODBUS(strOrder); |
||||
String checkWord = ExchangeStringUtil.decToHex(String.valueOf(checkNum)); |
||||
checkWord = checkWord.substring(2, 4) + checkWord.substring(0, 2); |
||||
|
||||
if (checkWord.equalsIgnoreCase(dataStr.substring(dataStr.length() - 4))) { |
||||
//创建SimpleDateFormat对象,指定样式 2019-05-13 22:39:30
|
||||
Date date = new Date(); |
||||
String dateStr = sdf1.format(date); |
||||
; |
||||
//保留两位小数处理
|
||||
DecimalFormat decimalFormat = new DecimalFormat("0.00"); |
||||
// 表号
|
||||
String cloudId = ExchangeStringUtil.hexToDec(dataStr.substring(0, 2)); |
||||
// 读数
|
||||
String data = ""; |
||||
data = dataStr.substring(dataStr.length() - 8, dataStr.length() - 6) |
||||
+ dataStr.substring(dataStr.length() - 6, dataStr.length() - 4) |
||||
+ dataStr.substring(dataStr.length() - 12, dataStr.length() - 10) |
||||
+ dataStr.substring(dataStr.length() - 10, dataStr.length() - 8); |
||||
|
||||
int dataType = deviceCodeParam.getDataType(); |
||||
if (ExchangeStringUtil.isInDate(date, "00:00:00", "00:00:30")) { |
||||
dateStr = dateStr.substring(0, 17) + "00"; |
||||
System.out.println("插入时间00" + dateStr); |
||||
} else if (ExchangeStringUtil.isInDate(date, "00:00:30", "00:00:59")) { |
||||
dateStr = dateStr.substring(0, 17) + "30"; |
||||
System.out.println("插入时间30" + dateStr); |
||||
} |
||||
try { |
||||
if (dataType == 3) { |
||||
data = decimalFormat.format(Math.abs(ExchangeStringUtil.hexToSingle(data)));//十六进制字符串转IEEE754浮点型
|
||||
log.info("水表==>{},寄存器地址==>{},读数==>{}", cloudId, deviceCodeParam.getRegisterAddr(), data); |
||||
} else if (dataType == 2) { |
||||
data = dataStr.substring(dataStr.length() - 12, dataStr.length() - 10) |
||||
+ dataStr.substring(dataStr.length() - 10, dataStr.length() - 8) |
||||
+ dataStr.substring(dataStr.length() - 8, dataStr.length() - 6) |
||||
+ dataStr.substring(dataStr.length() - 6, dataStr.length() - 4); |
||||
data = ExchangeStringUtil.hexToDec(data); |
||||
BigDecimal bigDecimal = new BigDecimal(data); |
||||
bigDecimal = bigDecimal.divide(new BigDecimal((int) Math.pow(10, deviceCodeParam.getDigits()))).setScale(2, RoundingMode.HALF_UP); // 除以1000并保留整数
|
||||
data = bigDecimal.toString(); |
||||
log.info("水表==>{},寄存器地址==>{},累计读数==>{}", cloudId, deviceCodeParam.getRegisterAddr(), data); |
||||
} |
||||
return data; |
||||
} catch (Exception e) { |
||||
log.error("保存水表数据失败!", e); |
||||
} |
||||
} else { |
||||
log.info("水表===>{}", dataStr); |
||||
return ""; |
||||
} |
||||
return ""; |
||||
} |
||||
|
||||
|
||||
/** |
||||
* 解析电表返回的数据 |
||||
* |
||||
* @param dataStr1 |
||||
*/ |
||||
public String analysisMeterOrder485(final String dataStr1, final CollectionParamsManage deviceCodeParam) { |
||||
// 去掉空格
|
||||
String dataStr = dataStr1.replace(" ", "").toUpperCase(); |
||||
// 检验报文
|
||||
String checkStr = dataStr.substring(0, dataStr.length() - 4); |
||||
byte[] strOrder = ExchangeStringUtil.hexStrToBinaryStr(checkStr); |
||||
int checkNum = CRC16.CRC16_MODBUS(strOrder); |
||||
String checkWord = ExchangeStringUtil.decToHex(String.valueOf(checkNum)); |
||||
checkWord = checkWord.substring(2, 4) + checkWord.substring(0, 2); |
||||
|
||||
if (checkWord.equalsIgnoreCase(dataStr.substring(dataStr.length() - 4))) { |
||||
//创建SimpleDateFormat对象,指定样式 2019-05-13 22:39:30
|
||||
Date date = new Date(); |
||||
String dateStr = sdf1.format(date); |
||||
; |
||||
//保留两位小数处理
|
||||
DecimalFormat decimalFormat = new DecimalFormat("0.00"); |
||||
// 表号
|
||||
String cloudId = ExchangeStringUtil.hexToDec(dataStr.substring(0, 2)); |
||||
// 读数
|
||||
String data = ""; |
||||
data = dataStr.substring(dataStr.length() - 8, dataStr.length() - 6) |
||||
+ dataStr.substring(dataStr.length() - 6, dataStr.length() - 4) |
||||
+ dataStr.substring(dataStr.length() - 12, dataStr.length() - 10) |
||||
+ dataStr.substring(dataStr.length() - 10, dataStr.length() - 8); |
||||
|
||||
int dataType = deviceCodeParam.getDataType(); |
||||
if (ExchangeStringUtil.isInDate(date, "00:00:00", "00:00:30")) { |
||||
dateStr = dateStr.substring(0, 17) + "00"; |
||||
System.out.println("插入时间00" + dateStr); |
||||
} else if (ExchangeStringUtil.isInDate(date, "00:00:30", "00:00:59")) { |
||||
dateStr = dateStr.substring(0, 17) + "30"; |
||||
System.out.println("插入时间30" + dateStr); |
||||
} |
||||
try { |
||||
if (dataType == 3) { |
||||
data = decimalFormat.format(Math.abs(ExchangeStringUtil.hexToSingle(data)));//十六进制字符串转IEEE754浮点型
|
||||
log.info("电表==>{},寄存器地址==>{},读数==>{}", cloudId, deviceCodeParam.getRegisterAddr(), data); |
||||
} else if (dataType == 2) { |
||||
data = ExchangeStringUtil.hexToDec(data); |
||||
log.info("电表==>{},寄存器地址==>{},累计读数==>{}", cloudId, deviceCodeParam.getRegisterAddr(), data); |
||||
} |
||||
return data; |
||||
} catch (Exception e) { |
||||
log.error("保存电表数据失败!", e); |
||||
} |
||||
} else { |
||||
log.info("电表===>{}", dataStr); |
||||
return ""; |
||||
} |
||||
return ""; |
||||
} |
||||
|
||||
public static int dValue(String lastDate) throws ParseException { |
||||
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); |
||||
Date lastTime = format.parse(lastDate); |
||||
long min = lastTime.getTime(); |
||||
Calendar calendar = Calendar.getInstance(); |
||||
long min1 = calendar.getTimeInMillis(); |
||||
long subtract = min1 - min; |
||||
// System.out.println("相减值: " + subtract/(1000*60));
|
||||
return (int) subtract / (1000 * 60); |
||||
} |
||||
|
||||
|
||||
// 判断是否存在寄存器地址
|
||||
public Boolean queryRegisterAddr(List<String> stringList, String registerAddr) { |
||||
boolean flag = false; |
||||
for (int i = 0; i < stringList.size(); i++) { |
||||
if (stringList.get(i).equalsIgnoreCase(registerAddr)) { |
||||
flag = true; |
||||
break; |
||||
} |
||||
} |
||||
return flag; |
||||
} |
||||
|
||||
public String analysisHeatPumpOrder485(String receiveStr, CollectionParamsManage deviceCodeParam) { |
||||
// 去掉空格
|
||||
String dataStr = receiveStr.replace(" ", "").toUpperCase(); |
||||
// 检验报文
|
||||
String checkStr = dataStr.substring(0, dataStr.length() - 4); |
||||
byte[] strOrder = ExchangeStringUtil.hexStrToBinaryStr(checkStr); |
||||
int checkNum = CRC16.CRC16_MODBUS(strOrder); |
||||
String checkWord = ExchangeStringUtil.decToHex(String.valueOf(checkNum)); |
||||
checkWord = checkWord.substring(2, 4) + checkWord.substring(0, 2); |
||||
|
||||
if (checkWord.equalsIgnoreCase(dataStr.substring(dataStr.length() - 4))) { |
||||
//创建SimpleDateFormat对象,指定样式 2019-05-13 22:39:30
|
||||
Date date = new Date(); |
||||
String dateStr = sdf1.format(date); |
||||
|
||||
//保留两位小数处理
|
||||
DecimalFormat decimalFormat = new DecimalFormat("0.00"); |
||||
// 表号
|
||||
String cloudId = ExchangeStringUtil.hexToDec(dataStr.substring(0, 2)); |
||||
// 读数
|
||||
String data = ""; |
||||
data = dataStr.substring(dataStr.length() - 8, dataStr.length() - 6) |
||||
+ dataStr.substring(dataStr.length() - 6, dataStr.length() - 4); |
||||
|
||||
int dataType = deviceCodeParam.getDataType(); |
||||
if (ExchangeStringUtil.isInDate(date, "00:00:00", "00:00:30")) { |
||||
dateStr = dateStr.substring(0, 17) + "00"; |
||||
System.out.println("插入时间00" + dateStr); |
||||
} else if (ExchangeStringUtil.isInDate(date, "00:00:30", "00:00:59")) { |
||||
dateStr = dateStr.substring(0, 17) + "30"; |
||||
System.out.println("插入时间30" + dateStr); |
||||
} |
||||
try { |
||||
if (dataType == 3) { |
||||
data = decimalFormat.format(Math.abs(ExchangeStringUtil.hexToSingle(data)));//十六进制字符串转IEEE754浮点型
|
||||
} else if (dataType == 2 && (deviceCodeParam.getParamType().equals("5") |
||||
|| deviceCodeParam.getParamType().equals("2") |
||||
|| deviceCodeParam.getParamType().equals("12") |
||||
|| deviceCodeParam.getParamType().equals("14") |
||||
)) { |
||||
data = ExchangeStringUtil.hexToDec(data); |
||||
} |
||||
log.info("热泵==>{},寄存器地址==>{},读数==>{}", cloudId, deviceCodeParam.getRegisterAddr(), data); |
||||
return data; |
||||
} catch (Exception e) { |
||||
log.error("保存热泵数据失败!", e); |
||||
} |
||||
} else { |
||||
log.info("热泵===>{}", dataStr); |
||||
return ""; |
||||
} |
||||
return ""; |
||||
} |
||||
} |
@ -0,0 +1,253 @@
|
||||
package com.mh.common.utils; |
||||
|
||||
/** |
||||
* CRC16_CCITT:多项式x16+x12+x5+1(0x1021),初始值0x0000,低位在前,高位在后,结果与0x0000异或 |
||||
* CRC16_CCITT_FALSE:多项式x16+x12+x5+1(0x1021),初始值0xFFFF,低位在后,高位在前,结果与0x0000异或 |
||||
* CRC16_XMODEM:多项式x16+x12+x5+1(0x1021),初始值0x0000,低位在后,高位在前,结果与0x0000异或 |
||||
* CRC16_X25:多项式x16+x12+x5+1(0x1021),初始值0xffff,低位在前,高位在后,结果与0xFFFF异或 |
||||
* CRC16_MODBUS:多项式x16+x15+x2+1(0x8005),初始值0xFFFF,低位在前,高位在后,结果与0x0000异或 |
||||
* CRC16_IBM:多项式x16+x15+x2+1(0x8005),初始值0x0000,低位在前,高位在后,结果与0x0000异或 |
||||
* CRC16_MAXIM:多项式x16+x15+x2+1(0x8005),初始值0x0000,低位在前,高位在后,结果与0xFFFF异或 |
||||
* CRC16_USB:多项式x16+x15+x2+1(0x8005),初始值0xFFFF,低位在前,高位在后,结果与0xFFFF异或 |
||||
* CRC16_DNP:多项式x16+x13+x12+x11+x10+x8+x6+x5+x2+1(0x3D65),初始值0x0000,低位在前,高位在后,结果与0xFFFF异或 |
||||
* <p> |
||||
* (1)、预置1个16位的寄存器为十六进制FFFF(即全为1),称此寄存器为CRC寄存器; |
||||
* (2)、把第一个8位二进制数据(既通讯信息帧的第一个字节)与16位的CRC寄存器的低8位相异或,把结果放于CRC寄存器,高八位数据不变; |
||||
* (3)、把CRC寄存器的内容右移一位(朝低位)用0填补最高位,并检查右移后的移出位; |
||||
* (4)、如果移出位为0:重复第3步(再次右移一位);如果移出位为1,CRC寄存器与多项式A001(1010 0000 0000 0001)进行异或; |
||||
* (5)、重复步骤3和4,直到右移8次,这样整个8位数据全部进行了处理; |
||||
* (6)、重复步骤2到步骤5,进行通讯信息帧下一个字节的处理; |
||||
* (7)、将该通讯信息帧所有字节按上述步骤计算完成后,得到的16位CRC寄存器的高、低字节进行交换; |
||||
* (8)、最后得到的CRC寄存器内容即为:CRC码。 |
||||
* <p> |
||||
* 以上计算步骤中的多项式0xA001是0x8005按位颠倒后的结果。 |
||||
* 0x8408是0x1021按位颠倒后的结果。 |
||||
* 在线校验工具 |
||||
* http://www.ip33.com/crc.html
|
||||
* https://blog.csdn.net/htmlxx/article/details/17369105
|
||||
* <p> |
||||
* Author:Water |
||||
* Time:2018/11/19 0019 15:03 |
||||
*/ |
||||
public class CRC16 { |
||||
|
||||
/** |
||||
* CRC16_CCITT:多项式x16+x12+x5+1(0x1021),初始值0x0000,低位在前,高位在后,结果与0x0000异或 |
||||
* 0x8408是0x1021按位颠倒后的结果。 |
||||
* |
||||
* @param buffer |
||||
* @return |
||||
*/ |
||||
public static int CRC16_CCITT(byte[] buffer) { |
||||
int wCRCin = 0x0000; |
||||
int wCPoly = 0x8408; |
||||
for (byte b : buffer) { |
||||
wCRCin ^= ((int) b & 0x00ff); |
||||
for (int j = 0; j < 8; j++) { |
||||
if ((wCRCin & 0x0001) != 0) { |
||||
wCRCin >>= 1; |
||||
wCRCin ^= wCPoly; |
||||
} else { |
||||
wCRCin >>= 1; |
||||
} |
||||
} |
||||
} |
||||
// wCRCin=(wCRCin<<8)|(wCRCin>>8);
|
||||
// wCRCin &= 0xffff;
|
||||
return wCRCin ^= 0x0000; |
||||
|
||||
} |
||||
|
||||
/** |
||||
* CRC-CCITT (0xFFFF) |
||||
* CRC16_CCITT_FALSE:多项式x16+x12+x5+1(0x1021),初始值0xFFFF,低位在后,高位在前,结果与0x0000异或 |
||||
* |
||||
* @param buffer |
||||
* @return |
||||
*/ |
||||
public static int CRC16_CCITT_FALSE(byte[] buffer) { |
||||
int wCRCin = 0xffff; |
||||
int wCPoly = 0x1021; |
||||
for (byte b : buffer) { |
||||
for (int i = 0; i < 8; i++) { |
||||
boolean bit = ((b >> (7 - i) & 1) == 1); |
||||
boolean c15 = ((wCRCin >> 15 & 1) == 1); |
||||
wCRCin <<= 1; |
||||
if (c15 ^ bit) |
||||
wCRCin ^= wCPoly; |
||||
} |
||||
} |
||||
wCRCin &= 0xffff; |
||||
return wCRCin ^= 0x0000; |
||||
} |
||||
|
||||
/** |
||||
* CRC-CCITT (XModem) |
||||
* CRC16_XMODEM:多项式x16+x12+x5+1(0x1021),初始值0x0000,低位在后,高位在前,结果与0x0000异或 |
||||
* |
||||
* @param buffer |
||||
* @return |
||||
*/ |
||||
public static int CRC16_XMODEM(byte[] buffer) { |
||||
int wCRCin = 0x0000; // initial value 65535
|
||||
int wCPoly = 0x1021; // 0001 0000 0010 0001 (0, 5, 12)
|
||||
for (byte b : buffer) { |
||||
for (int i = 0; i < 8; i++) { |
||||
boolean bit = ((b >> (7 - i) & 1) == 1); |
||||
boolean c15 = ((wCRCin >> 15 & 1) == 1); |
||||
wCRCin <<= 1; |
||||
if (c15 ^ bit) |
||||
wCRCin ^= wCPoly; |
||||
} |
||||
} |
||||
wCRCin &= 0xffff; |
||||
return wCRCin ^= 0x0000; |
||||
} |
||||
|
||||
|
||||
/** |
||||
* CRC16_X25:多项式x16+x12+x5+1(0x1021),初始值0xffff,低位在前,高位在后,结果与0xFFFF异或 |
||||
* 0x8408是0x1021按位颠倒后的结果。 |
||||
* |
||||
* @param buffer |
||||
* @return |
||||
*/ |
||||
public static int CRC16_X25(byte[] buffer) { |
||||
int wCRCin = 0xffff; |
||||
int wCPoly = 0x8408; |
||||
for (byte b : buffer) { |
||||
wCRCin ^= ((int) b & 0x00ff); |
||||
for (int j = 0; j < 8; j++) { |
||||
if ((wCRCin & 0x0001) != 0) { |
||||
wCRCin >>= 1; |
||||
wCRCin ^= wCPoly; |
||||
} else { |
||||
wCRCin >>= 1; |
||||
} |
||||
} |
||||
} |
||||
return wCRCin ^= 0xffff; |
||||
} |
||||
|
||||
/** |
||||
* CRC-16 (Modbus) |
||||
* CRC16_MODBUS:多项式x16+x15+x2+1(0x8005),初始值0xFFFF,低位在前,高位在后,结果与0x0000异或 |
||||
* 0xA001是0x8005按位颠倒后的结果 |
||||
* |
||||
* @param buffer |
||||
* @return |
||||
*/ |
||||
public static int CRC16_MODBUS(byte[] buffer) { |
||||
int wCRCin = 0xffff; |
||||
int POLYNOMIAL = 0xa001; |
||||
for (byte b : buffer) { |
||||
wCRCin ^= ((int) b & 0x00ff); |
||||
for (int j = 0; j < 8; j++) { |
||||
if ((wCRCin & 0x0001) != 0) { |
||||
wCRCin >>= 1; |
||||
wCRCin ^= POLYNOMIAL; |
||||
} else { |
||||
wCRCin >>= 1; |
||||
} |
||||
} |
||||
} |
||||
return wCRCin ^= 0x0000; |
||||
} |
||||
|
||||
/** |
||||
* CRC-16 |
||||
* CRC16_IBM:多项式x16+x15+x2+1(0x8005),初始值0x0000,低位在前,高位在后,结果与0x0000异或 |
||||
* 0xA001是0x8005按位颠倒后的结果 |
||||
* |
||||
* @param buffer |
||||
* @return |
||||
*/ |
||||
public static int CRC16_IBM(byte[] buffer) { |
||||
int wCRCin = 0x0000; |
||||
int wCPoly = 0xa001; |
||||
for (byte b : buffer) { |
||||
wCRCin ^= ((int) b & 0x00ff); |
||||
for (int j = 0; j < 8; j++) { |
||||
if ((wCRCin & 0x0001) != 0) { |
||||
wCRCin >>= 1; |
||||
wCRCin ^= wCPoly; |
||||
} else { |
||||
wCRCin >>= 1; |
||||
} |
||||
} |
||||
} |
||||
return wCRCin ^= 0x0000; |
||||
} |
||||
|
||||
/** |
||||
* CRC16_MAXIM:多项式x16+x15+x2+1(0x8005),初始值0x0000,低位在前,高位在后,结果与0xFFFF异或 |
||||
* 0xA001是0x8005按位颠倒后的结果 |
||||
* |
||||
* @param buffer |
||||
* @return |
||||
*/ |
||||
public static int CRC16_MAXIM(byte[] buffer) { |
||||
int wCRCin = 0x0000; |
||||
int wCPoly = 0xa001; |
||||
for (byte b : buffer) { |
||||
wCRCin ^= ((int) b & 0x00ff); |
||||
for (int j = 0; j < 8; j++) { |
||||
if ((wCRCin & 0x0001) != 0) { |
||||
wCRCin >>= 1; |
||||
wCRCin ^= wCPoly; |
||||
} else { |
||||
wCRCin >>= 1; |
||||
} |
||||
} |
||||
} |
||||
return wCRCin ^= 0xffff; |
||||
} |
||||
|
||||
/** |
||||
* CRC16_USB:多项式x16+x15+x2+1(0x8005),初始值0xFFFF,低位在前,高位在后,结果与0xFFFF异或 |
||||
* 0xA001是0x8005按位颠倒后的结果 |
||||
* |
||||
* @param buffer |
||||
* @return |
||||
*/ |
||||
public static int CRC16_USB(byte[] buffer) { |
||||
int wCRCin = 0xFFFF; |
||||
int wCPoly = 0xa001; |
||||
for (byte b : buffer) { |
||||
wCRCin ^= ((int) b & 0x00ff); |
||||
for (int j = 0; j < 8; j++) { |
||||
if ((wCRCin & 0x0001) != 0) { |
||||
wCRCin >>= 1; |
||||
wCRCin ^= wCPoly; |
||||
} else { |
||||
wCRCin >>= 1; |
||||
} |
||||
} |
||||
} |
||||
return wCRCin ^= 0xffff; |
||||
} |
||||
|
||||
/** |
||||
* CRC16_DNP:多项式x16+x13+x12+x11+x10+x8+x6+x5+x2+1(0x3D65),初始值0x0000,低位在前,高位在后,结果与0xFFFF异或 |
||||
* 0xA6BC是0x3D65按位颠倒后的结果 |
||||
* |
||||
* @param buffer |
||||
* @return |
||||
*/ |
||||
public static int CRC16_DNP(byte[] buffer) { |
||||
int wCRCin = 0x0000; |
||||
int wCPoly = 0xA6BC; |
||||
for (byte b : buffer) { |
||||
wCRCin ^= ((int) b & 0x00ff); |
||||
for (int j = 0; j < 8; j++) { |
||||
if ((wCRCin & 0x0001) != 0) { |
||||
wCRCin >>= 1; |
||||
wCRCin ^= wCPoly; |
||||
} else { |
||||
wCRCin >>= 1; |
||||
} |
||||
} |
||||
} |
||||
return wCRCin ^= 0xffff; |
||||
} |
||||
} |
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,47 @@
|
||||
package com.mh.common.utils; |
||||
|
||||
import io.netty.buffer.ByteBuf; |
||||
import io.netty.buffer.Unpooled; |
||||
import io.netty.channel.ChannelHandlerContext; |
||||
|
||||
/** |
||||
* @author LJF |
||||
* @version 1.0 |
||||
* @project EEMCS |
||||
* @description Modbus协议工具类 |
||||
* @date 2025-06-06 14:40:24 |
||||
*/ |
||||
public class ModbusUtils { |
||||
public static String createControlCode(String mtCode, Integer type, String registerAddr, String param) { |
||||
String orderStr; |
||||
mtCode = ExchangeStringUtil.addZeroForNum(mtCode, 2); |
||||
registerAddr = ExchangeStringUtil.addZeroForNum(registerAddr, 4); |
||||
param = ExchangeStringUtil.addZeroForNum(ExchangeStringUtil.decToHex(param), 4); |
||||
orderStr = mtCode + "06" + registerAddr + param; |
||||
byte[] strOrder = ExchangeStringUtil.hexStrToBinaryStr(orderStr); |
||||
int checkNum = CRC16.CRC16_MODBUS(strOrder); |
||||
String checkWord = ExchangeStringUtil.decToHex(String.valueOf(checkNum)); |
||||
checkWord = checkWord.substring(2, 4) + checkWord.substring(0, 2); |
||||
return orderStr + checkWord; |
||||
} |
||||
|
||||
public static ByteBuf getByteBuf(ChannelHandlerContext ctx, String sendStr) { |
||||
// byte类型的数据
|
||||
// String sendStr = "5803004900021914"; // 冷量计
|
||||
// 申请一个数据结构存储信息
|
||||
ByteBuf buffer = ctx.alloc().buffer(); |
||||
// 将信息放入数据结构中
|
||||
buffer.writeBytes(ExchangeStringUtil.hexStrToBinaryStr(sendStr));//对接需要16进制
|
||||
return buffer; |
||||
} |
||||
|
||||
public static ByteBuf createByteBuf(String sendStr) { |
||||
// byte类型的数据
|
||||
// String sendStr = "5803004900021914"; // 冷量计
|
||||
// 申请一个数据结构存储信息
|
||||
ByteBuf buffer = Unpooled.buffer(); |
||||
// 将信息放入数据结构中
|
||||
buffer.writeBytes(ExchangeStringUtil.hexStrToBinaryStr(sendStr));//对接需要16进制
|
||||
return buffer; |
||||
} |
||||
} |
@ -0,0 +1,77 @@
|
||||
package com.mh.common.utils; |
||||
|
||||
|
||||
import com.google.common.cache.Cache; |
||||
import com.google.common.cache.CacheBuilder; |
||||
import lombok.extern.slf4j.Slf4j; |
||||
import org.apache.commons.lang3.StringUtils; |
||||
|
||||
import java.util.Objects; |
||||
import java.util.concurrent.BlockingQueue; |
||||
import java.util.concurrent.LinkedBlockingQueue; |
||||
import java.util.concurrent.TimeUnit; |
||||
|
||||
/** |
||||
* @author LJF |
||||
* @version 1.0 |
||||
* @project TAD_Server |
||||
* @description 缓存等待数据 |
||||
* @date 2023/7/4 08:45:16 |
||||
*/ |
||||
@Slf4j |
||||
public class NettyTools { |
||||
|
||||
/** |
||||
* 响应消息缓存 |
||||
*/ |
||||
private static final Cache<String, BlockingQueue<String>> responseMsgCache = CacheBuilder.newBuilder() |
||||
.maximumSize(500) |
||||
.expireAfterWrite(1000, TimeUnit.SECONDS) |
||||
.build(); |
||||
|
||||
|
||||
/** |
||||
* 等待响应消息 |
||||
* @param key 消息唯一标识 |
||||
* @return ReceiveDdcMsgVo |
||||
*/ |
||||
public static boolean waitReceiveMsg(String key) { |
||||
|
||||
try { |
||||
//设置超时时间
|
||||
String vo = Objects.requireNonNull(responseMsgCache.getIfPresent(key)) |
||||
.poll(1000 * 10, TimeUnit.MILLISECONDS); |
||||
|
||||
//删除key
|
||||
responseMsgCache.invalidate(key); |
||||
return StringUtils.isNotBlank(vo); |
||||
} catch (Exception e) { |
||||
log.error("获取数据异常,sn={},msg=null",key); |
||||
return false; |
||||
} |
||||
|
||||
} |
||||
|
||||
/** |
||||
* 初始化响应消息的队列 |
||||
* @param key 消息唯一标识 |
||||
*/ |
||||
public static void initReceiveMsg(String key) { |
||||
responseMsgCache.put(key,new LinkedBlockingQueue<String>(1)); |
||||
} |
||||
|
||||
/** |
||||
* 设置响应消息 |
||||
* @param key 消息唯一标识 |
||||
*/ |
||||
public static void setReceiveMsg(String key, String msg) { |
||||
|
||||
if(responseMsgCache.getIfPresent(key) != null){ |
||||
responseMsgCache.getIfPresent(key).add(msg); |
||||
return; |
||||
} |
||||
|
||||
log.warn("sn {}不存在",key); |
||||
} |
||||
|
||||
} |
@ -0,0 +1,53 @@
|
||||
package com.mh.common.utils; |
||||
|
||||
import com.mh.common.core.domain.entity.CollectionParamsManage; |
||||
import io.netty.buffer.ByteBuf; |
||||
import io.netty.channel.ChannelHandlerContext; |
||||
import lombok.extern.slf4j.Slf4j; |
||||
|
||||
/** |
||||
* @author ljf |
||||
* @title : |
||||
* @description : 发送指令工具类 |
||||
* @updateTime 2021-01-26 |
||||
* @throws : |
||||
*/ |
||||
@Slf4j |
||||
public class SendOrderUtils { |
||||
|
||||
// 发送所有类型采集报文
|
||||
public static void sendAllOrder(CollectionParamsManage paramsManage, ChannelHandlerContext ctx, int num, int size) { |
||||
// 开始创建指令
|
||||
String mtCode = paramsManage.getMtCode(); // 采集编号
|
||||
String funCode = paramsManage.getFuncCode(); // 功能码
|
||||
String registerAddr = paramsManage.getRegisterAddr(); // 寄存器地址
|
||||
String registerNum = String.valueOf(paramsManage.getRegisterSize()); // 寄存器数量
|
||||
// 拼接指令
|
||||
String sendOrderStr = ExchangeStringUtil.addZeroForNum(ExchangeStringUtil.decToHex(mtCode), 2) |
||||
+ ExchangeStringUtil.addZeroForNum(funCode, 2) |
||||
+ ExchangeStringUtil.addZeroForNum(registerAddr, 4) |
||||
+ ExchangeStringUtil.addZeroForNum(registerNum, 4); |
||||
byte[] strOrder = ExchangeStringUtil.hexStrToBinaryStr(sendOrderStr); |
||||
int checkNum = CRC16.CRC16_MODBUS(strOrder); |
||||
String checkWord = ExchangeStringUtil.decToHex(String.valueOf(checkNum)); |
||||
checkWord = checkWord.substring(2, 4) + checkWord.substring(0, 2); |
||||
sendOrderStr = sendOrderStr + checkWord; |
||||
ByteBuf buffer = getByteBuf(ctx, sendOrderStr); |
||||
// 发送数据
|
||||
ctx.channel().writeAndFlush(buffer); |
||||
log.info("sends :" + sendOrderStr + ",num:" + num + ",records:" + size); |
||||
try { |
||||
Thread.sleep(1000); |
||||
} catch (InterruptedException e) { |
||||
log.error("线程休眠异常", e); |
||||
} |
||||
} |
||||
|
||||
private static ByteBuf getByteBuf(ChannelHandlerContext ctx, String sendStr) { |
||||
// 申请一个数据结构存储信息
|
||||
ByteBuf buffer = ctx.alloc().buffer(); |
||||
// 将信息放入数据结构中
|
||||
buffer.writeBytes(ExchangeStringUtil.hexStrToBinaryStr(sendStr));//对接需要16进制
|
||||
return buffer; |
||||
} |
||||
} |
@ -0,0 +1,57 @@
|
||||
package com.mh.framework.netty; |
||||
|
||||
import io.netty.bootstrap.ServerBootstrap; |
||||
import io.netty.channel.ChannelFuture; |
||||
import io.netty.channel.ChannelOption; |
||||
import io.netty.channel.nio.NioEventLoopGroup; |
||||
import io.netty.channel.socket.nio.NioServerSocketChannel; |
||||
import lombok.extern.slf4j.Slf4j; |
||||
|
||||
@Slf4j |
||||
public class EchoServer { |
||||
|
||||
private final int port; |
||||
|
||||
public EchoServer(int port) { |
||||
this.port = port; |
||||
} |
||||
|
||||
public void start() { |
||||
// 创建EventLoopGroup
|
||||
NioEventLoopGroup bossGroup = new NioEventLoopGroup(); |
||||
NioEventLoopGroup workerGroup = new NioEventLoopGroup(); |
||||
|
||||
ServerBootstrap serverBootstrap = new ServerBootstrap(); |
||||
serverBootstrap.group(bossGroup, workerGroup) |
||||
.channel(NioServerSocketChannel.class) |
||||
.localAddress(port) |
||||
.option(ChannelOption.SO_BACKLOG, 1204) |
||||
.childHandler(new ServerChannelInitializer()); |
||||
|
||||
// 异步绑定端口
|
||||
ChannelFuture channelFuture = serverBootstrap.bind(); |
||||
|
||||
// 添加监听器处理绑定结果
|
||||
channelFuture.addListener(future -> { |
||||
if (future.isSuccess()) { |
||||
log.info("服务器启动成功,开始监听端口: {}", port); |
||||
} else { |
||||
log.error("服务器启动失败,端口: {}", port, future.cause()); |
||||
bossGroup.shutdownGracefully(); // 绑定失败立即关闭资源
|
||||
workerGroup.shutdownGracefully(); |
||||
} |
||||
}); |
||||
|
||||
// ❌ 移除 sync() 阻塞调用
|
||||
// channelFuture.channel().closeFuture().sync(); --> 删除这行
|
||||
|
||||
// 可选:添加 JVM 关闭钩子优雅关闭资源
|
||||
Runtime.getRuntime().addShutdownHook(new Thread(() -> { |
||||
log.info("JVM 正在关闭,准备释放 Netty 资源..."); |
||||
bossGroup.shutdownGracefully(); |
||||
workerGroup.shutdownGracefully(); |
||||
log.info("Netty 资源已释放"); |
||||
})); |
||||
} |
||||
|
||||
} |
@ -0,0 +1,336 @@
|
||||
package com.mh.framework.netty; |
||||
|
||||
import com.alibaba.fastjson2.JSONArray; |
||||
import com.alibaba.fastjson2.JSONObject; |
||||
import com.mh.common.constant.Constants; |
||||
import com.mh.common.core.domain.entity.CollectionParamsManage; |
||||
import com.mh.common.core.domain.entity.SysDictData; |
||||
import com.mh.common.core.redis.RedisCache; |
||||
import com.mh.common.model.request.AdvantechDatas; |
||||
import com.mh.common.model.request.AdvantechReceiver; |
||||
import com.mh.common.utils.*; |
||||
import com.mh.common.utils.spring.SpringUtils; |
||||
import com.mh.framework.netty.session.ServerSession; |
||||
import com.mh.framework.netty.session.SessionMap; |
||||
import com.mh.framework.netty.task.CallbackTask; |
||||
//import com.mh.framework.netty.task.CallbackTaskScheduler;
|
||||
import com.mh.framework.netty.task.CallbackTaskScheduler; |
||||
import com.mh.framework.rabbitmq.producer.SendMsgByTopic; |
||||
import com.mh.system.service.device.ICollectionParamsManageService; |
||||
import com.mh.system.service.device.IGatewayManageService; |
||||
import io.netty.buffer.ByteBuf; |
||||
import io.netty.channel.ChannelHandlerContext; |
||||
import io.netty.channel.ChannelInboundHandlerAdapter; |
||||
import io.netty.handler.timeout.IdleState; |
||||
import io.netty.handler.timeout.IdleStateEvent; |
||||
import io.netty.util.ReferenceCountUtil; |
||||
import lombok.extern.slf4j.Slf4j; |
||||
|
||||
import java.math.BigDecimal; |
||||
import java.util.ArrayList; |
||||
import java.util.Date; |
||||
import java.util.List; |
||||
|
||||
@Slf4j |
||||
public class EchoServerHandler extends ChannelInboundHandlerAdapter { |
||||
|
||||
// 调用service层的接口信息
|
||||
IGatewayManageService gatewayManageService = SpringUtils.getBean(IGatewayManageService.class); |
||||
ICollectionParamsManageService collectionParamsManageService = SpringUtils.getBean(ICollectionParamsManageService.class); |
||||
SendMsgByTopic sendMsgByTopic = SpringUtils.getBean(SendMsgByTopic.class); |
||||
RedisCache redisCache = SpringUtils.getBean(RedisCache.class); |
||||
|
||||
/** |
||||
* 空闲次数 |
||||
*/ |
||||
private int idleCount = 1; |
||||
private int count = 0; |
||||
private List<String> orderList; |
||||
private int num = 0; |
||||
private int size = 0; |
||||
private String IP; |
||||
private String port; |
||||
private String receiveStr = ""; |
||||
private List<CollectionParamsManage> deviceCodeParamList; |
||||
|
||||
/** |
||||
* 客户端连接会触发 |
||||
*/ |
||||
@Override |
||||
public void channelActive(ChannelHandlerContext ctx) throws Exception { |
||||
log.info("Channel active......"); |
||||
} |
||||
|
||||
/** |
||||
* 超时处理 |
||||
* 如果120秒没有接受客户端的心跳,就触发; |
||||
* 如果超过3次,则直接关闭; |
||||
*/ |
||||
@Override |
||||
public void userEventTriggered(ChannelHandlerContext ctx, Object obj) throws Exception { |
||||
if (obj instanceof IdleStateEvent) { |
||||
IdleStateEvent event = (IdleStateEvent) obj; |
||||
if (IdleState.READER_IDLE.equals(event.state())) { //如果读通道处于空闲状态,说明没有接收到心跳命令
|
||||
log.info("第{}已经40秒没有接收到客户端的信息了", idleCount); |
||||
receiveStr = ""; |
||||
num = num + 1; |
||||
if (num > size - 1) { |
||||
num = 0; |
||||
// // 关闭连接
|
||||
// ctx.close();
|
||||
// 继续发送下一个采集指令
|
||||
SendOrderUtils.sendAllOrder(deviceCodeParamList.get(num),ctx,num,size); |
||||
} else { |
||||
// 继续发送下一个采集指令
|
||||
SendOrderUtils.sendAllOrder(deviceCodeParamList.get(num), ctx, num, size); |
||||
} |
||||
} |
||||
} else { |
||||
super.userEventTriggered(ctx, obj); |
||||
} |
||||
} |
||||
|
||||
// 对于每一个传入的消息都要被调用
|
||||
@Override |
||||
public void channelRead(ChannelHandlerContext ctx, Object msg) { |
||||
try { |
||||
//接收到服务端发来的数据进行业务处理
|
||||
ByteBuf buf = (ByteBuf) msg; |
||||
byte[] bytes = new byte[buf.readableBytes()]; |
||||
buf.readBytes(bytes);//复制内容到字节数组bytes
|
||||
buf.clear(); |
||||
// 截取IP地址
|
||||
IP = ExchangeStringUtil.getMidString(ctx.channel().remoteAddress() + "", "/", ":"); |
||||
// 截取端口号
|
||||
port = ExchangeStringUtil.getMidString(ctx.channel().remoteAddress() + "", ":", ""); |
||||
if (bytes.length <= 1024) { |
||||
//将接收到的数据转为字符串,此字符串就是客户端发送的字符串
|
||||
receiveStr = receiveStr + ExchangeStringUtil.bytesToHexString(bytes);//将接收到的数据转为字符串,此字符串就是客户端发送的字符串
|
||||
receiveStr = receiveStr.replace("null", ""); //去null
|
||||
receiveStr = receiveStr.replace(" ", ""); //去空格
|
||||
//log.info("channelRead接收到的数据:" + receiveStr + ",length:" + receiveStr.length());
|
||||
} |
||||
} catch (Exception e) { |
||||
log.error("channelRead异常", e); |
||||
} finally { |
||||
ReferenceCountUtil.release(msg); |
||||
} |
||||
} |
||||
|
||||
// 当前批量读取中的最后一条消息
|
||||
@Override |
||||
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { |
||||
//心跳包报文: 24 00 60 95
|
||||
receiveStr = receiveStr.toUpperCase();//返回值全部变成大写
|
||||
log.info("channelReadComplete接收到的数据{}, 长度: ===> {}", receiveStr, receiveStr.length()); |
||||
//心跳包处理
|
||||
if ((receiveStr.length() == 8) && receiveStr.startsWith("24")) { |
||||
// if ((receiveStr.length() == 8) && receiveStr.startsWith("C0A801FE")) {
|
||||
log.info("接收到心跳包 ===> {}", receiveStr); |
||||
// 开始进行会话保存
|
||||
dealSession(ctx); |
||||
idleCount = 1; |
||||
port = receiveStr.substring(4, 8);//心跳包包含网关端口(自己定义返回心跳包)
|
||||
// 更新对应的网关在线情况
|
||||
gatewayManageService.updateGatewayStatus(receiveStr); |
||||
//根据端口或者IP或者心跳包查询网关对应的项目名称
|
||||
// 生成采集指令
|
||||
if (!SpringUtils.getBean(RedisCache.class).hasKey(receiveStr)) { |
||||
collectionParamsManageService.createDtuCollectionParams(); |
||||
} |
||||
JSONArray arrayCache = SpringUtils.getBean(RedisCache.class).getCacheObject(receiveStr); |
||||
if (StringUtils.isNotNull(arrayCache)) { |
||||
deviceCodeParamList = arrayCache.toList(CollectionParamsManage.class); |
||||
} |
||||
size = deviceCodeParamList.size(); |
||||
log.info("deviceCodeParam size ===> {}", size); |
||||
// 清空receiveStr
|
||||
receiveStr = ""; |
||||
num = 0; |
||||
// 发送采集报文
|
||||
if (size > 0) { |
||||
if (idleCount < 2) { |
||||
Thread.sleep(200); |
||||
SendOrderUtils.sendAllOrder(deviceCodeParamList.get(num), ctx, num, size); |
||||
idleCount++; |
||||
} else { |
||||
ctx.channel().close(); |
||||
} |
||||
} else { |
||||
log.info("gateway not find deviceCodeParam!"); |
||||
} |
||||
} else if (receiveStr.length() == 18) { |
||||
// 水电表、热泵设置返回数据解析
|
||||
idleCount = 1; |
||||
log.info("水电表、热泵设置接收==>{},长度:{}", receiveStr, receiveStr.length()); |
||||
nextSendOrder(ctx); |
||||
} else if (receiveStr.length() == 12 || receiveStr.length() == 14) { |
||||
// 热泵返回数据解析
|
||||
idleCount = 1; |
||||
log.info("热泵读取接收===>{},长度:{}", receiveStr, receiveStr.length()); |
||||
nextSendOrder(ctx); |
||||
} else if (receiveStr.length() == 16) { |
||||
// 热泵设置指令返回
|
||||
// 判断是否有指令发送
|
||||
if (redisCache.hasKey("order_send") && redisCache.getCacheObject("order_send").equals(receiveStr)) { |
||||
NettyTools.setReceiveMsg("order_wait", receiveStr); |
||||
} |
||||
nextSendOrder(ctx); |
||||
} else if (receiveStr.length() > 50 && receiveStr.length() < 100) { |
||||
idleCount = 1; |
||||
// 清空receiveStr
|
||||
nextSendOrder(ctx); |
||||
} |
||||
ctx.flush(); |
||||
} |
||||
|
||||
private void dealSession(ChannelHandlerContext ctx) { |
||||
// 获取表号
|
||||
String deviceCode =receiveStr; |
||||
String meterNum = deviceCode; |
||||
deviceCode = deviceCode + ctx.channel().remoteAddress(); |
||||
//新的session的创建
|
||||
ServerSession session = new ServerSession(ctx.channel(), deviceCode); |
||||
|
||||
//进行登录逻辑处理,异步进行处理。并且需要知道 处理的结果。 callbacktask就要
|
||||
//派上用场了
|
||||
String finalDeviceCode = deviceCode; |
||||
CallbackTaskScheduler.add(new CallbackTask<Boolean>() { |
||||
@Override |
||||
public Boolean execute() throws Exception { |
||||
//进行 login 逻辑的处理
|
||||
return action(session, finalDeviceCode, ctx); |
||||
} |
||||
//没有异常的话,我们进行处理
|
||||
@Override |
||||
public void onBack(Boolean result) { |
||||
if(result) { |
||||
log.info("设备保存会话: 设备号 = " + session.getSessionId()); |
||||
//ctx.pipeline().remove(LoginRequestHandler.class); //压测需要放开
|
||||
} else { |
||||
log.info("设备刷新会话: 设备号 = " + session.getSessionId()); |
||||
SessionMap.inst().updateSession(finalDeviceCode ,session, meterNum); |
||||
//log.info("设备登录失败: 设备号 = " + session.getSessionId());
|
||||
//ServerSession.closeSession(ctx);
|
||||
// 假如说已经在会话中了,直接断开连接
|
||||
//ctx.close();
|
||||
} |
||||
} |
||||
//有异常的话,我们进行处理
|
||||
@Override |
||||
public void onException(Throwable t) { |
||||
log.info("设备登录异常: 设备号 = " + session.getSessionId()); |
||||
ServerSession.closeSession(ctx); |
||||
} |
||||
}); |
||||
} |
||||
|
||||
private void nextSendOrder(ChannelHandlerContext ctx) throws InterruptedException { |
||||
// 发送指令响应不用解析
|
||||
if (receiveStr.length() != 16) { |
||||
// 解析采集的报文,并保存到数据库
|
||||
analysisReceiveData(receiveStr, deviceCodeParamList.get(num)); |
||||
} |
||||
// 清空receiveStr
|
||||
receiveStr = ""; |
||||
// 判断发送的下标,如果不等于指令数组大小
|
||||
num = num + 1; |
||||
if (num > size - 1) { |
||||
num = 0; |
||||
Thread.sleep(1000); |
||||
// 继续发送下一个采集指令
|
||||
SendOrderUtils.sendAllOrder(deviceCodeParamList.get(num), ctx, num, size); |
||||
log.info("------一轮采集完成,继续下一轮--------"); |
||||
} else { |
||||
// 添加一个状态值,判断是否继续发送指令 update by ljf on 2020-08-07
|
||||
if (Constants.WEB_FLAG) { |
||||
num = 0; |
||||
// 关闭连接
|
||||
receiveStr = null; |
||||
ctx.close(); |
||||
} else { |
||||
Thread.sleep(1000); |
||||
// 继续发送下一个采集指令
|
||||
SendOrderUtils.sendAllOrder(deviceCodeParamList.get(num), ctx, num, size); |
||||
} |
||||
} |
||||
} |
||||
|
||||
private void analysisReceiveData(final String receiveStr, final CollectionParamsManage deviceCodeParamEntity) { |
||||
AnalysisReceiveOrder485 analysisReceiveOrder485 = new AnalysisReceiveOrder485(); |
||||
String analysisData = ""; |
||||
switch (deviceCodeParamEntity.getParamType()) { |
||||
case "16" -> |
||||
// 电表
|
||||
analysisData = analysisReceiveOrder485.analysisMeterOrder485(receiveStr, deviceCodeParamEntity); |
||||
case "18" -> |
||||
// 水表
|
||||
analysisData = analysisReceiveOrder485.analysisWaterOrder485(receiveStr, deviceCodeParamEntity); |
||||
case "5" -> |
||||
// 热泵故障报警
|
||||
analysisData = analysisReceiveOrder485.analysisHeatPumpOrder485(receiveStr, deviceCodeParamEntity); |
||||
case "2" -> |
||||
// 热泵启停控制
|
||||
analysisData = analysisReceiveOrder485.analysisHeatPumpOrder485(receiveStr, deviceCodeParamEntity); |
||||
case "12" -> |
||||
// 热泵实际温度
|
||||
analysisData = analysisReceiveOrder485.analysisHeatPumpOrder485(receiveStr, deviceCodeParamEntity); |
||||
case "14" -> |
||||
// 热泵读取温度设置
|
||||
analysisData = analysisReceiveOrder485.analysisHeatPumpOrder485(receiveStr, deviceCodeParamEntity); |
||||
default -> { |
||||
log.info("设备类型错误"); |
||||
return; |
||||
} |
||||
} |
||||
if (analysisData.isEmpty()) { |
||||
log.info("解析数据为空"); |
||||
return; |
||||
} |
||||
// 格式化数据,配置成研华网关 AdvantechReceiver
|
||||
AdvantechReceiver advantechReceiver = new AdvantechReceiver(); |
||||
advantechReceiver.setTs(DateUtils.dateToString(new Date(), Constants.DATE_FORMAT)); |
||||
List<AdvantechDatas> advantechDatas = new ArrayList<>(); |
||||
AdvantechDatas datas = new AdvantechDatas(); |
||||
datas.setValue(new BigDecimal(analysisData)); |
||||
datas.setTag(deviceCodeParamEntity.getOtherName()); |
||||
advantechDatas.add(datas); |
||||
advantechReceiver.setD(advantechDatas); |
||||
sendMsgByTopic.sendToDeviceMQ(JSONObject.toJSONString(advantechReceiver)); |
||||
} |
||||
|
||||
// 异常捕捉
|
||||
@Override |
||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { |
||||
cause.getCause().printStackTrace(); |
||||
log.info("异常捕捉,执行ctx.close" + cause.getCause()); |
||||
ctx.close(); // 关闭该Channel
|
||||
} |
||||
|
||||
// 客户端断开
|
||||
@Override |
||||
public void channelInactive(ChannelHandlerContext ctx) throws Exception { |
||||
ctx.close();// 关闭流
|
||||
log.info("客户端断开,执行ctx.close()......"); |
||||
} |
||||
|
||||
private boolean action(ServerSession session, String deviceCode, ChannelHandlerContext ctx) { |
||||
//user验证
|
||||
boolean isValidUser = checkUser(deviceCode,session); |
||||
session.bind(); |
||||
return true; |
||||
} |
||||
|
||||
private boolean checkUser(String deviceCode,ServerSession session) { |
||||
//当前用户已经登录
|
||||
if(SessionMap.inst().hasLogin(deviceCode)) { |
||||
log.info("设备已经登录: 设备号 = " + deviceCode); |
||||
return false; |
||||
} |
||||
//一般情况下,我们会将 user存储到 DB中,然后对user的用户名和密码进行校验
|
||||
//但是,我们这边没有进行db的集成,所以我们想一个别的办法进行user的校验。在我们的sessionMap进行以下校验
|
||||
//为什么选sessionmap,因为我们user的会话,都是存储到sessionmap中的,sessionmap中只要有这个user的会话,说明就是ok的
|
||||
return true; |
||||
} |
||||
|
||||
} |
@ -0,0 +1,18 @@
|
||||
package com.mh.framework.netty; |
||||
|
||||
import com.mh.common.core.domain.entity.OrderEntity; |
||||
|
||||
import java.util.List; |
||||
|
||||
/** |
||||
* @author LJF |
||||
* @version 1.0 |
||||
* @project EEMCS |
||||
* @description netty |
||||
* @date 2025-06-06 15:13:06 |
||||
*/ |
||||
public interface INettyService { |
||||
|
||||
boolean sendOrder(List<OrderEntity> changeValues); |
||||
|
||||
} |
@ -0,0 +1,119 @@
|
||||
package com.mh.framework.netty; |
||||
|
||||
import com.mh.common.core.domain.AjaxResult; |
||||
import com.mh.common.core.domain.entity.CollectionParamsManage; |
||||
import com.mh.common.core.domain.entity.GatewayManage; |
||||
import com.mh.common.core.domain.entity.OrderEntity; |
||||
import com.mh.common.core.redis.RedisCache; |
||||
import com.mh.common.core.redis.RedisLock; |
||||
import com.mh.common.utils.ModbusUtils; |
||||
import com.mh.common.utils.NettyTools; |
||||
import com.mh.common.utils.StringUtils; |
||||
import com.mh.framework.netty.session.ServerSession; |
||||
import com.mh.framework.netty.session.SessionMap; |
||||
import com.mh.system.mapper.device.CollectionParamsManageMapper; |
||||
import com.mh.system.mapper.device.GatewayManageMapper; |
||||
import jakarta.annotation.Resource; |
||||
import lombok.extern.slf4j.Slf4j; |
||||
import org.springframework.stereotype.Service; |
||||
|
||||
import java.util.List; |
||||
import java.util.Map; |
||||
import java.util.Set; |
||||
import java.util.UUID; |
||||
import java.util.concurrent.ConcurrentHashMap; |
||||
import java.util.concurrent.TimeUnit; |
||||
|
||||
/** |
||||
* @author LJF |
||||
* @version 1.0 |
||||
* @project EEMCS |
||||
* @description netty实现类 |
||||
* @date 2025-06-06 15:13:23 |
||||
*/ |
||||
@Slf4j |
||||
@Service |
||||
public class NettyServiceImpl implements INettyService { |
||||
|
||||
@Resource |
||||
private CollectionParamsManageMapper collectionParamsManageMapper; |
||||
|
||||
@Resource |
||||
private GatewayManageMapper gatewayManageMapper; |
||||
|
||||
@Resource |
||||
private RedisCache redisCache; |
||||
|
||||
@Resource |
||||
private RedisLock redisLock; |
||||
|
||||
@Override |
||||
public boolean sendOrder(List<OrderEntity> changeValues) { |
||||
for (OrderEntity changeValue : changeValues) { |
||||
String cpmId = changeValue.getId(); |
||||
CollectionParamsManage collectionParamsManage = collectionParamsManageMapper.selectById(cpmId); |
||||
if (null == collectionParamsManage) { |
||||
return false; |
||||
} |
||||
GatewayManage gatewayManage = gatewayManageMapper.selectById(collectionParamsManage.getGatewayId()); |
||||
if (null == gatewayManage || StringUtils.isEmpty(gatewayManage.getHeartBeat())) { |
||||
return false; |
||||
} |
||||
ConcurrentHashMap<String, ServerSession> map = SessionMap.inst().getMap(); |
||||
Set<Map.Entry<String, ServerSession>> entries = map.entrySet(); |
||||
boolean flag = false; |
||||
String keyVal = null; |
||||
for (Map.Entry<String, ServerSession> entry : entries) { |
||||
String key = entry.getKey(); |
||||
if (key.contains(gatewayManage.getHeartBeat())){ |
||||
flag = true; |
||||
keyVal = key; |
||||
break; |
||||
} |
||||
} |
||||
if (flag) { |
||||
ServerSession serverSession = map.get(keyVal); |
||||
// 目前只有DTU,modbus方式,只创建modbus先
|
||||
String controlCode = ModbusUtils.createControlCode(collectionParamsManage.getMtCode(), |
||||
changeValue.getType(), |
||||
collectionParamsManage.getRegisterAddr(), |
||||
changeValue.getParam()); |
||||
if (StringUtils.isEmpty(controlCode)) { |
||||
log.error("创建控制码失败"); |
||||
return false; |
||||
} |
||||
|
||||
String requestId = UUID.randomUUID().toString(); // 唯一标识当前请求
|
||||
String lockKey = "lock:order_send:" + gatewayManage.getHeartBeat(); // 按网关分锁
|
||||
|
||||
try { |
||||
if (!redisLock.tryLock(lockKey, requestId, 10, 10)) { |
||||
log.warn("获取锁失败,当前操作繁忙"); |
||||
return false; |
||||
} |
||||
// 初始化发送指令
|
||||
NettyTools.initReceiveMsg("order_wait"); |
||||
// 设置缓存,方便在netty中判断发送的指令
|
||||
redisCache.setCacheObject("order_send", controlCode, 10, TimeUnit.SECONDS); |
||||
// 发送指令
|
||||
serverSession.getChannel().writeAndFlush(ModbusUtils.createByteBuf(controlCode)); |
||||
// 等待指令
|
||||
if (NettyTools.waitReceiveMsg("order_wait")) { |
||||
log.error("发送指令成功,心跳包:{}", gatewayManage.getHeartBeat()); |
||||
return true; |
||||
} else { |
||||
log.error("发送指令异常,心跳包:{}", gatewayManage.getHeartBeat()); |
||||
return false; |
||||
} |
||||
} catch (InterruptedException e) { |
||||
log.error("发送指令异常", e); |
||||
} finally { |
||||
redisLock.unlock(lockKey, requestId); |
||||
} |
||||
} |
||||
log.error("当前设备不在线,心跳包:{}",gatewayManage.getHeartBeat()); |
||||
return false; |
||||
} |
||||
return false; |
||||
} |
||||
} |
@ -0,0 +1,33 @@
|
||||
package com.mh.framework.netty; |
||||
|
||||
import io.netty.channel.ChannelInitializer; |
||||
import io.netty.channel.ChannelPipeline; |
||||
import io.netty.channel.socket.SocketChannel; |
||||
import io.netty.handler.timeout.IdleStateHandler; |
||||
|
||||
import java.util.concurrent.TimeUnit; |
||||
|
||||
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel>{ |
||||
|
||||
@Override |
||||
protected void initChannel(SocketChannel socketChannel) throws Exception { |
||||
ChannelPipeline pipeline = socketChannel.pipeline(); |
||||
|
||||
/* LineBasedFrameDecoder的工作原理是:依次遍历ByteBuf中的可读字节, |
||||
判断看其是否有”\n” 或 “\r\n”, 如果有就以此位置为结束位置。 |
||||
从可读索引到结束位置的区间的字节就组成了一行。 它是以换行符为结束标志的解码器, |
||||
支持携带结束符和不带结束符两种解码方式,同时支持配置单行的最大长度, |
||||
如果读到了最大长度之后仍然没有发现换行符,则抛出异常,同时忽略掉之前读到的异常码流。*/ |
||||
// pipeline.addLast(new LineBasedFrameDecoder(10010));
|
||||
//字符串解码和编码
|
||||
//LineBasedFrameDecoder + StringDecoder 就是一个按行切换的文本解码器。
|
||||
// pipeline.addLast( new StringDecoder());
|
||||
// pipeline.addLast( new StringEncoder());
|
||||
// 设置读写超时操作
|
||||
// 入参说明: 读超时时间、写超时时间、所有类型的超时时间、时间格式
|
||||
pipeline.addLast(new IdleStateHandler(40, 40, 40, TimeUnit.SECONDS)); |
||||
//服务器的逻辑
|
||||
pipeline.addLast("handler", new EchoServerHandler()); |
||||
} |
||||
|
||||
} |
@ -0,0 +1,66 @@
|
||||
package com.mh.framework.netty.session; |
||||
|
||||
import io.netty.channel.Channel; |
||||
import io.netty.channel.ChannelFuture; |
||||
import io.netty.channel.ChannelFutureListener; |
||||
import io.netty.channel.ChannelHandlerContext; |
||||
import io.netty.util.AttributeKey; |
||||
import lombok.Data; |
||||
import lombok.extern.slf4j.Slf4j; |
||||
|
||||
@Data |
||||
@Slf4j |
||||
public class ServerSession { |
||||
public static final AttributeKey<ServerSession> SESSION_KEY = |
||||
AttributeKey.valueOf("SESSION_KEY"); |
||||
//通道
|
||||
private Channel channel; |
||||
private final String sessionId; |
||||
private boolean isLogin = false; |
||||
|
||||
public ServerSession(Channel channel, String deviceCode){ |
||||
this.channel = channel; |
||||
this.sessionId = deviceCode; |
||||
} |
||||
|
||||
//session需要和通道进行一定的关联,他是在构造函数中关联上的;
|
||||
//session还需要通过sessionkey和channel进行再次的关联;channel.attr方法.set当前的
|
||||
// serverSession
|
||||
//session需要被添加到我们的SessionMap中
|
||||
public void bind(){ |
||||
log.info("server Session 会话进行绑定 :" + channel.remoteAddress()); |
||||
channel.attr(SESSION_KEY).set(this); |
||||
SessionMap.inst().addSession(sessionId, this); |
||||
this.isLogin = true; |
||||
} |
||||
|
||||
//通过channel获取session
|
||||
public static ServerSession getSession(ChannelHandlerContext ctx){ |
||||
Channel channel = ctx.channel(); |
||||
return channel.attr(SESSION_KEY).get(); |
||||
} |
||||
|
||||
//关闭session,新增返回一个meterNum用于纪录设备下线时间2024-05-08
|
||||
public static String closeSession(ChannelHandlerContext ctx){ |
||||
String meterNum = null; |
||||
ServerSession serverSession = ctx.channel().attr(SESSION_KEY).get(); |
||||
if(serverSession != null && serverSession.getSessionId() != null) { |
||||
ChannelFuture future = serverSession.channel.close(); |
||||
future.addListener((ChannelFutureListener) future1 -> { |
||||
if(!future1.isSuccess()) { |
||||
log.info("Channel close error!"); |
||||
} |
||||
}); |
||||
ctx.close(); |
||||
meterNum = serverSession.sessionId; |
||||
SessionMap.inst().removeSession(serverSession.sessionId); |
||||
log.info(ctx.channel().remoteAddress()+" "+serverSession.sessionId + "==>移除会话"); |
||||
} |
||||
return meterNum; |
||||
} |
||||
|
||||
//写消息
|
||||
public void writeAndFlush(Object msg) { |
||||
channel.writeAndFlush(msg); |
||||
} |
||||
} |
@ -0,0 +1,96 @@
|
||||
package com.mh.framework.netty.session; |
||||
|
||||
import lombok.Data; |
||||
import lombok.extern.slf4j.Slf4j; |
||||
|
||||
import java.util.Iterator; |
||||
import java.util.List; |
||||
import java.util.Map; |
||||
import java.util.concurrent.ConcurrentHashMap; |
||||
import java.util.stream.Collectors; |
||||
|
||||
@Data |
||||
@Slf4j |
||||
public class SessionMap { |
||||
|
||||
private ThreadLocal<Boolean> sceneThreadLocal = new ThreadLocal<>(); |
||||
|
||||
//用单例模式进行sessionMap的创建
|
||||
private SessionMap(){} |
||||
|
||||
private static SessionMap singleInstance = new SessionMap(); |
||||
|
||||
public static SessionMap inst() { |
||||
return singleInstance; |
||||
} |
||||
|
||||
//进行会话的保存
|
||||
//key 我们使用 sessionId;value 需要是 serverSession
|
||||
private ConcurrentHashMap<String, ServerSession> map = new ConcurrentHashMap<>(256); |
||||
//添加session
|
||||
public void addSession(String sessionId, ServerSession s) { |
||||
map.put(sessionId, s); |
||||
log.info("IP地址:"+s.getChannel().remoteAddress()+" "+ sessionId + " 表具上线,总共表具:" + map.size()); |
||||
} |
||||
|
||||
//删除session
|
||||
public void removeSession(String sessionId) { |
||||
if(map.containsKey(sessionId)) { |
||||
ServerSession s = map.get(sessionId); |
||||
map.remove(sessionId); |
||||
log.info("设备id下线:{},在线设备:{}", s.getSessionId(), map.size() ); |
||||
} |
||||
return; |
||||
} |
||||
|
||||
public boolean hasLogin(String sessionId) { |
||||
Iterator<Map.Entry<String, ServerSession>> iterator = map.entrySet().iterator(); |
||||
while(iterator.hasNext()) { |
||||
Map.Entry<String, ServerSession> next = iterator.next(); |
||||
if(sessionId != null && sessionId.equalsIgnoreCase(next.getValue().getSessionId())) { |
||||
return true ; |
||||
} |
||||
} |
||||
return false; |
||||
} |
||||
|
||||
//如果在线,肯定有sessionMap里保存的 serverSession
|
||||
//如果不在线,serverSession也没有。用这个来判断是否在线
|
||||
public List<ServerSession> getSessionBy(String sessionId) { |
||||
return map.values().stream(). |
||||
filter(s -> s.getSessionId().equals(sessionId)). |
||||
collect(Collectors.toList()); |
||||
} |
||||
|
||||
public boolean getScene() { |
||||
return sceneThreadLocal.get(); |
||||
} |
||||
|
||||
public void initScene(Boolean status) { |
||||
if (sceneThreadLocal == null) { |
||||
log.info("======创建ThreadLocal======"); |
||||
sceneThreadLocal = new ThreadLocal<>(); |
||||
} |
||||
log.info("设置状态==>" + status); |
||||
sceneThreadLocal.set(status); |
||||
} |
||||
|
||||
public void clearScene() { |
||||
initScene(null); |
||||
sceneThreadLocal.remove(); |
||||
} |
||||
|
||||
public void updateSession(String sessionId, ServerSession session, String meterNum) { |
||||
Iterator<Map.Entry<String, ServerSession>> iterator = map.entrySet().iterator(); |
||||
while(iterator.hasNext()) { |
||||
Map.Entry<String, ServerSession> next = iterator.next(); |
||||
if (next.getKey().contains(meterNum)){ |
||||
iterator.remove(); |
||||
} |
||||
if(sessionId != null && sessionId.equalsIgnoreCase(next.getValue().getSessionId())) { |
||||
next.setValue(session); |
||||
} |
||||
} |
||||
} |
||||
|
||||
} |
@ -0,0 +1,20 @@
|
||||
package com.mh.framework.netty.task; |
||||
|
||||
/** |
||||
* @author LJF |
||||
* @version 1.0 |
||||
* @project TAD_Server |
||||
* @description 回调任务 |
||||
* @date 2023/7/3 15:34:11 |
||||
*/ |
||||
public interface CallbackTask<T> { |
||||
T execute() throws Exception; |
||||
|
||||
/** |
||||
* // 执行没有 异常的情况下的 返回值
|
||||
* @param t |
||||
*/ |
||||
void onBack(T t); |
||||
|
||||
void onException(Throwable t); |
||||
} |
@ -0,0 +1,78 @@
|
||||
package com.mh.framework.netty.task; |
||||
|
||||
import com.google.common.util.concurrent.*; |
||||
|
||||
import java.util.concurrent.Callable; |
||||
import java.util.concurrent.ConcurrentLinkedQueue; |
||||
import java.util.concurrent.ExecutorService; |
||||
import java.util.concurrent.Executors; |
||||
|
||||
/** |
||||
* @author LJF |
||||
* @version 1.0 |
||||
* @project TAD_Server |
||||
* @description 回调任务 |
||||
* @date 2023/7/3 15:34:11 |
||||
*/ |
||||
public class CallbackTaskScheduler extends Thread { |
||||
private ConcurrentLinkedQueue<CallbackTask> executeTaskQueue = |
||||
new ConcurrentLinkedQueue<>(); |
||||
private long sleepTime = 1000 * 10; |
||||
private final ExecutorService pool = Executors.newCachedThreadPool(); |
||||
ListeningExecutorService lpool = MoreExecutors.listeningDecorator(pool); |
||||
private static CallbackTaskScheduler inst = new CallbackTaskScheduler(); |
||||
private CallbackTaskScheduler() { |
||||
this.start(); |
||||
} |
||||
//add task
|
||||
public static <T> void add(CallbackTask<T> executeTask) { |
||||
inst.executeTaskQueue.add(executeTask); |
||||
} |
||||
|
||||
@Override |
||||
public void run() { |
||||
while (true) { |
||||
handleTask(); |
||||
//为了避免频繁连接服务器,但是当前连接服务器过长导致失败
|
||||
//threadSleep(sleepTime);
|
||||
} |
||||
} |
||||
|
||||
private void threadSleep(long sleepTime) { |
||||
try { |
||||
Thread.sleep(sleepTime); |
||||
}catch (Exception e) { |
||||
e.printStackTrace(); |
||||
} |
||||
} |
||||
|
||||
//任务执行
|
||||
private void handleTask() { |
||||
CallbackTask executeTask = null; |
||||
while (executeTaskQueue.peek() != null) { |
||||
executeTask = executeTaskQueue.poll(); |
||||
handleTask(executeTask); |
||||
} |
||||
} |
||||
private <T> void handleTask(CallbackTask<T> executeTask) { |
||||
ListenableFuture<T> future = lpool.submit(new Callable<T>() { |
||||
public T call() throws Exception { |
||||
return executeTask.execute(); |
||||
} |
||||
}); |
||||
Futures.addCallback(future, new FutureCallback<T>() { |
||||
@Override |
||||
public void onSuccess(T t) { |
||||
executeTask.onBack(t); |
||||
} |
||||
|
||||
@Override |
||||
public void onFailure(Throwable throwable) { |
||||
executeTask.onException(throwable); |
||||
} |
||||
|
||||
|
||||
}, pool); |
||||
} |
||||
} |
||||
|
@ -0,0 +1,6 @@
|
||||
package com.mh.framework.netty.task; |
||||
|
||||
//不需要知道异步线程的 返回值
|
||||
public interface ExecuteTask { |
||||
void execute(); |
||||
} |
@ -0,0 +1,67 @@
|
||||
package com.mh.framework.netty.task; |
||||
|
||||
import java.util.concurrent.ConcurrentLinkedQueue; |
||||
import java.util.concurrent.ExecutorService; |
||||
import java.util.concurrent.Executors; |
||||
|
||||
/** |
||||
* @author LJF |
||||
* @version 1.0 |
||||
* @project TAD_Server |
||||
* @description 任务定时 |
||||
* @date 2023/7/3 15:34:11 |
||||
*/ |
||||
public class FutureTaskScheduler extends Thread{ |
||||
private ConcurrentLinkedQueue<ExecuteTask> executeTaskQueue = |
||||
new ConcurrentLinkedQueue<>(); |
||||
private long sleepTime = 200; |
||||
private ExecutorService pool = Executors.newFixedThreadPool(10); |
||||
private static FutureTaskScheduler inst = new FutureTaskScheduler(); |
||||
public FutureTaskScheduler() { |
||||
this.start(); |
||||
} |
||||
//任务添加
|
||||
public static void add(ExecuteTask executeTask) { |
||||
inst.executeTaskQueue.add(executeTask); |
||||
} |
||||
|
||||
@Override |
||||
public void run() { |
||||
while (true) { |
||||
handleTask(); |
||||
//threadSleep(sleepTime);
|
||||
} |
||||
} |
||||
|
||||
private void threadSleep(long sleepTime) { |
||||
try { |
||||
Thread.sleep(sleepTime); |
||||
} catch (InterruptedException e) { |
||||
e.printStackTrace(); |
||||
} |
||||
} |
||||
|
||||
//执行任务
|
||||
private void handleTask() { |
||||
ExecuteTask executeTask; |
||||
while (executeTaskQueue.peek() != null) { |
||||
executeTask = executeTaskQueue.poll(); |
||||
handleTask(executeTask); |
||||
} |
||||
//刷新心跳时间
|
||||
} |
||||
private void handleTask(ExecuteTask executeTask) { |
||||
pool.execute(new ExecuteRunnable(executeTask)); |
||||
} |
||||
|
||||
class ExecuteRunnable implements Runnable { |
||||
ExecuteTask executeTask; |
||||
public ExecuteRunnable(ExecuteTask executeTask) { |
||||
this.executeTask = executeTask; |
||||
} |
||||
@Override |
||||
public void run() { |
||||
executeTask.execute(); |
||||
} |
||||
} |
||||
} |
@ -0,0 +1,11 @@
|
||||
package com.mh.quartz.domain; |
||||
|
||||
/** |
||||
* @Classname FuzzyLevel |
||||
* Todo: |
||||
* @Date 2025-05-31 14:19 |
||||
* @Created by LJF |
||||
*/ |
||||
public enum FuzzyLevel { |
||||
NB, NM, NS, ZO, PS, PM, PB; // 极小,较小,小,零,稍大,较大,极大
|
||||
} |
@ -0,0 +1,52 @@
|
||||
package com.mh.quartz.domain; |
||||
|
||||
/** |
||||
* @author LJF |
||||
* @version 1.0 |
||||
* @project EEMCS |
||||
* @description PID参数 |
||||
* @date 2025-05-30 13:51:22 |
||||
*/ |
||||
public class PIDParams { |
||||
|
||||
private volatile double kp; // 比例系数
|
||||
private volatile double ki; // 积分系数
|
||||
private volatile double kd; // 微分系数
|
||||
|
||||
public PIDParams(double kp, double ki, double kd) { |
||||
this.kp = kp; |
||||
this.ki = ki; |
||||
this.kd = kd; |
||||
} |
||||
|
||||
// 动态更新PID参数
|
||||
public void updateParams(double kp, double ki, double kd) { |
||||
this.kp = kp; |
||||
this.ki = ki; |
||||
this.kd = kd; |
||||
} |
||||
|
||||
public double getKp() { |
||||
return kp; |
||||
} |
||||
|
||||
public void setKp(double kp) { |
||||
this.kp = kp; |
||||
} |
||||
|
||||
public double getKi() { |
||||
return ki; |
||||
} |
||||
|
||||
public void setKi(double ki) { |
||||
this.ki = ki; |
||||
} |
||||
|
||||
public double getKd() { |
||||
return kd; |
||||
} |
||||
|
||||
public void setKd(double kd) { |
||||
this.kd = kd; |
||||
} |
||||
} |
@ -0,0 +1,257 @@
|
||||
package com.mh.quartz.task; |
||||
|
||||
import com.mh.common.config.MHConfig; |
||||
import com.mh.common.core.domain.entity.CollectionParamsManage; |
||||
import com.mh.common.core.domain.entity.CpmSpaceRelation; |
||||
import com.mh.common.core.domain.entity.OrderEntity; |
||||
import com.mh.common.core.domain.entity.PolicyManage; |
||||
import com.mh.common.utils.DateUtils; |
||||
import com.mh.framework.mqtt.service.IMqttGatewayService; |
||||
import com.mh.quartz.util.AHUPIDControlUtil; |
||||
import com.mh.quartz.util.FuzzyPIDControlUtil; |
||||
import com.mh.system.service.device.ICollectionParamsManageService; |
||||
import com.mh.system.service.operation.IOperationDeviceService; |
||||
import com.mh.system.service.policy.IPolicyManageService; |
||||
import com.mh.system.service.space.ICpmSpaceRelationService; |
||||
import lombok.extern.slf4j.Slf4j; |
||||
import org.springframework.beans.factory.annotation.Autowired; |
||||
import org.springframework.beans.factory.annotation.Value; |
||||
import org.springframework.stereotype.Component; |
||||
|
||||
import java.math.BigDecimal; |
||||
import java.time.LocalTime; |
||||
import java.util.*; |
||||
import java.util.stream.Collectors; |
||||
|
||||
/** |
||||
* @author LJF |
||||
* @version 1.0 |
||||
* @project EEMCS |
||||
* @description 风柜系统任务 |
||||
* @date 2025-05-30 08:36:43 |
||||
*/ |
||||
@Slf4j |
||||
@Component("ahuTask") |
||||
public class AHUTask { |
||||
|
||||
@Value("${control.topic}") |
||||
String controlTopic; |
||||
|
||||
@Autowired |
||||
private MHConfig mhConfig; |
||||
|
||||
private final ICollectionParamsManageService collectionParamsManageService; |
||||
|
||||
private final IPolicyManageService policyManageService; |
||||
|
||||
private final ICpmSpaceRelationService cpmSpaceRelationService; |
||||
|
||||
private final IOperationDeviceService iOperationService; |
||||
|
||||
private final IMqttGatewayService iMqttGatewayService; |
||||
|
||||
// 在 AHUTask 类中添加一个 PID 控制器成员变量
|
||||
private final Map<String, FuzzyPIDControlUtil> pidControllers = new HashMap<>(); |
||||
|
||||
@Autowired |
||||
public AHUTask(ICollectionParamsManageService collectionParamsManageService, IPolicyManageService policyManageService, ICpmSpaceRelationService cpmSpaceRelationService, IOperationDeviceService iOperationService, IMqttGatewayService iMqttGatewayService) { |
||||
this.collectionParamsManageService = collectionParamsManageService; |
||||
this.policyManageService = policyManageService; |
||||
this.cpmSpaceRelationService = cpmSpaceRelationService; |
||||
this.iOperationService = iOperationService; |
||||
this.iMqttGatewayService = iMqttGatewayService; |
||||
} |
||||
|
||||
|
||||
public void sendOrderToMqtt(List<OrderEntity> changeValues) { |
||||
try { |
||||
String sendOrder = iOperationService.operationDevice(changeValues); |
||||
String name = mhConfig.getName(); |
||||
// 获取mqtt操作队列(后期通过mqtt队列配置发送主题)
|
||||
log.info("发送主题:{},消息:{}", name + "/" + controlTopic, sendOrder); |
||||
iMqttGatewayService.publish(name + "/" + controlTopic, sendOrder, 1); |
||||
} catch (Exception e) { |
||||
log.error("设备操作失败", e); |
||||
} |
||||
} |
||||
|
||||
public void adjustWaterValve(String kp, String ki, String kd) { |
||||
// 西餐走廊2、宴会走廊需要调整PID参数,其他的ddc自己已经处理好
|
||||
String[] deviceLedgerIds = new String[]{"ddc0083b3a898d85f3a1205a2d82071e100", "ddc0133b3a898d85f3a1205a2d82071e100"}; |
||||
for (String deviceLedgerId : deviceLedgerIds) { |
||||
// 获取西餐走廊2的启停控制
|
||||
HashMap<String, Object> queryMap = new HashMap<>(); |
||||
queryMap.put("systemType", "2"); |
||||
queryMap.put("deviceLedgerId", deviceLedgerId); |
||||
queryMap.put("isUse", 0); |
||||
// 得出 systemType =2 的数据
|
||||
List<CollectionParamsManage> collectionParamsManages = collectionParamsManageService.selectListByParams(queryMap); |
||||
|
||||
// 过滤得出启停状态
|
||||
Optional<CollectionParamsManage> first = collectionParamsManages.stream().filter(item -> item.getCurValue().intValue() == 1 && item.getParamType().equals("2")).findFirst(); |
||||
if (first.isEmpty()) { |
||||
continue; |
||||
} |
||||
|
||||
// 过滤获取回风温度设置值
|
||||
Optional<CollectionParamsManage> second = collectionParamsManages |
||||
.stream() |
||||
.filter(item -> item.getOtherName().contains("回风温度") |
||||
&& item.getParamType().equals("14")).findFirst(); |
||||
if (second.isEmpty()) { |
||||
continue; |
||||
} |
||||
// 得出回风温度设置值
|
||||
double backTempSet = second.get().getCurValue().doubleValue(); |
||||
|
||||
// 设定目标温度(夏季制冷24℃)
|
||||
// ✅ 如果没有该设备的控制器,则创建一个新的并保存起来
|
||||
FuzzyPIDControlUtil controller = pidControllers.computeIfAbsent(deviceLedgerId, k -> new FuzzyPIDControlUtil(kp, ki, kd)); |
||||
|
||||
log.info("开始模糊PID控制循环,查看对象是否有变化:{}", controller); |
||||
|
||||
// 过滤获取当前回风温度
|
||||
Optional<CollectionParamsManage> third = collectionParamsManages |
||||
.stream() |
||||
.filter(item -> item.getOtherName().contains("回风温度") |
||||
&& item.getParamType().equals("12")).findFirst(); |
||||
if (third.isEmpty()) { |
||||
continue; |
||||
} |
||||
// 得出当前回风温度
|
||||
double temp = third.get().getCurValue().doubleValue(); |
||||
|
||||
// 2. 计算水阀开度(时间间隔1秒)
|
||||
double valveOpening1 = controller.calculate(backTempSet, temp, 1); |
||||
int valveOpening = new BigDecimal(valveOpening1).intValue(); |
||||
|
||||
// 过滤获取水阀调节参数
|
||||
Optional<CollectionParamsManage> fourth = collectionParamsManages |
||||
.stream() |
||||
.filter(item -> item.getOtherName().contains("水阀调节") |
||||
&& item.getParamType().equals("3")).findFirst(); |
||||
if (fourth.isEmpty()) { |
||||
continue; |
||||
} |
||||
// 得出水阀调节参数
|
||||
CollectionParamsManage collectionParamsManage = fourth.get(); |
||||
// 发送控制指令
|
||||
if (valveOpening > 0 && valveOpening <= 100) { |
||||
// 开启
|
||||
List<OrderEntity> changeValues = new ArrayList<>(); |
||||
changeValues.add(new OrderEntity(collectionParamsManage.getId(), String.valueOf(valveOpening), Integer.parseInt(collectionParamsManage.getParamType()), collectionParamsManage.getOtherName())); |
||||
sendOrderToMqtt(changeValues); |
||||
// 3. 应用水阀开度(实际应用发送给执行机构)
|
||||
log.info("回风温度: {} ℃ | 水阀开度: {} % ", |
||||
temp, valveOpening); |
||||
} |
||||
|
||||
} |
||||
} |
||||
|
||||
public void startOrStopAHU() { |
||||
// 扫描启动了定时开关机的风机,根据当前时间判断是否需要启动或停止
|
||||
// systemType 2: 风柜系统
|
||||
HashMap<String, Object> queryMap = new HashMap<>(); |
||||
queryMap.put("systemType", "2"); |
||||
// 得出 systemType =2 的数据
|
||||
List<CollectionParamsManage> collectionParamsManages = collectionParamsManageService.selectListByParams(queryMap); |
||||
// 判断当前时间是星期几
|
||||
String dayOfWeekValue = DateUtils.dayOfWeekValue(); |
||||
// 过滤otherName包含dayOfWeekValue,paramType=29, curValue=1的数据,代表已经启用定时开关机的功能
|
||||
List<CollectionParamsManage> needStartOrStopDataList = collectionParamsManages |
||||
.stream() |
||||
.filter(item -> item.getOtherName().contains(dayOfWeekValue) |
||||
&& item.getParamType().equals("29") |
||||
&& item.getCurValue().intValue() == 1) |
||||
.toList(); |
||||
// 查询得出对应的houseId
|
||||
List<PolicyManage> policyManageList = policyManageService.selectListByCpmIds(needStartOrStopDataList); |
||||
|
||||
// 开始:根据houseId查询出对应的风机启停id
|
||||
List<CpmSpaceRelation> cpmSpaceRelationList = cpmSpaceRelationService.selectListByHouseId(policyManageList); |
||||
// collectionParamsManages过滤出能够开启风机的点位,paramType=2,isUse=0
|
||||
List<CollectionParamsManage> startDeviceList = collectionParamsManages |
||||
.stream() |
||||
.filter(item -> item.getParamType().equals("2") |
||||
&& item.getIsUse() == 0) |
||||
.toList(); |
||||
// 结束:根据houseId查询出对应的风机启停id
|
||||
|
||||
// 在拼接出启用定时开关机的启动时间、关闭时间
|
||||
Map<String, List<PolicyManage>> groupedByHouseId = policyManageList.stream() |
||||
.collect(Collectors.groupingBy( |
||||
PolicyManage::getHouseId, |
||||
Collectors.toList() |
||||
)); |
||||
// groupedByHouseId for 循环遍历
|
||||
for (Map.Entry<String, List<PolicyManage>> entry : groupedByHouseId.entrySet()) { |
||||
// 得出houseId
|
||||
String houseId = entry.getKey(); |
||||
// 得出policyManageList
|
||||
List<PolicyManage> timeList = entry.getValue(); |
||||
int startHour = 0; |
||||
int startMinute = 0; |
||||
int endHour = 0; |
||||
int endMinute = 0; |
||||
for (PolicyManage policyManage : timeList) { |
||||
if (policyManage.getPointName().equals("开_时")) { |
||||
startHour = collectionParamsManageService.selectCollectionParamsManageById(policyManage.getCpmId()).getCurValue().intValue(); |
||||
} |
||||
if (policyManage.getPointName().equals("开_分")) { |
||||
startMinute = collectionParamsManageService.selectCollectionParamsManageById(policyManage.getCpmId()).getCurValue().intValue(); |
||||
} |
||||
if (policyManage.getPointName().equals("关_时")) { |
||||
endHour = collectionParamsManageService.selectCollectionParamsManageById(policyManage.getCpmId()).getCurValue().intValue(); |
||||
} |
||||
if (policyManage.getPointName().equals("关_分")) { |
||||
endMinute = collectionParamsManageService.selectCollectionParamsManageById(policyManage.getCpmId()).getCurValue().intValue(); |
||||
} |
||||
} |
||||
LocalTime nowTime = LocalTime.now(); |
||||
LocalTime startTime = LocalTime.of(startHour, startMinute); |
||||
LocalTime endTime = LocalTime.of(endHour, endMinute); |
||||
// collectionParamsManages过滤出能够开启风机的点位,paramType=2,isUse=0
|
||||
Set<String> validCpmIds = cpmSpaceRelationList.stream() |
||||
.filter(item -> item.getHouseId().equals(houseId)) |
||||
.map(CpmSpaceRelation::getCpmId) |
||||
.collect(Collectors.toSet()); |
||||
List<CollectionParamsManage> startDataList = startDeviceList |
||||
.stream() |
||||
.filter(item -> validCpmIds.contains(item.getId())) |
||||
.toList(); |
||||
// 判断当前风机是否在开启状态了
|
||||
if (null == startDataList || startDataList.size() == 0) { |
||||
return; |
||||
} |
||||
CollectionParamsManage first = startDataList.getFirst(); |
||||
// 判断当前时间是否在开启时间范围内
|
||||
if (DateUtils.isBetween(nowTime, startTime, endTime)) { |
||||
// 判断当前风机是否在开启状态了
|
||||
if (first.getCurValue().intValue() == 1) { |
||||
// 当前风机在开启状态,不需要启动
|
||||
log.info("当前风机在开启状态,不需要启动"); |
||||
} else { |
||||
// 当前风机不在开启状态,需要启动
|
||||
log.info("当前风机不在开启状态,需要启动"); |
||||
List<OrderEntity> changeValues = new ArrayList<>(); |
||||
changeValues.add(new OrderEntity(first.getId(), "1", Integer.parseInt(first.getParamType()), first.getOtherName())); |
||||
sendOrderToMqtt(changeValues); |
||||
} |
||||
; |
||||
} else { |
||||
// 判断当前风机是否在关闭状态了
|
||||
if (first.getCurValue().intValue() == 0) { |
||||
// 当前风机在关闭状态,不需要停止
|
||||
log.info("当前风机在关闭状态,不需要停止"); |
||||
} else { |
||||
// 当前风机不在关闭状态,需要停止
|
||||
log.info("当前风机不在关闭状态,需要停止"); |
||||
List<OrderEntity> changeValues = new ArrayList<>(); |
||||
changeValues.add(new OrderEntity(first.getId(), "0", Integer.parseInt(first.getParamType()), first.getOtherName())); |
||||
sendOrderToMqtt(changeValues); |
||||
} |
||||
} |
||||
} |
||||
} |
||||
} |
@ -0,0 +1,84 @@
|
||||
package com.mh.quartz.util; |
||||
|
||||
import com.mh.quartz.domain.FuzzyLevel; |
||||
|
||||
/** |
||||
* @author LJF |
||||
* @version 1.0 |
||||
* @project EEMCS |
||||
* @description 风柜系统PID调节工具类 |
||||
* @date 2025-05-30 13:47:40 |
||||
*/ |
||||
public class AHUPIDControlUtil { |
||||
|
||||
private double kp = 2.0, ki = 0.1, kd = 0.0; |
||||
private double integral = 0; |
||||
private double previousError = 0; |
||||
private static final double MAX_INTEGRAL = 100; |
||||
private static final double MIN_INTEGRAL = -100; |
||||
|
||||
// 模糊增益修正比例
|
||||
private double deltaKpScale = 0.5; |
||||
private double deltaKiScale = 0.01; |
||||
private double deltaKdScale = 0.01; |
||||
|
||||
public double compute(double setTemp, double currentTemp, double deltaTime) { |
||||
double error = currentTemp - setTemp; |
||||
double dError = (error - previousError) / deltaTime; |
||||
|
||||
// 模糊映射
|
||||
FuzzyLevel eLevel = toFuzzyLevel(error); |
||||
FuzzyLevel ecLevel = toFuzzyLevel(dError); |
||||
|
||||
// 获取PID参数调整
|
||||
FuzzyLevel kpAdjust = FuzzyRuleBase.getKpAdjust(eLevel, ecLevel); |
||||
FuzzyLevel kiAdjust = kpAdjust; // 简化处理
|
||||
FuzzyLevel kdAdjust = kpAdjust; |
||||
|
||||
kp += fuzzyDeltaToValue(kpAdjust, deltaKpScale); |
||||
ki += fuzzyDeltaToValue(kiAdjust, deltaKiScale); |
||||
kd += fuzzyDeltaToValue(kdAdjust, deltaKdScale); |
||||
|
||||
// 限幅
|
||||
kp = Math.max(0, Math.min(kp, 10)); |
||||
ki = Math.max(0, Math.min(ki, 1)); |
||||
kd = Math.max(0, Math.min(kd, 1)); |
||||
|
||||
// PID 计算
|
||||
// 积分项限幅
|
||||
integral += error * deltaTime; |
||||
integral = Math.max(MIN_INTEGRAL, Math.min(MAX_INTEGRAL, integral)); |
||||
double output = kp * error + ki * integral + kd * dError; |
||||
System.out.println("计算输出值:" + output + ",误差:" + error + ",误差变化:" + dError); |
||||
previousError = error; |
||||
|
||||
// 输出冷冻水阀开度,限制在0~100%
|
||||
return Math.max(0, Math.min(100, Math.abs(output))); |
||||
} |
||||
|
||||
// 将数值误差映射为模糊等级
|
||||
public static FuzzyLevel toFuzzyLevel(double value) { |
||||
if (value <= -3) return FuzzyLevel.NB; |
||||
else if (value <= -2) return FuzzyLevel.NM; |
||||
else if (value <= -1) return FuzzyLevel.NS; |
||||
else if (value <= 1) return FuzzyLevel.ZO; |
||||
else if (value <= 2) return FuzzyLevel.PS; |
||||
else if (value <= 3) return FuzzyLevel.PM; |
||||
else return FuzzyLevel.PB; |
||||
} |
||||
|
||||
// 将模糊等级转为实际数值调整量
|
||||
public static double fuzzyDeltaToValue(FuzzyLevel level, double scale) { |
||||
switch (level) { |
||||
case NB: return -3 * scale; |
||||
case NM: return -2 * scale; |
||||
case NS: return -1 * scale; |
||||
case ZO: return 0; |
||||
case PS: return 1 * scale; |
||||
case PM: return 2 * scale; |
||||
case PB: return 3 * scale; |
||||
default: return 0; |
||||
} |
||||
} |
||||
|
||||
} |
@ -0,0 +1,152 @@
|
||||
package com.mh.quartz.util; |
||||
|
||||
/** |
||||
* @author LJF |
||||
* @version 1.0 |
||||
* @project EEMCS |
||||
* @description 模糊PID控制算法 |
||||
* @date 2025-06-04 09:47:16 |
||||
*/ |
||||
public class FuzzyPIDControlUtil { |
||||
// PID参数
|
||||
private final double kp; // 比例增益
|
||||
private final double ki; // 积分增益
|
||||
private final double kd; // 微分增益
|
||||
|
||||
// 控制器状态
|
||||
private double prevError; |
||||
private double integral; |
||||
private double derivative; |
||||
|
||||
// 模糊规则参数
|
||||
private final double[] errorLevels = {-6, -3, -1, 0, 1, 3, 6}; // 温度误差级别(℃)
|
||||
private final double[] dErrorLevels = {-3, -1, 0, 1, 3}; // 误差变化率级别(℃/min)
|
||||
private final double[] kpAdjust = {1.5, 2.0, 2.5, 3.0, 4.0}; // Kp调整因子 (增强)
|
||||
private final double[] kiAdjust = {0.3, 0.7, 1.0, 1.3, 1.7}; // Ki调整因子
|
||||
|
||||
// 阀门限制
|
||||
private static final double MIN_VALVE = 0.0; // 最小开度(0%)
|
||||
private static final double MAX_VALVE = 100.0; // 最大开度(100%)
|
||||
|
||||
public FuzzyPIDControlUtil(String kp, String ki, String kd) { |
||||
this.kp = Double.parseDouble(kp); |
||||
this.ki = Double.parseDouble(ki); |
||||
this.kd = Double.parseDouble(kd); |
||||
this.prevError = 0; |
||||
this.integral = 0; |
||||
this.derivative = 0; |
||||
} |
||||
|
||||
// 模糊推理计算PID参数调整因子
|
||||
private double[] fuzzyInference(double error, double dError) { |
||||
// 模糊化:计算误差和误差变化率的隶属度
|
||||
double[] errorMembership = calculateMembership(error, errorLevels); |
||||
double[] dErrorMembership = calculateMembership(dError, dErrorLevels); |
||||
|
||||
// 模糊规则库
|
||||
double kpAdjustSum = 0.0; |
||||
double kiAdjustSum = 0.0; |
||||
double weightSum = 0.0; |
||||
|
||||
// 应用模糊规则 (增强大误差时的响应)
|
||||
for (int i = 0; i < errorMembership.length; i++) { |
||||
for (int j = 0; j < dErrorMembership.length; j++) { |
||||
double weight = errorMembership[i] * dErrorMembership[j]; |
||||
if (weight > 0) { |
||||
// 增强大误差时的响应
|
||||
int kpIndex; |
||||
if (Math.abs(error) > 3) { // 大误差
|
||||
kpIndex = Math.min(Math.max(i + j, 0), kpAdjust.length - 1); |
||||
} else { |
||||
kpIndex = Math.min(Math.max(i + j - 1, 0), kpAdjust.length - 1); |
||||
} |
||||
|
||||
// Ki调整:小误差时增强积分作用
|
||||
int kiIndex; |
||||
if (Math.abs(error) < 1) { // 小误差
|
||||
kiIndex = Math.min(Math.max(3 + j, 0), kiAdjust.length - 1); |
||||
} else { |
||||
kiIndex = Math.min(Math.max(2 + j, 0), kiAdjust.length - 1); |
||||
} |
||||
|
||||
kpAdjustSum += weight * kpAdjust[kpIndex]; |
||||
kiAdjustSum += weight * kiAdjust[kiIndex]; |
||||
weightSum += weight; |
||||
} |
||||
} |
||||
} |
||||
|
||||
// 反模糊化 (加权平均)
|
||||
double kpFactor = weightSum > 0 ? kpAdjustSum / weightSum : 1.0; |
||||
double kiFactor = weightSum > 0 ? kiAdjustSum / weightSum : 1.0; |
||||
|
||||
return new double[]{kpFactor, kiFactor, 1.0}; // Kd不调整
|
||||
} |
||||
|
||||
// 计算隶属度 (三角隶属函数)
|
||||
private double[] calculateMembership(double value, double[] levels) { |
||||
double[] membership = new double[levels.length]; |
||||
|
||||
for (int i = 0; i < levels.length; i++) { |
||||
if (i == 0) { |
||||
membership[i] = (value <= levels[i]) ? 1.0 : |
||||
(value < levels[i+1]) ? (levels[i+1] - value) / (levels[i+1] - levels[i]) : 0.0; |
||||
} else if (i == levels.length - 1) { |
||||
membership[i] = (value >= levels[i]) ? 1.0 : |
||||
(value > levels[i-1]) ? (value - levels[i-1]) / (levels[i] - levels[i-1]) : 0.0; |
||||
} else { |
||||
if (value >= levels[i-1] && value <= levels[i]) { |
||||
membership[i] = (value - levels[i-1]) / (levels[i] - levels[i-1]); |
||||
} else if (value >= levels[i] && value <= levels[i+1]) { |
||||
membership[i] = (levels[i+1] - value) / (levels[i+1] - levels[i]); |
||||
} else { |
||||
membership[i] = 0.0; |
||||
} |
||||
} |
||||
} |
||||
|
||||
return membership; |
||||
} |
||||
|
||||
// 计算控制输出 (阀门开度) - 修复了符号问题
|
||||
public double calculate(double setpoint, double currentValue, double dt) { |
||||
// 计算误差项 - 修复:当前值高于设定值需要冷却,误差应为正
|
||||
double error = currentValue - setpoint; |
||||
|
||||
// 计算微分项 (基于误差变化率)
|
||||
if (dt > 0) { |
||||
derivative = (error - prevError) / dt; |
||||
} |
||||
|
||||
// 模糊调整PID参数
|
||||
double[] adjustments = fuzzyInference(error, derivative); |
||||
double adjKp = kp * adjustments[0]; |
||||
double adjKi = ki * adjustments[1]; |
||||
double adjKd = kd * adjustments[2]; |
||||
|
||||
// 计算积分项 (带抗饱和)
|
||||
integral += error * dt; |
||||
|
||||
// 抗饱和限制
|
||||
double maxIntegral = MAX_VALVE / (adjKi + 1e-5); |
||||
if (Math.abs(integral) > maxIntegral) { |
||||
integral = Math.signum(integral) * maxIntegral; |
||||
} |
||||
|
||||
// PID计算 - 修复:误差为正时需要正输出打开阀门
|
||||
double output = adjKp * error + adjKi * integral + adjKd * derivative; |
||||
|
||||
// 保存误差用于下次计算
|
||||
prevError = error; |
||||
|
||||
// 阀门开度限制
|
||||
return Math.min(Math.max(output, MIN_VALVE), MAX_VALVE); |
||||
} |
||||
|
||||
// 重置控制器状态
|
||||
public void reset() { |
||||
integral = 0; |
||||
prevError = 0; |
||||
derivative = 0; |
||||
} |
||||
} |
@ -0,0 +1,29 @@
|
||||
package com.mh.quartz.util; |
||||
|
||||
import com.mh.quartz.domain.FuzzyLevel; |
||||
|
||||
/** |
||||
* @Classname FuzzyRuleBase |
||||
* Todo: |
||||
* @Date 2025-05-31 14:20 |
||||
* @Created by LJF |
||||
*/ |
||||
public class FuzzyRuleBase { |
||||
|
||||
// ΔKp 规则:根据误差 e 与误差变化 ec 得到修正量
|
||||
private static final FuzzyLevel[][] kpRuleTable = { |
||||
{FuzzyLevel.PB, FuzzyLevel.PM, FuzzyLevel.PS, FuzzyLevel.ZO, FuzzyLevel.NS, FuzzyLevel.NM, FuzzyLevel.NB}, |
||||
{FuzzyLevel.PM, FuzzyLevel.PS, FuzzyLevel.ZO, FuzzyLevel.NS, FuzzyLevel.NM, FuzzyLevel.NB, FuzzyLevel.NB}, |
||||
{FuzzyLevel.PS, FuzzyLevel.ZO, FuzzyLevel.NS, FuzzyLevel.NM, FuzzyLevel.NB, FuzzyLevel.NB, FuzzyLevel.NB}, |
||||
{FuzzyLevel.ZO, FuzzyLevel.ZO, FuzzyLevel.ZO, FuzzyLevel.ZO, FuzzyLevel.ZO, FuzzyLevel.ZO, FuzzyLevel.ZO}, |
||||
{FuzzyLevel.NS, FuzzyLevel.ZO, FuzzyLevel.PS, FuzzyLevel.PM, FuzzyLevel.PB, FuzzyLevel.PB, FuzzyLevel.PB}, |
||||
{FuzzyLevel.NM, FuzzyLevel.NS, FuzzyLevel.ZO, FuzzyLevel.PS, FuzzyLevel.PM, FuzzyLevel.PB, FuzzyLevel.PB}, |
||||
{FuzzyLevel.NB, FuzzyLevel.NB, FuzzyLevel.NS, FuzzyLevel.ZO, FuzzyLevel.PS, FuzzyLevel.PM, FuzzyLevel.PB} |
||||
}; |
||||
|
||||
public static FuzzyLevel getKpAdjust(FuzzyLevel e, FuzzyLevel ec) { |
||||
return kpRuleTable[e.ordinal()][ec.ordinal()]; |
||||
} |
||||
|
||||
// 可扩展为不同规则表:getKiAdjust(), getKdAdjust()
|
||||
} |
Loading…
Reference in new issue