From 36d60e0b798dcac05bf68421157e6f75ff837b61 Mon Sep 17 00:00:00 2001 From: mh Date: Fri, 6 Jun 2025 17:35:31 +0800 Subject: [PATCH] =?UTF-8?q?1=E3=80=81=E6=B7=BB=E5=8A=A0=E9=80=9A=E8=BF=87D?= =?UTF-8?q?TU-4G=EF=BC=8Cnetty=E6=96=B9=E5=BC=8F=E8=AE=BE=E7=BD=AE?= =?UTF-8?q?=E7=83=AD=E6=B3=B5=E4=BF=A1=E6=81=AF=EF=BC=9B=202=E3=80=81PID?= =?UTF-8?q?=E6=8E=A7=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../device/ChillersParamsController.java | 16 +- .../device/OperationController.java | 26 ++- mh-common/pom.xml | 6 + .../com/mh/common/core/redis/RedisLock.java | 63 ++++++++ .../java/com/mh/common/enums/ComputeEnum.java | 28 ++++ .../java/com/mh/common/utils/ModbusUtils.java | 47 ++++++ .../java/com/mh/common/utils/NettyTools.java | 77 +++++++++ mh-framework/pom.xml | 7 + .../aspectj/ControlDeviceAspect.java | 2 + .../mh/framework/netty/EchoServerHandler.java | 92 +++++++++-- .../com/mh/framework/netty/INettyService.java | 18 +++ .../mh/framework/netty/NettyServiceImpl.java | 119 ++++++++++++++ .../netty/session/ServerSession.java | 66 ++++++++ .../framework/netty/session/SessionMap.java | 96 +++++++++++ .../mh/framework/netty/task/CallbackTask.java | 20 +++ .../netty/task/CallbackTaskScheduler.java | 78 +++++++++ .../mh/framework/netty/task/ExecuteTask.java | 6 + .../netty/task/FutureTaskScheduler.java | 67 ++++++++ .../main/java/com/mh/quartz/task/AHUTask.java | 7 +- .../java/com/mh/quartz/task/DealDataTask.java | 3 + .../com/mh/quartz/util/AHUPIDControlUtil.java | 3 +- .../mh/quartz/util/FuzzyPIDControlUtil.java | 152 ++++++++++++++++++ .../CollectionParamsManageServiceImpl.java | 2 +- .../operation/IOperationDeviceService.java | 3 +- .../impl/OperationDeviceServiceImpl.java | 25 +++ 25 files changed, 999 insertions(+), 30 deletions(-) create mode 100644 mh-common/src/main/java/com/mh/common/core/redis/RedisLock.java create mode 100644 mh-common/src/main/java/com/mh/common/utils/ModbusUtils.java create mode 100644 mh-common/src/main/java/com/mh/common/utils/NettyTools.java create mode 100644 mh-framework/src/main/java/com/mh/framework/netty/INettyService.java create mode 100644 mh-framework/src/main/java/com/mh/framework/netty/NettyServiceImpl.java create mode 100644 mh-framework/src/main/java/com/mh/framework/netty/session/ServerSession.java create mode 100644 mh-framework/src/main/java/com/mh/framework/netty/session/SessionMap.java create mode 100644 mh-framework/src/main/java/com/mh/framework/netty/task/CallbackTask.java create mode 100644 mh-framework/src/main/java/com/mh/framework/netty/task/CallbackTaskScheduler.java create mode 100644 mh-framework/src/main/java/com/mh/framework/netty/task/ExecuteTask.java create mode 100644 mh-framework/src/main/java/com/mh/framework/netty/task/FutureTaskScheduler.java create mode 100644 mh-quartz/src/main/java/com/mh/quartz/util/FuzzyPIDControlUtil.java diff --git a/mh-admin/src/main/java/com/mh/web/controller/device/ChillersParamsController.java b/mh-admin/src/main/java/com/mh/web/controller/device/ChillersParamsController.java index b4f0a95..3ceb80a 100644 --- a/mh-admin/src/main/java/com/mh/web/controller/device/ChillersParamsController.java +++ b/mh-admin/src/main/java/com/mh/web/controller/device/ChillersParamsController.java @@ -18,6 +18,7 @@ import com.mh.system.service.device.ICommunicationParamsService; import com.mh.system.service.energy.IEnergyService; import org.springframework.web.bind.annotation.*; +import java.math.BigDecimal; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -51,6 +52,7 @@ public class ChillersParamsController extends BaseController { */ @GetMapping("/list") public TableDataInfo list(CollectionParamsManage collectionParamsManage) { + collectionParamsManage.setIsUse(0); List list = iCollectionParamsManageService.selectCollectionParamsManageList(collectionParamsManage); // list中的CollectionParamsManage中的other_name去掉“号主机”之前数据 list.forEach(item -> { @@ -71,27 +73,27 @@ public class ChillersParamsController extends BaseController { switch (vo.getParamType()) { case "1": // 运行状态 if (!StringUtils.isEmpty(vo.getCurValue())) { - vo.setCurValue(vo.getCurValue().equals("1") ? "运行" : "停止"); + vo.setCurValue(new BigDecimal(vo.getCurValue()).intValue() == 1 ? "运行" : "停止"); } break; case "2": // 启停 if (!StringUtils.isEmpty(vo.getCurValue())) { - vo.setCurValue(vo.getCurValue().equals("1") ? "启动" : "停止"); + vo.setCurValue(new BigDecimal(vo.getCurValue()).intValue() == 1 ? "启动" : "停止"); } break; case "5": // 故障 if (!StringUtils.isEmpty(vo.getCurValue())) { - vo.setCurValue(vo.getCurValue().equals("1") ? "故障" : "正常"); + vo.setCurValue(new BigDecimal(vo.getCurValue()).intValue() == 1 ? "故障" : "正常"); } break; case "6": // 手自动切换 if (!StringUtils.isEmpty(vo.getCurValue())) { - vo.setCurValue(vo.getCurValue().equals("1") ? "手动" : "自动"); + vo.setCurValue(new BigDecimal(vo.getCurValue()).intValue() == 1 ? "手动" : "自动"); } break; case "22": // 本地远程切换 if (!StringUtils.isEmpty(vo.getCurValue())) { - vo.setCurValue(vo.getCurValue().equals("1") ? "远程" : "本地"); + vo.setCurValue(new BigDecimal(vo.getCurValue()).intValue() == 1 ? "远程" : "本地"); } break; default: @@ -113,8 +115,8 @@ public class ChillersParamsController extends BaseController { return map; }).collect(Collectors.toList()); result.sort((map1, map2) -> { - Integer mtType1 = Integer.parseInt((String)map1.get("mtType")); - Integer mtType2 = Integer.parseInt((String)map2.get("mtType")); + Integer mtType1 = Integer.parseInt((String) map1.get("mtType")); + Integer mtType2 = Integer.parseInt((String) map2.get("mtType")); return mtType1.compareTo(mtType2); // 升序 }); return getDataTable(result); diff --git a/mh-admin/src/main/java/com/mh/web/controller/device/OperationController.java b/mh-admin/src/main/java/com/mh/web/controller/device/OperationController.java index a902d1a..63cdbf0 100644 --- a/mh-admin/src/main/java/com/mh/web/controller/device/OperationController.java +++ b/mh-admin/src/main/java/com/mh/web/controller/device/OperationController.java @@ -11,6 +11,7 @@ import com.mh.common.core.domain.vo.DeviceOperateMonitorVO; import com.mh.common.core.page.TableDataInfo; import com.mh.common.enums.BusinessType; import com.mh.framework.mqtt.service.IMqttGatewayService; +import com.mh.framework.netty.INettyService; import com.mh.system.service.device.ICollectionParamsManageService; import com.mh.system.service.operation.IOperationDeviceService; import lombok.extern.slf4j.Slf4j; @@ -45,13 +46,16 @@ public class OperationController extends BaseController { private final IMqttGatewayService iMqttGatewayService; + private final INettyService nettyService; + @Autowired public OperationController(ICollectionParamsManageService iCollectionParamsManageService, IOperationDeviceService iOperationService, - IMqttGatewayService iMqttGatewayService) { + IMqttGatewayService iMqttGatewayService, INettyService nettyService) { this.iCollectionParamsManageService = iCollectionParamsManageService; this.iOperationService = iOperationService; this.iMqttGatewayService = iMqttGatewayService; + this.nettyService = nettyService; } /** @@ -95,11 +99,21 @@ public class OperationController extends BaseController { @ControlDeviceAno(value = "设备操作") public AjaxResult operationDevice(@RequestBody List changeValues) { try { - String sendOrder = iOperationService.operationDevice(changeValues); - String name = mhConfig.getName(); - // 获取mqtt操作队列(后期通过mqtt队列配置发送主题) - log.info("发送主题:{},消息:{}", name + "/"+ controlTopic, sendOrder); - iMqttGatewayService.publish(name + "/"+ controlTopic, sendOrder, 1); + // 判断id是否是DTU设备类型 + if (!iOperationService.isAdvanTech(changeValues)) { + String sendOrder = iOperationService.operationDevice(changeValues); + String name = mhConfig.getName(); + // 获取mqtt操作队列(后期通过mqtt队列配置发送主题) + log.info("发送主题:{},消息:{}", name + "/" + controlTopic, sendOrder); + iMqttGatewayService.publish(name + "/" + controlTopic, sendOrder, 1); + } else { + // 目前只有DTU设备需要发送4G指令 + if (nettyService.sendOrder(changeValues)) { + return AjaxResult.success(); + } else { + return AjaxResult.error(); + } + } } catch (Exception e) { log.error("设备操作失败", e); return AjaxResult.error(); diff --git a/mh-common/pom.xml b/mh-common/pom.xml index 0947baa..65a35a3 100644 --- a/mh-common/pom.xml +++ b/mh-common/pom.xml @@ -159,6 +159,12 @@ org.projectlombok lombok + + com.google.guava + guava + 33.4.8-jre + compile + diff --git a/mh-common/src/main/java/com/mh/common/core/redis/RedisLock.java b/mh-common/src/main/java/com/mh/common/core/redis/RedisLock.java new file mode 100644 index 0000000..8b5e17e --- /dev/null +++ b/mh-common/src/main/java/com/mh/common/core/redis/RedisLock.java @@ -0,0 +1,63 @@ +package com.mh.common.core.redis; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.data.redis.core.script.RedisScript; +import org.springframework.stereotype.Component; + +import java.util.Collections; +import java.util.concurrent.TimeUnit; + +/** + * @author LJF + * @version 1.0 + * @project EEMCS + * @description 锁 + * @date 2025-06-06 16:08:13 + */ +@Slf4j +@Component +public class RedisLock { + + private final StringRedisTemplate redisTemplate; + + public RedisLock(StringRedisTemplate redisTemplate) { + this.redisTemplate = redisTemplate; + } + + /** + * 获取锁 + */ + public boolean lock(String key, String requestId, long expireTimeInSeconds) { + Boolean success = redisTemplate.opsForValue().setIfAbsent(key, requestId, expireTimeInSeconds, TimeUnit.SECONDS); + return Boolean.TRUE.equals(success); + } + + /** + * 尝试获取锁(带超时) + */ + public boolean tryLock(String key, String requestId, long expireTime, long timeoutMs) throws InterruptedException { + long startTime = System.currentTimeMillis(); + while (System.currentTimeMillis() - startTime < timeoutMs) { + if (lock(key, requestId, expireTime)) { + return true; + } + Thread.sleep(50); + } + return false; + } + + /** + * 释放锁(使用 Lua 脚本保证原子性) + */ + public void unlock(String key, String requestId) { + String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; + RedisScript redisScript = RedisScript.of(script, Long.class); + + Long result = redisTemplate.execute(redisScript, Collections.singletonList(key), requestId); + + if (result == null || result == 0) { + log.warn("释放锁失败,可能已被其他线程释放 key={}", key); + } + } +} diff --git a/mh-common/src/main/java/com/mh/common/enums/ComputeEnum.java b/mh-common/src/main/java/com/mh/common/enums/ComputeEnum.java index 1dc4437..2e22262 100644 --- a/mh-common/src/main/java/com/mh/common/enums/ComputeEnum.java +++ b/mh-common/src/main/java/com/mh/common/enums/ComputeEnum.java @@ -11,6 +11,34 @@ import java.util.*; */ public enum ComputeEnum implements ComputeService { + /** + * 水表 + */ + WATER("水表设备数据处理", 23) { + @Override + public ArrayList> getDataList( + Map.Entry>> entry) { + + ArrayList> result = new ArrayList<>(); + + //获取到电表的数据,按照表号分组分组,紧接着再按照小时分组。需要计算分组后的数据取出最大值 + Map> deviceMap = entry.getValue(); + String deviceNum = entry.getKey(); + Set>> groupEntryList = deviceMap.entrySet(); + for (Map.Entry> listEntry : groupEntryList) { + LocalDateTime key = listEntry.getKey(); + List value = listEntry.getValue(); + DeviceReport maxEntity = value.stream() + .max(Comparator.comparing(obj -> Double.valueOf(obj.getCurValue()))) + .orElse(null); + HashMap map = new HashMap<>(); + map.put(key, maxEntity); + result.add(map); + } + + return result; + } + }, /** * 电表 */ diff --git a/mh-common/src/main/java/com/mh/common/utils/ModbusUtils.java b/mh-common/src/main/java/com/mh/common/utils/ModbusUtils.java new file mode 100644 index 0000000..36a3637 --- /dev/null +++ b/mh-common/src/main/java/com/mh/common/utils/ModbusUtils.java @@ -0,0 +1,47 @@ +package com.mh.common.utils; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; + +/** + * @author LJF + * @version 1.0 + * @project EEMCS + * @description Modbus协议工具类 + * @date 2025-06-06 14:40:24 + */ +public class ModbusUtils { + public static String createControlCode(String mtCode, Integer type, String registerAddr, String param) { + String orderStr; + mtCode = ExchangeStringUtil.addZeroForNum(mtCode, 2); + registerAddr = ExchangeStringUtil.addZeroForNum(registerAddr, 4); + param = ExchangeStringUtil.addZeroForNum(ExchangeStringUtil.decToHex(param), 4); + orderStr = mtCode + "06" + registerAddr + param; + byte[] strOrder = ExchangeStringUtil.hexStrToBinaryStr(orderStr); + int checkNum = CRC16.CRC16_MODBUS(strOrder); + String checkWord = ExchangeStringUtil.decToHex(String.valueOf(checkNum)); + checkWord = checkWord.substring(2, 4) + checkWord.substring(0, 2); + return orderStr + checkWord; + } + + public static ByteBuf getByteBuf(ChannelHandlerContext ctx, String sendStr) { + // byte类型的数据 +// String sendStr = "5803004900021914"; // 冷量计 + // 申请一个数据结构存储信息 + ByteBuf buffer = ctx.alloc().buffer(); + // 将信息放入数据结构中 + buffer.writeBytes(ExchangeStringUtil.hexStrToBinaryStr(sendStr));//对接需要16进制 + return buffer; + } + + public static ByteBuf createByteBuf(String sendStr) { + // byte类型的数据 +// String sendStr = "5803004900021914"; // 冷量计 + // 申请一个数据结构存储信息 + ByteBuf buffer = Unpooled.buffer(); + // 将信息放入数据结构中 + buffer.writeBytes(ExchangeStringUtil.hexStrToBinaryStr(sendStr));//对接需要16进制 + return buffer; + } +} diff --git a/mh-common/src/main/java/com/mh/common/utils/NettyTools.java b/mh-common/src/main/java/com/mh/common/utils/NettyTools.java new file mode 100644 index 0000000..123a574 --- /dev/null +++ b/mh-common/src/main/java/com/mh/common/utils/NettyTools.java @@ -0,0 +1,77 @@ +package com.mh.common.utils; + + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; + +import java.util.Objects; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * @author LJF + * @version 1.0 + * @project TAD_Server + * @description 缓存等待数据 + * @date 2023/7/4 08:45:16 + */ +@Slf4j +public class NettyTools { + + /** + * 响应消息缓存 + */ + private static final Cache> responseMsgCache = CacheBuilder.newBuilder() + .maximumSize(500) + .expireAfterWrite(1000, TimeUnit.SECONDS) + .build(); + + + /** + * 等待响应消息 + * @param key 消息唯一标识 + * @return ReceiveDdcMsgVo + */ + public static boolean waitReceiveMsg(String key) { + + try { + //设置超时时间 + String vo = Objects.requireNonNull(responseMsgCache.getIfPresent(key)) + .poll(1000 * 10, TimeUnit.MILLISECONDS); + + //删除key + responseMsgCache.invalidate(key); + return StringUtils.isNotBlank(vo); + } catch (Exception e) { + log.error("获取数据异常,sn={},msg=null",key); + return false; + } + + } + + /** + * 初始化响应消息的队列 + * @param key 消息唯一标识 + */ + public static void initReceiveMsg(String key) { + responseMsgCache.put(key,new LinkedBlockingQueue(1)); + } + + /** + * 设置响应消息 + * @param key 消息唯一标识 + */ + public static void setReceiveMsg(String key, String msg) { + + if(responseMsgCache.getIfPresent(key) != null){ + responseMsgCache.getIfPresent(key).add(msg); + return; + } + + log.warn("sn {}不存在",key); + } + +} diff --git a/mh-framework/pom.xml b/mh-framework/pom.xml index 0527bf9..19073e1 100644 --- a/mh-framework/pom.xml +++ b/mh-framework/pom.xml @@ -64,6 +64,13 @@ spring-boot-starter-amqp + + + com.google.guava + guava + 33.4.8-jre + + \ No newline at end of file diff --git a/mh-framework/src/main/java/com/mh/framework/aspectj/ControlDeviceAspect.java b/mh-framework/src/main/java/com/mh/framework/aspectj/ControlDeviceAspect.java index 03be5ab..21cc568 100644 --- a/mh-framework/src/main/java/com/mh/framework/aspectj/ControlDeviceAspect.java +++ b/mh-framework/src/main/java/com/mh/framework/aspectj/ControlDeviceAspect.java @@ -96,6 +96,8 @@ public class ControlDeviceAspect { } else if (null != orderEntity.getType() && orderEntity.getType() == 3) { // 修改手动获取全自动 deviceControlLog.setControlContent(orderEntity.getParam().equals("0")?"关闭全自动":"开启全自动"); + } else { + deviceControlLog.setControlContent(orderEntity.toString()); } deviceControlLog.setCreateUser(SecurityUtils.getUsername()); deviceControlLog.setCreateTime(new Date()); diff --git a/mh-framework/src/main/java/com/mh/framework/netty/EchoServerHandler.java b/mh-framework/src/main/java/com/mh/framework/netty/EchoServerHandler.java index c467ffe..20ca39c 100644 --- a/mh-framework/src/main/java/com/mh/framework/netty/EchoServerHandler.java +++ b/mh-framework/src/main/java/com/mh/framework/netty/EchoServerHandler.java @@ -10,6 +10,11 @@ import com.mh.common.model.request.AdvantechDatas; import com.mh.common.model.request.AdvantechReceiver; import com.mh.common.utils.*; import com.mh.common.utils.spring.SpringUtils; +import com.mh.framework.netty.session.ServerSession; +import com.mh.framework.netty.session.SessionMap; +import com.mh.framework.netty.task.CallbackTask; +//import com.mh.framework.netty.task.CallbackTaskScheduler; +import com.mh.framework.netty.task.CallbackTaskScheduler; import com.mh.framework.rabbitmq.producer.SendMsgByTopic; import com.mh.system.service.device.ICollectionParamsManageService; import com.mh.system.service.device.IGatewayManageService; @@ -33,6 +38,7 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter { IGatewayManageService gatewayManageService = SpringUtils.getBean(IGatewayManageService.class); ICollectionParamsManageService collectionParamsManageService = SpringUtils.getBean(ICollectionParamsManageService.class); SendMsgByTopic sendMsgByTopic = SpringUtils.getBean(SendMsgByTopic.class); + RedisCache redisCache = SpringUtils.getBean(RedisCache.class); /** * 空闲次数 @@ -121,6 +127,8 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter { if ((receiveStr.length() == 8) && receiveStr.startsWith("24")) { // if ((receiveStr.length() == 8) && receiveStr.startsWith("C0A801FE")) { log.info("接收到心跳包 ===> {}", receiveStr); + // 开始进行会话保存 + dealSession(ctx); idleCount = 1; port = receiveStr.substring(4, 8);//心跳包包含网关端口(自己定义返回心跳包) // 更新对应的网关在线情况 @@ -151,22 +159,23 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter { } else { log.info("gateway not find deviceCodeParam!"); } - } else if (receiveStr.length() == 34 || receiveStr.length() == 36 || receiveStr.length() == 40 || receiveStr.length() == 44 || receiveStr.length() == 50) { - //电表返回数据解析 - idleCount = 1; - log.info("电表接收===> {},长度:{}", receiveStr, receiveStr.length()); - //解析采集的报文,并保存到数据库 - nextSendOrder(ctx); } else if (receiveStr.length() == 18) { - //冷量计返回数据解析 + // 水电表、热泵设置返回数据解析 idleCount = 1; log.info("水电表、热泵设置接收==>{},长度:{}", receiveStr, receiveStr.length()); nextSendOrder(ctx); } else if (receiveStr.length() == 12 || receiveStr.length() == 14) { - //冷水机返回数据解析 + // 热泵返回数据解析 idleCount = 1; log.info("热泵读取接收===>{},长度:{}", receiveStr, receiveStr.length()); nextSendOrder(ctx); + } else if (receiveStr.length() == 16) { + // 热泵设置指令返回 + // 判断是否有指令发送 + if (redisCache.hasKey("order_send") && redisCache.getCacheObject("order_send").equals(receiveStr)) { + NettyTools.setReceiveMsg("order_wait", receiveStr); + } + nextSendOrder(ctx); } else if (receiveStr.length() > 50 && receiveStr.length() < 100) { idleCount = 1; // 清空receiveStr @@ -175,9 +184,53 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter { ctx.flush(); } + private void dealSession(ChannelHandlerContext ctx) { + // 获取表号 + String deviceCode =receiveStr; + String meterNum = deviceCode; + deviceCode = deviceCode + ctx.channel().remoteAddress(); + //新的session的创建 + ServerSession session = new ServerSession(ctx.channel(), deviceCode); + + //进行登录逻辑处理,异步进行处理。并且需要知道 处理的结果。 callbacktask就要 + //派上用场了 + String finalDeviceCode = deviceCode; + CallbackTaskScheduler.add(new CallbackTask() { + @Override + public Boolean execute() throws Exception { + //进行 login 逻辑的处理 + return action(session, finalDeviceCode, ctx); + } + //没有异常的话,我们进行处理 + @Override + public void onBack(Boolean result) { + if(result) { + log.info("设备保存会话: 设备号 = " + session.getSessionId()); + //ctx.pipeline().remove(LoginRequestHandler.class); //压测需要放开 + } else { + log.info("设备刷新会话: 设备号 = " + session.getSessionId()); + SessionMap.inst().updateSession(finalDeviceCode ,session, meterNum); + //log.info("设备登录失败: 设备号 = " + session.getSessionId()); + //ServerSession.closeSession(ctx); + // 假如说已经在会话中了,直接断开连接 + //ctx.close(); + } + } + //有异常的话,我们进行处理 + @Override + public void onException(Throwable t) { + log.info("设备登录异常: 设备号 = " + session.getSessionId()); + ServerSession.closeSession(ctx); + } + }); + } + private void nextSendOrder(ChannelHandlerContext ctx) throws InterruptedException { - // 解析采集的报文,并保存到数据库 - analysisReceiveData(receiveStr, deviceCodeParamList.get(num)); + // 发送指令响应不用解析 + if (receiveStr.length() != 16) { + // 解析采集的报文,并保存到数据库 + analysisReceiveData(receiveStr, deviceCodeParamList.get(num)); + } // 清空receiveStr receiveStr = ""; // 判断发送的下标,如果不等于指令数组大小 @@ -261,4 +314,23 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter { log.info("客户端断开,执行ctx.close()......"); } + private boolean action(ServerSession session, String deviceCode, ChannelHandlerContext ctx) { + //user验证 + boolean isValidUser = checkUser(deviceCode,session); + session.bind(); + return true; + } + + private boolean checkUser(String deviceCode,ServerSession session) { + //当前用户已经登录 + if(SessionMap.inst().hasLogin(deviceCode)) { + log.info("设备已经登录: 设备号 = " + deviceCode); + return false; + } + //一般情况下,我们会将 user存储到 DB中,然后对user的用户名和密码进行校验 + //但是,我们这边没有进行db的集成,所以我们想一个别的办法进行user的校验。在我们的sessionMap进行以下校验 + //为什么选sessionmap,因为我们user的会话,都是存储到sessionmap中的,sessionmap中只要有这个user的会话,说明就是ok的 + return true; + } + } diff --git a/mh-framework/src/main/java/com/mh/framework/netty/INettyService.java b/mh-framework/src/main/java/com/mh/framework/netty/INettyService.java new file mode 100644 index 0000000..ae9d626 --- /dev/null +++ b/mh-framework/src/main/java/com/mh/framework/netty/INettyService.java @@ -0,0 +1,18 @@ +package com.mh.framework.netty; + +import com.mh.common.core.domain.entity.OrderEntity; + +import java.util.List; + +/** + * @author LJF + * @version 1.0 + * @project EEMCS + * @description netty + * @date 2025-06-06 15:13:06 + */ +public interface INettyService { + + boolean sendOrder(List changeValues); + +} diff --git a/mh-framework/src/main/java/com/mh/framework/netty/NettyServiceImpl.java b/mh-framework/src/main/java/com/mh/framework/netty/NettyServiceImpl.java new file mode 100644 index 0000000..4a92e8e --- /dev/null +++ b/mh-framework/src/main/java/com/mh/framework/netty/NettyServiceImpl.java @@ -0,0 +1,119 @@ +package com.mh.framework.netty; + +import com.mh.common.core.domain.AjaxResult; +import com.mh.common.core.domain.entity.CollectionParamsManage; +import com.mh.common.core.domain.entity.GatewayManage; +import com.mh.common.core.domain.entity.OrderEntity; +import com.mh.common.core.redis.RedisCache; +import com.mh.common.core.redis.RedisLock; +import com.mh.common.utils.ModbusUtils; +import com.mh.common.utils.NettyTools; +import com.mh.common.utils.StringUtils; +import com.mh.framework.netty.session.ServerSession; +import com.mh.framework.netty.session.SessionMap; +import com.mh.system.mapper.device.CollectionParamsManageMapper; +import com.mh.system.mapper.device.GatewayManageMapper; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +/** + * @author LJF + * @version 1.0 + * @project EEMCS + * @description netty实现类 + * @date 2025-06-06 15:13:23 + */ +@Slf4j +@Service +public class NettyServiceImpl implements INettyService { + + @Resource + private CollectionParamsManageMapper collectionParamsManageMapper; + + @Resource + private GatewayManageMapper gatewayManageMapper; + + @Resource + private RedisCache redisCache; + + @Resource + private RedisLock redisLock; + + @Override + public boolean sendOrder(List changeValues) { + for (OrderEntity changeValue : changeValues) { + String cpmId = changeValue.getId(); + CollectionParamsManage collectionParamsManage = collectionParamsManageMapper.selectById(cpmId); + if (null == collectionParamsManage) { + return false; + } + GatewayManage gatewayManage = gatewayManageMapper.selectById(collectionParamsManage.getGatewayId()); + if (null == gatewayManage || StringUtils.isEmpty(gatewayManage.getHeartBeat())) { + return false; + } + ConcurrentHashMap map = SessionMap.inst().getMap(); + Set> entries = map.entrySet(); + boolean flag = false; + String keyVal = null; + for (Map.Entry entry : entries) { + String key = entry.getKey(); + if (key.contains(gatewayManage.getHeartBeat())){ + flag = true; + keyVal = key; + break; + } + } + if (flag) { + ServerSession serverSession = map.get(keyVal); + // 目前只有DTU,modbus方式,只创建modbus先 + String controlCode = ModbusUtils.createControlCode(collectionParamsManage.getMtCode(), + changeValue.getType(), + collectionParamsManage.getRegisterAddr(), + changeValue.getParam()); + if (StringUtils.isEmpty(controlCode)) { + log.error("创建控制码失败"); + return false; + } + + String requestId = UUID.randomUUID().toString(); // 唯一标识当前请求 + String lockKey = "lock:order_send:" + gatewayManage.getHeartBeat(); // 按网关分锁 + + try { + if (!redisLock.tryLock(lockKey, requestId, 10, 10)) { + log.warn("获取锁失败,当前操作繁忙"); + return false; + } + // 初始化发送指令 + NettyTools.initReceiveMsg("order_wait"); + // 设置缓存,方便在netty中判断发送的指令 + redisCache.setCacheObject("order_send", controlCode, 10, TimeUnit.SECONDS); + // 发送指令 + serverSession.getChannel().writeAndFlush(ModbusUtils.createByteBuf(controlCode)); + // 等待指令 + if (NettyTools.waitReceiveMsg("order_wait")) { + log.error("发送指令成功,心跳包:{}", gatewayManage.getHeartBeat()); + return true; + } else { + log.error("发送指令异常,心跳包:{}", gatewayManage.getHeartBeat()); + return false; + } + } catch (InterruptedException e) { + log.error("发送指令异常", e); + } finally { + redisLock.unlock(lockKey, requestId); + } + } + log.error("当前设备不在线,心跳包:{}",gatewayManage.getHeartBeat()); + return false; + } + return false; + } +} diff --git a/mh-framework/src/main/java/com/mh/framework/netty/session/ServerSession.java b/mh-framework/src/main/java/com/mh/framework/netty/session/ServerSession.java new file mode 100644 index 0000000..f6b7b63 --- /dev/null +++ b/mh-framework/src/main/java/com/mh/framework/netty/session/ServerSession.java @@ -0,0 +1,66 @@ +package com.mh.framework.netty.session; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.util.AttributeKey; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; + +@Data +@Slf4j +public class ServerSession { + public static final AttributeKey SESSION_KEY = + AttributeKey.valueOf("SESSION_KEY"); + //通道 + private Channel channel; + private final String sessionId; + private boolean isLogin = false; + + public ServerSession(Channel channel, String deviceCode){ + this.channel = channel; + this.sessionId = deviceCode; + } + + //session需要和通道进行一定的关联,他是在构造函数中关联上的; + //session还需要通过sessionkey和channel进行再次的关联;channel.attr方法.set当前的 + // serverSession + //session需要被添加到我们的SessionMap中 + public void bind(){ + log.info("server Session 会话进行绑定 :" + channel.remoteAddress()); + channel.attr(SESSION_KEY).set(this); + SessionMap.inst().addSession(sessionId, this); + this.isLogin = true; + } + + //通过channel获取session + public static ServerSession getSession(ChannelHandlerContext ctx){ + Channel channel = ctx.channel(); + return channel.attr(SESSION_KEY).get(); + } + + //关闭session,新增返回一个meterNum用于纪录设备下线时间2024-05-08 + public static String closeSession(ChannelHandlerContext ctx){ + String meterNum = null; + ServerSession serverSession = ctx.channel().attr(SESSION_KEY).get(); + if(serverSession != null && serverSession.getSessionId() != null) { + ChannelFuture future = serverSession.channel.close(); + future.addListener((ChannelFutureListener) future1 -> { + if(!future1.isSuccess()) { + log.info("Channel close error!"); + } + }); + ctx.close(); + meterNum = serverSession.sessionId; + SessionMap.inst().removeSession(serverSession.sessionId); + log.info(ctx.channel().remoteAddress()+" "+serverSession.sessionId + "==>移除会话"); + } + return meterNum; + } + + //写消息 + public void writeAndFlush(Object msg) { + channel.writeAndFlush(msg); + } +} diff --git a/mh-framework/src/main/java/com/mh/framework/netty/session/SessionMap.java b/mh-framework/src/main/java/com/mh/framework/netty/session/SessionMap.java new file mode 100644 index 0000000..69362f8 --- /dev/null +++ b/mh-framework/src/main/java/com/mh/framework/netty/session/SessionMap.java @@ -0,0 +1,96 @@ +package com.mh.framework.netty.session; + +import lombok.Data; +import lombok.extern.slf4j.Slf4j; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +@Data +@Slf4j +public class SessionMap { + + private ThreadLocal sceneThreadLocal = new ThreadLocal<>(); + + //用单例模式进行sessionMap的创建 + private SessionMap(){} + + private static SessionMap singleInstance = new SessionMap(); + + public static SessionMap inst() { + return singleInstance; + } + + //进行会话的保存 + //key 我们使用 sessionId;value 需要是 serverSession + private ConcurrentHashMap map = new ConcurrentHashMap<>(256); + //添加session + public void addSession(String sessionId, ServerSession s) { + map.put(sessionId, s); + log.info("IP地址:"+s.getChannel().remoteAddress()+" "+ sessionId + " 表具上线,总共表具:" + map.size()); + } + + //删除session + public void removeSession(String sessionId) { + if(map.containsKey(sessionId)) { + ServerSession s = map.get(sessionId); + map.remove(sessionId); + log.info("设备id下线:{},在线设备:{}", s.getSessionId(), map.size() ); + } + return; + } + + public boolean hasLogin(String sessionId) { + Iterator> iterator = map.entrySet().iterator(); + while(iterator.hasNext()) { + Map.Entry next = iterator.next(); + if(sessionId != null && sessionId.equalsIgnoreCase(next.getValue().getSessionId())) { + return true ; + } + } + return false; + } + + //如果在线,肯定有sessionMap里保存的 serverSession + //如果不在线,serverSession也没有。用这个来判断是否在线 + public List getSessionBy(String sessionId) { + return map.values().stream(). + filter(s -> s.getSessionId().equals(sessionId)). + collect(Collectors.toList()); + } + + public boolean getScene() { + return sceneThreadLocal.get(); + } + + public void initScene(Boolean status) { + if (sceneThreadLocal == null) { + log.info("======创建ThreadLocal======"); + sceneThreadLocal = new ThreadLocal<>(); + } + log.info("设置状态==>" + status); + sceneThreadLocal.set(status); + } + + public void clearScene() { + initScene(null); + sceneThreadLocal.remove(); + } + + public void updateSession(String sessionId, ServerSession session, String meterNum) { + Iterator> iterator = map.entrySet().iterator(); + while(iterator.hasNext()) { + Map.Entry next = iterator.next(); + if (next.getKey().contains(meterNum)){ + iterator.remove(); + } + if(sessionId != null && sessionId.equalsIgnoreCase(next.getValue().getSessionId())) { + next.setValue(session); + } + } + } + +} diff --git a/mh-framework/src/main/java/com/mh/framework/netty/task/CallbackTask.java b/mh-framework/src/main/java/com/mh/framework/netty/task/CallbackTask.java new file mode 100644 index 0000000..37f7542 --- /dev/null +++ b/mh-framework/src/main/java/com/mh/framework/netty/task/CallbackTask.java @@ -0,0 +1,20 @@ +package com.mh.framework.netty.task; + +/** + * @author LJF + * @version 1.0 + * @project TAD_Server + * @description 回调任务 + * @date 2023/7/3 15:34:11 + */ +public interface CallbackTask { + T execute() throws Exception; + + /** + * // 执行没有 异常的情况下的 返回值 + * @param t + */ + void onBack(T t); + + void onException(Throwable t); +} diff --git a/mh-framework/src/main/java/com/mh/framework/netty/task/CallbackTaskScheduler.java b/mh-framework/src/main/java/com/mh/framework/netty/task/CallbackTaskScheduler.java new file mode 100644 index 0000000..3ef3c34 --- /dev/null +++ b/mh-framework/src/main/java/com/mh/framework/netty/task/CallbackTaskScheduler.java @@ -0,0 +1,78 @@ +package com.mh.framework.netty.task; + +import com.google.common.util.concurrent.*; + +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * @author LJF + * @version 1.0 + * @project TAD_Server + * @description 回调任务 + * @date 2023/7/3 15:34:11 + */ +public class CallbackTaskScheduler extends Thread { + private ConcurrentLinkedQueue executeTaskQueue = + new ConcurrentLinkedQueue<>(); + private long sleepTime = 1000 * 10; + private final ExecutorService pool = Executors.newCachedThreadPool(); + ListeningExecutorService lpool = MoreExecutors.listeningDecorator(pool); + private static CallbackTaskScheduler inst = new CallbackTaskScheduler(); + private CallbackTaskScheduler() { + this.start(); + } + //add task + public static void add(CallbackTask executeTask) { + inst.executeTaskQueue.add(executeTask); + } + + @Override + public void run() { + while (true) { + handleTask(); + //为了避免频繁连接服务器,但是当前连接服务器过长导致失败 + //threadSleep(sleepTime); + } + } + + private void threadSleep(long sleepTime) { + try { + Thread.sleep(sleepTime); + }catch (Exception e) { + e.printStackTrace(); + } + } + + //任务执行 + private void handleTask() { + CallbackTask executeTask = null; + while (executeTaskQueue.peek() != null) { + executeTask = executeTaskQueue.poll(); + handleTask(executeTask); + } + } + private void handleTask(CallbackTask executeTask) { + ListenableFuture future = lpool.submit(new Callable() { + public T call() throws Exception { + return executeTask.execute(); + } + }); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(T t) { + executeTask.onBack(t); + } + + @Override + public void onFailure(Throwable throwable) { + executeTask.onException(throwable); + } + + + }, pool); + } +} + diff --git a/mh-framework/src/main/java/com/mh/framework/netty/task/ExecuteTask.java b/mh-framework/src/main/java/com/mh/framework/netty/task/ExecuteTask.java new file mode 100644 index 0000000..2a29b3a --- /dev/null +++ b/mh-framework/src/main/java/com/mh/framework/netty/task/ExecuteTask.java @@ -0,0 +1,6 @@ +package com.mh.framework.netty.task; + +//不需要知道异步线程的 返回值 +public interface ExecuteTask { + void execute(); +} diff --git a/mh-framework/src/main/java/com/mh/framework/netty/task/FutureTaskScheduler.java b/mh-framework/src/main/java/com/mh/framework/netty/task/FutureTaskScheduler.java new file mode 100644 index 0000000..007a28e --- /dev/null +++ b/mh-framework/src/main/java/com/mh/framework/netty/task/FutureTaskScheduler.java @@ -0,0 +1,67 @@ +package com.mh.framework.netty.task; + +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * @author LJF + * @version 1.0 + * @project TAD_Server + * @description 任务定时 + * @date 2023/7/3 15:34:11 + */ +public class FutureTaskScheduler extends Thread{ + private ConcurrentLinkedQueue executeTaskQueue = + new ConcurrentLinkedQueue<>(); + private long sleepTime = 200; + private ExecutorService pool = Executors.newFixedThreadPool(10); + private static FutureTaskScheduler inst = new FutureTaskScheduler(); + public FutureTaskScheduler() { + this.start(); + } + //任务添加 + public static void add(ExecuteTask executeTask) { + inst.executeTaskQueue.add(executeTask); + } + + @Override + public void run() { + while (true) { + handleTask(); + //threadSleep(sleepTime); + } + } + + private void threadSleep(long sleepTime) { + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + //执行任务 + private void handleTask() { + ExecuteTask executeTask; + while (executeTaskQueue.peek() != null) { + executeTask = executeTaskQueue.poll(); + handleTask(executeTask); + } + //刷新心跳时间 + } + private void handleTask(ExecuteTask executeTask) { + pool.execute(new ExecuteRunnable(executeTask)); + } + + class ExecuteRunnable implements Runnable { + ExecuteTask executeTask; + public ExecuteRunnable(ExecuteTask executeTask) { + this.executeTask = executeTask; + } + @Override + public void run() { + executeTask.execute(); + } + } +} diff --git a/mh-quartz/src/main/java/com/mh/quartz/task/AHUTask.java b/mh-quartz/src/main/java/com/mh/quartz/task/AHUTask.java index e5af5e5..5fd9b34 100644 --- a/mh-quartz/src/main/java/com/mh/quartz/task/AHUTask.java +++ b/mh-quartz/src/main/java/com/mh/quartz/task/AHUTask.java @@ -8,6 +8,7 @@ import com.mh.common.core.domain.entity.PolicyManage; import com.mh.common.utils.DateUtils; import com.mh.framework.mqtt.service.IMqttGatewayService; import com.mh.quartz.util.AHUPIDControlUtil; +import com.mh.quartz.util.FuzzyPIDControlUtil; import com.mh.system.service.device.ICollectionParamsManageService; import com.mh.system.service.operation.IOperationDeviceService; import com.mh.system.service.policy.IPolicyManageService; @@ -50,7 +51,7 @@ public class AHUTask { private final IMqttGatewayService iMqttGatewayService; // 在 AHUTask 类中添加一个 PID 控制器成员变量 - private final Map pidControllers = new HashMap<>(); + private final Map pidControllers = new HashMap<>(); @Autowired public AHUTask(ICollectionParamsManageService collectionParamsManageService, IPolicyManageService policyManageService, ICpmSpaceRelationService cpmSpaceRelationService, IOperationDeviceService iOperationService, IMqttGatewayService iMqttGatewayService) { @@ -105,7 +106,7 @@ public class AHUTask { // 设定目标温度(夏季制冷24℃) // ✅ 如果没有该设备的控制器,则创建一个新的并保存起来 - AHUPIDControlUtil controller = pidControllers.computeIfAbsent(deviceLedgerId, k -> new AHUPIDControlUtil()); + FuzzyPIDControlUtil controller = pidControllers.computeIfAbsent(deviceLedgerId, k -> new FuzzyPIDControlUtil(kp, ki, kd)); log.info("开始模糊PID控制循环,查看对象是否有变化:{}", controller); @@ -121,7 +122,7 @@ public class AHUTask { double temp = third.get().getCurValue().doubleValue(); // 2. 计算水阀开度(时间间隔1秒) - double valveOpening1 = controller.compute(backTempSet, temp, 5); + double valveOpening1 = controller.calculate(backTempSet, temp, 1); int valveOpening = new BigDecimal(valveOpening1).intValue(); // 过滤获取水阀调节参数 diff --git a/mh-quartz/src/main/java/com/mh/quartz/task/DealDataTask.java b/mh-quartz/src/main/java/com/mh/quartz/task/DealDataTask.java index ccdbfc5..a59a518 100644 --- a/mh-quartz/src/main/java/com/mh/quartz/task/DealDataTask.java +++ b/mh-quartz/src/main/java/com/mh/quartz/task/DealDataTask.java @@ -325,6 +325,9 @@ public class DealDataTask { if (ObjectUtils.isEmpty(hourEntity)) { //查询设备信息初始值 lastValue = dataProcessService.queryInitValue(key, null); + if (StringUtils.isEmpty(lastValue)) { + lastValue = "0"; + } } else { lastValue = hourEntity.getLastValue(); lastDate = hourEntity.getLastTime(); diff --git a/mh-quartz/src/main/java/com/mh/quartz/util/AHUPIDControlUtil.java b/mh-quartz/src/main/java/com/mh/quartz/util/AHUPIDControlUtil.java index 1d84c84..25397d1 100644 --- a/mh-quartz/src/main/java/com/mh/quartz/util/AHUPIDControlUtil.java +++ b/mh-quartz/src/main/java/com/mh/quartz/util/AHUPIDControlUtil.java @@ -1,7 +1,6 @@ package com.mh.quartz.util; import com.mh.quartz.domain.FuzzyLevel; -import com.mh.quartz.domain.PIDParams; /** * @author LJF @@ -24,7 +23,7 @@ public class AHUPIDControlUtil { private double deltaKdScale = 0.01; public double compute(double setTemp, double currentTemp, double deltaTime) { - double error = setTemp - currentTemp; + double error = currentTemp - setTemp; double dError = (error - previousError) / deltaTime; // 模糊映射 diff --git a/mh-quartz/src/main/java/com/mh/quartz/util/FuzzyPIDControlUtil.java b/mh-quartz/src/main/java/com/mh/quartz/util/FuzzyPIDControlUtil.java new file mode 100644 index 0000000..d75b9df --- /dev/null +++ b/mh-quartz/src/main/java/com/mh/quartz/util/FuzzyPIDControlUtil.java @@ -0,0 +1,152 @@ +package com.mh.quartz.util; + +/** + * @author LJF + * @version 1.0 + * @project EEMCS + * @description 模糊PID控制算法 + * @date 2025-06-04 09:47:16 + */ +public class FuzzyPIDControlUtil { + // PID参数 + private final double kp; // 比例增益 + private final double ki; // 积分增益 + private final double kd; // 微分增益 + + // 控制器状态 + private double prevError; + private double integral; + private double derivative; + + // 模糊规则参数 + private final double[] errorLevels = {-6, -3, -1, 0, 1, 3, 6}; // 温度误差级别(℃) + private final double[] dErrorLevels = {-3, -1, 0, 1, 3}; // 误差变化率级别(℃/min) + private final double[] kpAdjust = {1.5, 2.0, 2.5, 3.0, 4.0}; // Kp调整因子 (增强) + private final double[] kiAdjust = {0.3, 0.7, 1.0, 1.3, 1.7}; // Ki调整因子 + + // 阀门限制 + private static final double MIN_VALVE = 0.0; // 最小开度(0%) + private static final double MAX_VALVE = 100.0; // 最大开度(100%) + + public FuzzyPIDControlUtil(String kp, String ki, String kd) { + this.kp = Double.parseDouble(kp); + this.ki = Double.parseDouble(ki); + this.kd = Double.parseDouble(kd); + this.prevError = 0; + this.integral = 0; + this.derivative = 0; + } + + // 模糊推理计算PID参数调整因子 + private double[] fuzzyInference(double error, double dError) { + // 模糊化:计算误差和误差变化率的隶属度 + double[] errorMembership = calculateMembership(error, errorLevels); + double[] dErrorMembership = calculateMembership(dError, dErrorLevels); + + // 模糊规则库 + double kpAdjustSum = 0.0; + double kiAdjustSum = 0.0; + double weightSum = 0.0; + + // 应用模糊规则 (增强大误差时的响应) + for (int i = 0; i < errorMembership.length; i++) { + for (int j = 0; j < dErrorMembership.length; j++) { + double weight = errorMembership[i] * dErrorMembership[j]; + if (weight > 0) { + // 增强大误差时的响应 + int kpIndex; + if (Math.abs(error) > 3) { // 大误差 + kpIndex = Math.min(Math.max(i + j, 0), kpAdjust.length - 1); + } else { + kpIndex = Math.min(Math.max(i + j - 1, 0), kpAdjust.length - 1); + } + + // Ki调整:小误差时增强积分作用 + int kiIndex; + if (Math.abs(error) < 1) { // 小误差 + kiIndex = Math.min(Math.max(3 + j, 0), kiAdjust.length - 1); + } else { + kiIndex = Math.min(Math.max(2 + j, 0), kiAdjust.length - 1); + } + + kpAdjustSum += weight * kpAdjust[kpIndex]; + kiAdjustSum += weight * kiAdjust[kiIndex]; + weightSum += weight; + } + } + } + + // 反模糊化 (加权平均) + double kpFactor = weightSum > 0 ? kpAdjustSum / weightSum : 1.0; + double kiFactor = weightSum > 0 ? kiAdjustSum / weightSum : 1.0; + + return new double[]{kpFactor, kiFactor, 1.0}; // Kd不调整 + } + + // 计算隶属度 (三角隶属函数) + private double[] calculateMembership(double value, double[] levels) { + double[] membership = new double[levels.length]; + + for (int i = 0; i < levels.length; i++) { + if (i == 0) { + membership[i] = (value <= levels[i]) ? 1.0 : + (value < levels[i+1]) ? (levels[i+1] - value) / (levels[i+1] - levels[i]) : 0.0; + } else if (i == levels.length - 1) { + membership[i] = (value >= levels[i]) ? 1.0 : + (value > levels[i-1]) ? (value - levels[i-1]) / (levels[i] - levels[i-1]) : 0.0; + } else { + if (value >= levels[i-1] && value <= levels[i]) { + membership[i] = (value - levels[i-1]) / (levels[i] - levels[i-1]); + } else if (value >= levels[i] && value <= levels[i+1]) { + membership[i] = (levels[i+1] - value) / (levels[i+1] - levels[i]); + } else { + membership[i] = 0.0; + } + } + } + + return membership; + } + + // 计算控制输出 (阀门开度) - 修复了符号问题 + public double calculate(double setpoint, double currentValue, double dt) { + // 计算误差项 - 修复:当前值高于设定值需要冷却,误差应为正 + double error = currentValue - setpoint; + + // 计算微分项 (基于误差变化率) + if (dt > 0) { + derivative = (error - prevError) / dt; + } + + // 模糊调整PID参数 + double[] adjustments = fuzzyInference(error, derivative); + double adjKp = kp * adjustments[0]; + double adjKi = ki * adjustments[1]; + double adjKd = kd * adjustments[2]; + + // 计算积分项 (带抗饱和) + integral += error * dt; + + // 抗饱和限制 + double maxIntegral = MAX_VALVE / (adjKi + 1e-5); + if (Math.abs(integral) > maxIntegral) { + integral = Math.signum(integral) * maxIntegral; + } + + // PID计算 - 修复:误差为正时需要正输出打开阀门 + double output = adjKp * error + adjKi * integral + adjKd * derivative; + + // 保存误差用于下次计算 + prevError = error; + + // 阀门开度限制 + return Math.min(Math.max(output, MIN_VALVE), MAX_VALVE); + } + + // 重置控制器状态 + public void reset() { + integral = 0; + prevError = 0; + derivative = 0; + } +} diff --git a/mh-system/src/main/java/com/mh/system/service/device/impl/CollectionParamsManageServiceImpl.java b/mh-system/src/main/java/com/mh/system/service/device/impl/CollectionParamsManageServiceImpl.java index ff628fc..78a579c 100644 --- a/mh-system/src/main/java/com/mh/system/service/device/impl/CollectionParamsManageServiceImpl.java +++ b/mh-system/src/main/java/com/mh/system/service/device/impl/CollectionParamsManageServiceImpl.java @@ -230,7 +230,7 @@ public class CollectionParamsManageServiceImpl implements ICollectionParamsManag // } else { // queryWrapper.ne("mt_type", mtType); // } - queryWrapper.eq("is_use", 0); +// queryWrapper.eq("is_use", 0); queryWrapper.orderByAsc("order_num"); return collectionParamsManageMapper.selectList(queryWrapper); } diff --git a/mh-system/src/main/java/com/mh/system/service/operation/IOperationDeviceService.java b/mh-system/src/main/java/com/mh/system/service/operation/IOperationDeviceService.java index fd6d1fd..a2df029 100644 --- a/mh-system/src/main/java/com/mh/system/service/operation/IOperationDeviceService.java +++ b/mh-system/src/main/java/com/mh/system/service/operation/IOperationDeviceService.java @@ -1,6 +1,5 @@ package com.mh.system.service.operation; -import com.mh.common.core.domain.AjaxResult; import com.mh.common.core.domain.entity.OrderEntity; import java.util.List; @@ -16,4 +15,6 @@ public interface IOperationDeviceService { String operationDevice(List changeValues); + boolean isAdvanTech(List changeValues); + } diff --git a/mh-system/src/main/java/com/mh/system/service/operation/impl/OperationDeviceServiceImpl.java b/mh-system/src/main/java/com/mh/system/service/operation/impl/OperationDeviceServiceImpl.java index b8d38f3..ad15673 100644 --- a/mh-system/src/main/java/com/mh/system/service/operation/impl/OperationDeviceServiceImpl.java +++ b/mh-system/src/main/java/com/mh/system/service/operation/impl/OperationDeviceServiceImpl.java @@ -2,14 +2,19 @@ package com.mh.system.service.operation.impl; import com.alibaba.fastjson2.JSONObject; import com.mh.common.constant.Constants; +import com.mh.common.core.domain.AjaxResult; import com.mh.common.core.domain.entity.CollectionParamsManage; +import com.mh.common.core.domain.entity.GatewayManage; import com.mh.common.core.domain.entity.OrderEntity; import com.mh.common.core.domain.entity.PolicyManage; import com.mh.common.model.request.AdvantechDatas; import com.mh.common.model.response.AdvantechResponse; import com.mh.common.utils.DateUtils; +import com.mh.common.utils.ModbusUtils; +import com.mh.common.utils.NettyTools; import com.mh.common.utils.StringUtils; import com.mh.system.mapper.device.CollectionParamsManageMapper; +import com.mh.system.mapper.device.GatewayManageMapper; import com.mh.system.mapper.policy.PolicyManageMapper; import com.mh.system.service.operation.IOperationDeviceService; import jakarta.annotation.Resource; @@ -19,6 +24,9 @@ import org.springframework.stereotype.Service; import java.math.BigDecimal; import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; /** * @author LJF @@ -37,6 +45,23 @@ public class OperationDeviceServiceImpl implements IOperationDeviceService { @Resource private PolicyManageMapper policyManageMapper; + @Override + public boolean isAdvanTech(List changeValues) { + // 判断是否存在非研华网关设备 + if (!changeValues.isEmpty()) { + for (OrderEntity changeValue : changeValues) { + String cpmId = changeValue.getId(); + CollectionParamsManage collectionParamsManage = collectionParamsManageMapper.selectById(cpmId); + if (null != collectionParamsManage) { + if (!collectionParamsManage.getGatewayId().contains("99")) { + return true; + } + } + } + } + return false; + } + @Override public String operationDevice(List changeValues) { // 拼接发送的报文