25 changed files with 999 additions and 30 deletions
			
			
		@ -0,0 +1,63 @@
					 | 
				
			||||
package com.mh.common.core.redis; | 
				
			||||
 | 
				
			||||
import lombok.extern.slf4j.Slf4j; | 
				
			||||
import org.springframework.data.redis.core.StringRedisTemplate; | 
				
			||||
import org.springframework.data.redis.core.script.RedisScript; | 
				
			||||
import org.springframework.stereotype.Component; | 
				
			||||
 | 
				
			||||
import java.util.Collections; | 
				
			||||
import java.util.concurrent.TimeUnit; | 
				
			||||
 | 
				
			||||
/** | 
				
			||||
 * @author LJF | 
				
			||||
 * @version 1.0 | 
				
			||||
 * @project EEMCS | 
				
			||||
 * @description 锁 | 
				
			||||
 * @date 2025-06-06 16:08:13 | 
				
			||||
 */ | 
				
			||||
@Slf4j | 
				
			||||
@Component | 
				
			||||
public class RedisLock { | 
				
			||||
 | 
				
			||||
    private final StringRedisTemplate redisTemplate; | 
				
			||||
 | 
				
			||||
    public RedisLock(StringRedisTemplate redisTemplate) { | 
				
			||||
        this.redisTemplate = redisTemplate; | 
				
			||||
    } | 
				
			||||
 | 
				
			||||
    /** | 
				
			||||
     * 获取锁 | 
				
			||||
     */ | 
				
			||||
    public boolean lock(String key, String requestId, long expireTimeInSeconds) { | 
				
			||||
        Boolean success = redisTemplate.opsForValue().setIfAbsent(key, requestId, expireTimeInSeconds, TimeUnit.SECONDS); | 
				
			||||
        return Boolean.TRUE.equals(success); | 
				
			||||
    } | 
				
			||||
 | 
				
			||||
    /** | 
				
			||||
     * 尝试获取锁(带超时) | 
				
			||||
     */ | 
				
			||||
    public boolean tryLock(String key, String requestId, long expireTime, long timeoutMs) throws InterruptedException { | 
				
			||||
        long startTime = System.currentTimeMillis(); | 
				
			||||
        while (System.currentTimeMillis() - startTime < timeoutMs) { | 
				
			||||
            if (lock(key, requestId, expireTime)) { | 
				
			||||
                return true; | 
				
			||||
            } | 
				
			||||
            Thread.sleep(50); | 
				
			||||
        } | 
				
			||||
        return false; | 
				
			||||
    } | 
				
			||||
 | 
				
			||||
    /** | 
				
			||||
     * 释放锁(使用 Lua 脚本保证原子性) | 
				
			||||
     */ | 
				
			||||
    public void unlock(String key, String requestId) { | 
				
			||||
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; | 
				
			||||
        RedisScript<Long> redisScript = RedisScript.of(script, Long.class); | 
				
			||||
 | 
				
			||||
        Long result = redisTemplate.execute(redisScript, Collections.singletonList(key), requestId); | 
				
			||||
 | 
				
			||||
        if (result == null || result == 0) { | 
				
			||||
            log.warn("释放锁失败,可能已被其他线程释放 key={}", key); | 
				
			||||
        } | 
				
			||||
    } | 
				
			||||
} | 
				
			||||
@ -0,0 +1,47 @@
					 | 
				
			||||
package com.mh.common.utils; | 
				
			||||
 | 
				
			||||
import io.netty.buffer.ByteBuf; | 
				
			||||
import io.netty.buffer.Unpooled; | 
				
			||||
import io.netty.channel.ChannelHandlerContext; | 
				
			||||
 | 
				
			||||
/** | 
				
			||||
 * @author LJF | 
				
			||||
 * @version 1.0 | 
				
			||||
 * @project EEMCS | 
				
			||||
 * @description Modbus协议工具类 | 
				
			||||
 * @date 2025-06-06 14:40:24 | 
				
			||||
 */ | 
				
			||||
public class ModbusUtils { | 
				
			||||
    public static String createControlCode(String mtCode, Integer type, String registerAddr, String param) { | 
				
			||||
        String orderStr; | 
				
			||||
        mtCode = ExchangeStringUtil.addZeroForNum(mtCode, 2); | 
				
			||||
        registerAddr = ExchangeStringUtil.addZeroForNum(registerAddr, 4); | 
				
			||||
        param = ExchangeStringUtil.addZeroForNum(ExchangeStringUtil.decToHex(param), 4); | 
				
			||||
        orderStr = mtCode + "06" + registerAddr + param; | 
				
			||||
        byte[] strOrder = ExchangeStringUtil.hexStrToBinaryStr(orderStr); | 
				
			||||
        int checkNum = CRC16.CRC16_MODBUS(strOrder); | 
				
			||||
        String checkWord = ExchangeStringUtil.decToHex(String.valueOf(checkNum)); | 
				
			||||
        checkWord = checkWord.substring(2, 4) + checkWord.substring(0, 2); | 
				
			||||
        return orderStr + checkWord; | 
				
			||||
    } | 
				
			||||
 | 
				
			||||
    public static ByteBuf getByteBuf(ChannelHandlerContext ctx, String sendStr) { | 
				
			||||
        // byte类型的数据
 | 
				
			||||
//        String sendStr = "5803004900021914";          // 冷量计
 | 
				
			||||
        // 申请一个数据结构存储信息
 | 
				
			||||
        ByteBuf buffer = ctx.alloc().buffer(); | 
				
			||||
        // 将信息放入数据结构中
 | 
				
			||||
        buffer.writeBytes(ExchangeStringUtil.hexStrToBinaryStr(sendStr));//对接需要16进制
 | 
				
			||||
        return buffer; | 
				
			||||
    } | 
				
			||||
 | 
				
			||||
    public static ByteBuf createByteBuf(String sendStr) { | 
				
			||||
        // byte类型的数据
 | 
				
			||||
//        String sendStr = "5803004900021914";          // 冷量计
 | 
				
			||||
        // 申请一个数据结构存储信息
 | 
				
			||||
        ByteBuf buffer = Unpooled.buffer(); | 
				
			||||
        // 将信息放入数据结构中
 | 
				
			||||
        buffer.writeBytes(ExchangeStringUtil.hexStrToBinaryStr(sendStr));//对接需要16进制
 | 
				
			||||
        return buffer; | 
				
			||||
    } | 
				
			||||
} | 
				
			||||
@ -0,0 +1,77 @@
					 | 
				
			||||
package com.mh.common.utils; | 
				
			||||
 | 
				
			||||
 | 
				
			||||
import com.google.common.cache.Cache; | 
				
			||||
import com.google.common.cache.CacheBuilder; | 
				
			||||
import lombok.extern.slf4j.Slf4j; | 
				
			||||
import org.apache.commons.lang3.StringUtils; | 
				
			||||
 | 
				
			||||
import java.util.Objects; | 
				
			||||
import java.util.concurrent.BlockingQueue; | 
				
			||||
import java.util.concurrent.LinkedBlockingQueue; | 
				
			||||
import java.util.concurrent.TimeUnit; | 
				
			||||
 | 
				
			||||
/** | 
				
			||||
 * @author LJF | 
				
			||||
 * @version 1.0 | 
				
			||||
 * @project TAD_Server | 
				
			||||
 * @description 缓存等待数据 | 
				
			||||
 * @date 2023/7/4 08:45:16 | 
				
			||||
 */ | 
				
			||||
@Slf4j | 
				
			||||
public class NettyTools { | 
				
			||||
 | 
				
			||||
    /** | 
				
			||||
     * 响应消息缓存 | 
				
			||||
     */ | 
				
			||||
    private static final Cache<String, BlockingQueue<String>> responseMsgCache = CacheBuilder.newBuilder() | 
				
			||||
            .maximumSize(500) | 
				
			||||
            .expireAfterWrite(1000, TimeUnit.SECONDS) | 
				
			||||
            .build(); | 
				
			||||
 | 
				
			||||
 | 
				
			||||
    /** | 
				
			||||
     * 等待响应消息 | 
				
			||||
     * @param key 消息唯一标识 | 
				
			||||
     * @return ReceiveDdcMsgVo | 
				
			||||
     */ | 
				
			||||
    public static boolean waitReceiveMsg(String key) { | 
				
			||||
 | 
				
			||||
        try { | 
				
			||||
            //设置超时时间
 | 
				
			||||
            String vo = Objects.requireNonNull(responseMsgCache.getIfPresent(key)) | 
				
			||||
                .poll(1000 * 10, TimeUnit.MILLISECONDS); | 
				
			||||
 | 
				
			||||
            //删除key
 | 
				
			||||
            responseMsgCache.invalidate(key); | 
				
			||||
            return StringUtils.isNotBlank(vo); | 
				
			||||
        } catch (Exception e) { | 
				
			||||
            log.error("获取数据异常,sn={},msg=null",key); | 
				
			||||
            return false; | 
				
			||||
        } | 
				
			||||
 | 
				
			||||
    } | 
				
			||||
 | 
				
			||||
    /** | 
				
			||||
     * 初始化响应消息的队列 | 
				
			||||
     * @param key 消息唯一标识 | 
				
			||||
     */ | 
				
			||||
    public static void initReceiveMsg(String key) { | 
				
			||||
        responseMsgCache.put(key,new LinkedBlockingQueue<String>(1)); | 
				
			||||
    } | 
				
			||||
 | 
				
			||||
    /** | 
				
			||||
     * 设置响应消息 | 
				
			||||
     * @param key 消息唯一标识 | 
				
			||||
     */ | 
				
			||||
    public static void setReceiveMsg(String key, String msg) { | 
				
			||||
 | 
				
			||||
        if(responseMsgCache.getIfPresent(key) != null){ | 
				
			||||
            responseMsgCache.getIfPresent(key).add(msg); | 
				
			||||
            return; | 
				
			||||
        } | 
				
			||||
 | 
				
			||||
        log.warn("sn {}不存在",key); | 
				
			||||
    } | 
				
			||||
 | 
				
			||||
} | 
				
			||||
@ -0,0 +1,18 @@
					 | 
				
			||||
package com.mh.framework.netty; | 
				
			||||
 | 
				
			||||
import com.mh.common.core.domain.entity.OrderEntity; | 
				
			||||
 | 
				
			||||
import java.util.List; | 
				
			||||
 | 
				
			||||
/** | 
				
			||||
 * @author LJF | 
				
			||||
 * @version 1.0 | 
				
			||||
 * @project EEMCS | 
				
			||||
 * @description netty | 
				
			||||
 * @date 2025-06-06 15:13:06 | 
				
			||||
 */ | 
				
			||||
public interface INettyService { | 
				
			||||
 | 
				
			||||
    boolean sendOrder(List<OrderEntity> changeValues); | 
				
			||||
 | 
				
			||||
} | 
				
			||||
@ -0,0 +1,119 @@
					 | 
				
			||||
package com.mh.framework.netty; | 
				
			||||
 | 
				
			||||
import com.mh.common.core.domain.AjaxResult; | 
				
			||||
import com.mh.common.core.domain.entity.CollectionParamsManage; | 
				
			||||
import com.mh.common.core.domain.entity.GatewayManage; | 
				
			||||
import com.mh.common.core.domain.entity.OrderEntity; | 
				
			||||
import com.mh.common.core.redis.RedisCache; | 
				
			||||
import com.mh.common.core.redis.RedisLock; | 
				
			||||
import com.mh.common.utils.ModbusUtils; | 
				
			||||
import com.mh.common.utils.NettyTools; | 
				
			||||
import com.mh.common.utils.StringUtils; | 
				
			||||
import com.mh.framework.netty.session.ServerSession; | 
				
			||||
import com.mh.framework.netty.session.SessionMap; | 
				
			||||
import com.mh.system.mapper.device.CollectionParamsManageMapper; | 
				
			||||
import com.mh.system.mapper.device.GatewayManageMapper; | 
				
			||||
import jakarta.annotation.Resource; | 
				
			||||
import lombok.extern.slf4j.Slf4j; | 
				
			||||
import org.springframework.stereotype.Service; | 
				
			||||
 | 
				
			||||
import java.util.List; | 
				
			||||
import java.util.Map; | 
				
			||||
import java.util.Set; | 
				
			||||
import java.util.UUID; | 
				
			||||
import java.util.concurrent.ConcurrentHashMap; | 
				
			||||
import java.util.concurrent.TimeUnit; | 
				
			||||
 | 
				
			||||
/** | 
				
			||||
 * @author LJF | 
				
			||||
 * @version 1.0 | 
				
			||||
 * @project EEMCS | 
				
			||||
 * @description netty实现类 | 
				
			||||
 * @date 2025-06-06 15:13:23 | 
				
			||||
 */ | 
				
			||||
@Slf4j | 
				
			||||
@Service | 
				
			||||
public class NettyServiceImpl implements INettyService { | 
				
			||||
 | 
				
			||||
    @Resource | 
				
			||||
    private CollectionParamsManageMapper collectionParamsManageMapper; | 
				
			||||
 | 
				
			||||
    @Resource | 
				
			||||
    private GatewayManageMapper gatewayManageMapper; | 
				
			||||
 | 
				
			||||
    @Resource | 
				
			||||
    private RedisCache redisCache; | 
				
			||||
 | 
				
			||||
    @Resource | 
				
			||||
    private RedisLock redisLock; | 
				
			||||
 | 
				
			||||
    @Override | 
				
			||||
    public boolean sendOrder(List<OrderEntity> changeValues) { | 
				
			||||
        for (OrderEntity changeValue : changeValues) { | 
				
			||||
            String cpmId = changeValue.getId(); | 
				
			||||
            CollectionParamsManage collectionParamsManage = collectionParamsManageMapper.selectById(cpmId); | 
				
			||||
            if (null == collectionParamsManage) { | 
				
			||||
                return false; | 
				
			||||
            } | 
				
			||||
            GatewayManage gatewayManage = gatewayManageMapper.selectById(collectionParamsManage.getGatewayId()); | 
				
			||||
            if (null == gatewayManage || StringUtils.isEmpty(gatewayManage.getHeartBeat())) { | 
				
			||||
                return false; | 
				
			||||
            } | 
				
			||||
            ConcurrentHashMap<String, ServerSession> map = SessionMap.inst().getMap(); | 
				
			||||
            Set<Map.Entry<String, ServerSession>> entries = map.entrySet(); | 
				
			||||
            boolean flag = false; | 
				
			||||
            String keyVal = null; | 
				
			||||
            for (Map.Entry<String, ServerSession> entry : entries) { | 
				
			||||
                String key = entry.getKey(); | 
				
			||||
                if (key.contains(gatewayManage.getHeartBeat())){ | 
				
			||||
                    flag = true; | 
				
			||||
                    keyVal = key; | 
				
			||||
                    break; | 
				
			||||
                } | 
				
			||||
            } | 
				
			||||
            if (flag) { | 
				
			||||
                ServerSession serverSession = map.get(keyVal); | 
				
			||||
                // 目前只有DTU,modbus方式,只创建modbus先
 | 
				
			||||
                String controlCode = ModbusUtils.createControlCode(collectionParamsManage.getMtCode(), | 
				
			||||
                        changeValue.getType(), | 
				
			||||
                        collectionParamsManage.getRegisterAddr(), | 
				
			||||
                        changeValue.getParam()); | 
				
			||||
                if (StringUtils.isEmpty(controlCode)) { | 
				
			||||
                    log.error("创建控制码失败"); | 
				
			||||
                    return false; | 
				
			||||
                } | 
				
			||||
 | 
				
			||||
                String requestId = UUID.randomUUID().toString(); // 唯一标识当前请求
 | 
				
			||||
                String lockKey = "lock:order_send:" + gatewayManage.getHeartBeat(); // 按网关分锁
 | 
				
			||||
 | 
				
			||||
                try { | 
				
			||||
                    if (!redisLock.tryLock(lockKey, requestId, 10, 10)) { | 
				
			||||
                        log.warn("获取锁失败,当前操作繁忙"); | 
				
			||||
                        return false; | 
				
			||||
                    } | 
				
			||||
                    // 初始化发送指令
 | 
				
			||||
                    NettyTools.initReceiveMsg("order_wait"); | 
				
			||||
                    // 设置缓存,方便在netty中判断发送的指令
 | 
				
			||||
                    redisCache.setCacheObject("order_send", controlCode, 10, TimeUnit.SECONDS); | 
				
			||||
                    // 发送指令
 | 
				
			||||
                    serverSession.getChannel().writeAndFlush(ModbusUtils.createByteBuf(controlCode)); | 
				
			||||
                    // 等待指令
 | 
				
			||||
                    if (NettyTools.waitReceiveMsg("order_wait")) { | 
				
			||||
                        log.error("发送指令成功,心跳包:{}", gatewayManage.getHeartBeat()); | 
				
			||||
                        return true; | 
				
			||||
                    } else { | 
				
			||||
                        log.error("发送指令异常,心跳包:{}", gatewayManage.getHeartBeat()); | 
				
			||||
                        return false; | 
				
			||||
                    } | 
				
			||||
                } catch (InterruptedException e) { | 
				
			||||
                    log.error("发送指令异常", e); | 
				
			||||
                } finally { | 
				
			||||
                    redisLock.unlock(lockKey, requestId); | 
				
			||||
                } | 
				
			||||
            } | 
				
			||||
            log.error("当前设备不在线,心跳包:{}",gatewayManage.getHeartBeat()); | 
				
			||||
            return false; | 
				
			||||
        } | 
				
			||||
        return false; | 
				
			||||
    } | 
				
			||||
} | 
				
			||||
@ -0,0 +1,66 @@
					 | 
				
			||||
package com.mh.framework.netty.session; | 
				
			||||
 | 
				
			||||
import io.netty.channel.Channel; | 
				
			||||
import io.netty.channel.ChannelFuture; | 
				
			||||
import io.netty.channel.ChannelFutureListener; | 
				
			||||
import io.netty.channel.ChannelHandlerContext; | 
				
			||||
import io.netty.util.AttributeKey; | 
				
			||||
import lombok.Data; | 
				
			||||
import lombok.extern.slf4j.Slf4j; | 
				
			||||
 | 
				
			||||
@Data | 
				
			||||
@Slf4j | 
				
			||||
public class ServerSession { | 
				
			||||
    public static final AttributeKey<ServerSession> SESSION_KEY = | 
				
			||||
            AttributeKey.valueOf("SESSION_KEY"); | 
				
			||||
    //通道
 | 
				
			||||
    private Channel channel; | 
				
			||||
    private final String sessionId; | 
				
			||||
    private boolean isLogin = false; | 
				
			||||
 | 
				
			||||
    public ServerSession(Channel channel, String deviceCode){ | 
				
			||||
        this.channel = channel; | 
				
			||||
        this.sessionId = deviceCode; | 
				
			||||
    } | 
				
			||||
 | 
				
			||||
    //session需要和通道进行一定的关联,他是在构造函数中关联上的;
 | 
				
			||||
    //session还需要通过sessionkey和channel进行再次的关联;channel.attr方法.set当前的
 | 
				
			||||
    // serverSession
 | 
				
			||||
    //session需要被添加到我们的SessionMap中
 | 
				
			||||
    public void bind(){ | 
				
			||||
        log.info("server Session 会话进行绑定 :" + channel.remoteAddress()); | 
				
			||||
        channel.attr(SESSION_KEY).set(this); | 
				
			||||
        SessionMap.inst().addSession(sessionId, this); | 
				
			||||
        this.isLogin = true; | 
				
			||||
    } | 
				
			||||
 | 
				
			||||
    //通过channel获取session
 | 
				
			||||
    public static ServerSession getSession(ChannelHandlerContext ctx){ | 
				
			||||
        Channel channel = ctx.channel(); | 
				
			||||
        return channel.attr(SESSION_KEY).get(); | 
				
			||||
    } | 
				
			||||
 | 
				
			||||
    //关闭session,新增返回一个meterNum用于纪录设备下线时间2024-05-08
 | 
				
			||||
    public static String closeSession(ChannelHandlerContext ctx){ | 
				
			||||
        String meterNum = null; | 
				
			||||
        ServerSession serverSession = ctx.channel().attr(SESSION_KEY).get(); | 
				
			||||
        if(serverSession != null && serverSession.getSessionId() != null) { | 
				
			||||
            ChannelFuture future = serverSession.channel.close(); | 
				
			||||
            future.addListener((ChannelFutureListener) future1 -> { | 
				
			||||
                if(!future1.isSuccess()) { | 
				
			||||
                    log.info("Channel close error!"); | 
				
			||||
                } | 
				
			||||
            }); | 
				
			||||
            ctx.close(); | 
				
			||||
            meterNum = serverSession.sessionId; | 
				
			||||
            SessionMap.inst().removeSession(serverSession.sessionId); | 
				
			||||
            log.info(ctx.channel().remoteAddress()+"  "+serverSession.sessionId + "==>移除会话"); | 
				
			||||
        } | 
				
			||||
        return meterNum; | 
				
			||||
    } | 
				
			||||
 | 
				
			||||
    //写消息
 | 
				
			||||
    public void writeAndFlush(Object msg) { | 
				
			||||
        channel.writeAndFlush(msg); | 
				
			||||
    } | 
				
			||||
} | 
				
			||||
@ -0,0 +1,96 @@
					 | 
				
			||||
package com.mh.framework.netty.session; | 
				
			||||
 | 
				
			||||
import lombok.Data; | 
				
			||||
import lombok.extern.slf4j.Slf4j; | 
				
			||||
 | 
				
			||||
import java.util.Iterator; | 
				
			||||
import java.util.List; | 
				
			||||
import java.util.Map; | 
				
			||||
import java.util.concurrent.ConcurrentHashMap; | 
				
			||||
import java.util.stream.Collectors; | 
				
			||||
 | 
				
			||||
@Data | 
				
			||||
@Slf4j | 
				
			||||
public class SessionMap { | 
				
			||||
 | 
				
			||||
    private ThreadLocal<Boolean> sceneThreadLocal = new ThreadLocal<>(); | 
				
			||||
 | 
				
			||||
    //用单例模式进行sessionMap的创建
 | 
				
			||||
    private SessionMap(){} | 
				
			||||
 | 
				
			||||
    private static SessionMap singleInstance = new SessionMap(); | 
				
			||||
 | 
				
			||||
    public static SessionMap inst() { | 
				
			||||
        return singleInstance; | 
				
			||||
    } | 
				
			||||
 | 
				
			||||
    //进行会话的保存
 | 
				
			||||
    //key 我们使用 sessionId;value 需要是 serverSession
 | 
				
			||||
    private ConcurrentHashMap<String, ServerSession> map = new ConcurrentHashMap<>(256); | 
				
			||||
    //添加session
 | 
				
			||||
    public void addSession(String sessionId, ServerSession s) { | 
				
			||||
        map.put(sessionId, s); | 
				
			||||
        log.info("IP地址:"+s.getChannel().remoteAddress()+"  "+ sessionId + " 表具上线,总共表具:" + map.size()); | 
				
			||||
    } | 
				
			||||
 | 
				
			||||
    //删除session
 | 
				
			||||
    public void removeSession(String sessionId) { | 
				
			||||
        if(map.containsKey(sessionId)) { | 
				
			||||
            ServerSession s = map.get(sessionId); | 
				
			||||
            map.remove(sessionId); | 
				
			||||
            log.info("设备id下线:{},在线设备:{}", s.getSessionId(), map.size() ); | 
				
			||||
        } | 
				
			||||
        return; | 
				
			||||
    } | 
				
			||||
 | 
				
			||||
    public boolean hasLogin(String sessionId) { | 
				
			||||
        Iterator<Map.Entry<String, ServerSession>> iterator = map.entrySet().iterator(); | 
				
			||||
        while(iterator.hasNext()) { | 
				
			||||
            Map.Entry<String, ServerSession> next = iterator.next(); | 
				
			||||
            if(sessionId != null && sessionId.equalsIgnoreCase(next.getValue().getSessionId())) { | 
				
			||||
                return true ; | 
				
			||||
            } | 
				
			||||
        } | 
				
			||||
        return false; | 
				
			||||
    } | 
				
			||||
 | 
				
			||||
    //如果在线,肯定有sessionMap里保存的 serverSession
 | 
				
			||||
    //如果不在线,serverSession也没有。用这个来判断是否在线
 | 
				
			||||
    public List<ServerSession> getSessionBy(String sessionId) { | 
				
			||||
        return map.values().stream(). | 
				
			||||
                filter(s -> s.getSessionId().equals(sessionId)). | 
				
			||||
                collect(Collectors.toList()); | 
				
			||||
    } | 
				
			||||
 | 
				
			||||
    public boolean getScene() { | 
				
			||||
        return sceneThreadLocal.get(); | 
				
			||||
    } | 
				
			||||
 | 
				
			||||
    public void initScene(Boolean status) { | 
				
			||||
        if (sceneThreadLocal == null) { | 
				
			||||
            log.info("======创建ThreadLocal======"); | 
				
			||||
            sceneThreadLocal = new ThreadLocal<>(); | 
				
			||||
        } | 
				
			||||
        log.info("设置状态==>" + status); | 
				
			||||
        sceneThreadLocal.set(status); | 
				
			||||
    } | 
				
			||||
 | 
				
			||||
    public void clearScene() { | 
				
			||||
        initScene(null); | 
				
			||||
        sceneThreadLocal.remove(); | 
				
			||||
    } | 
				
			||||
 | 
				
			||||
    public void updateSession(String sessionId, ServerSession session, String meterNum) { | 
				
			||||
        Iterator<Map.Entry<String, ServerSession>> iterator = map.entrySet().iterator(); | 
				
			||||
        while(iterator.hasNext()) { | 
				
			||||
            Map.Entry<String, ServerSession> next = iterator.next(); | 
				
			||||
            if (next.getKey().contains(meterNum)){ | 
				
			||||
                iterator.remove(); | 
				
			||||
            } | 
				
			||||
            if(sessionId != null && sessionId.equalsIgnoreCase(next.getValue().getSessionId())) { | 
				
			||||
                next.setValue(session); | 
				
			||||
            } | 
				
			||||
        } | 
				
			||||
    } | 
				
			||||
 | 
				
			||||
} | 
				
			||||
@ -0,0 +1,20 @@
					 | 
				
			||||
package com.mh.framework.netty.task; | 
				
			||||
 | 
				
			||||
/** | 
				
			||||
 * @author LJF | 
				
			||||
 * @version 1.0 | 
				
			||||
 * @project TAD_Server | 
				
			||||
 * @description 回调任务 | 
				
			||||
 * @date 2023/7/3 15:34:11 | 
				
			||||
 */ | 
				
			||||
public interface CallbackTask<T> { | 
				
			||||
    T execute() throws Exception; | 
				
			||||
 | 
				
			||||
    /** | 
				
			||||
     * // 执行没有 异常的情况下的 返回值
 | 
				
			||||
     * @param t | 
				
			||||
     */ | 
				
			||||
    void onBack(T t); | 
				
			||||
 | 
				
			||||
    void onException(Throwable t); | 
				
			||||
} | 
				
			||||
@ -0,0 +1,78 @@
					 | 
				
			||||
package com.mh.framework.netty.task; | 
				
			||||
 | 
				
			||||
import com.google.common.util.concurrent.*; | 
				
			||||
 | 
				
			||||
import java.util.concurrent.Callable; | 
				
			||||
import java.util.concurrent.ConcurrentLinkedQueue; | 
				
			||||
import java.util.concurrent.ExecutorService; | 
				
			||||
import java.util.concurrent.Executors; | 
				
			||||
 | 
				
			||||
/** | 
				
			||||
 * @author LJF | 
				
			||||
 * @version 1.0 | 
				
			||||
 * @project TAD_Server | 
				
			||||
 * @description 回调任务 | 
				
			||||
 * @date 2023/7/3 15:34:11 | 
				
			||||
 */ | 
				
			||||
public class CallbackTaskScheduler extends Thread { | 
				
			||||
    private ConcurrentLinkedQueue<CallbackTask> executeTaskQueue = | 
				
			||||
            new ConcurrentLinkedQueue<>(); | 
				
			||||
    private long sleepTime = 1000 * 10; | 
				
			||||
    private final ExecutorService pool = Executors.newCachedThreadPool(); | 
				
			||||
    ListeningExecutorService lpool = MoreExecutors.listeningDecorator(pool); | 
				
			||||
    private static CallbackTaskScheduler inst = new CallbackTaskScheduler(); | 
				
			||||
    private CallbackTaskScheduler() { | 
				
			||||
        this.start(); | 
				
			||||
    } | 
				
			||||
    //add task
 | 
				
			||||
    public static <T> void add(CallbackTask<T> executeTask) { | 
				
			||||
        inst.executeTaskQueue.add(executeTask); | 
				
			||||
    } | 
				
			||||
 | 
				
			||||
    @Override | 
				
			||||
    public void run() { | 
				
			||||
        while (true) { | 
				
			||||
            handleTask(); | 
				
			||||
            //为了避免频繁连接服务器,但是当前连接服务器过长导致失败
 | 
				
			||||
            //threadSleep(sleepTime);
 | 
				
			||||
        } | 
				
			||||
    } | 
				
			||||
 | 
				
			||||
    private void threadSleep(long sleepTime) { | 
				
			||||
        try { | 
				
			||||
            Thread.sleep(sleepTime); | 
				
			||||
        }catch (Exception e) { | 
				
			||||
            e.printStackTrace(); | 
				
			||||
        } | 
				
			||||
    } | 
				
			||||
 | 
				
			||||
    //任务执行
 | 
				
			||||
    private void handleTask() { | 
				
			||||
        CallbackTask executeTask = null; | 
				
			||||
        while (executeTaskQueue.peek() != null) { | 
				
			||||
            executeTask = executeTaskQueue.poll(); | 
				
			||||
            handleTask(executeTask); | 
				
			||||
        } | 
				
			||||
    } | 
				
			||||
    private <T> void handleTask(CallbackTask<T> executeTask) { | 
				
			||||
        ListenableFuture<T> future = lpool.submit(new Callable<T>() { | 
				
			||||
            public T call() throws Exception { | 
				
			||||
                return executeTask.execute(); | 
				
			||||
            } | 
				
			||||
        }); | 
				
			||||
        Futures.addCallback(future, new FutureCallback<T>() { | 
				
			||||
            @Override | 
				
			||||
            public void onSuccess(T t) { | 
				
			||||
                executeTask.onBack(t); | 
				
			||||
            } | 
				
			||||
 | 
				
			||||
            @Override | 
				
			||||
            public void onFailure(Throwable throwable) { | 
				
			||||
                executeTask.onException(throwable); | 
				
			||||
            } | 
				
			||||
 | 
				
			||||
 | 
				
			||||
        }, pool); | 
				
			||||
    } | 
				
			||||
} | 
				
			||||
 | 
				
			||||
@ -0,0 +1,6 @@
					 | 
				
			||||
package com.mh.framework.netty.task; | 
				
			||||
 | 
				
			||||
//不需要知道异步线程的 返回值
 | 
				
			||||
public interface ExecuteTask { | 
				
			||||
    void execute(); | 
				
			||||
} | 
				
			||||
@ -0,0 +1,67 @@
					 | 
				
			||||
package com.mh.framework.netty.task; | 
				
			||||
 | 
				
			||||
import java.util.concurrent.ConcurrentLinkedQueue; | 
				
			||||
import java.util.concurrent.ExecutorService; | 
				
			||||
import java.util.concurrent.Executors; | 
				
			||||
 | 
				
			||||
/** | 
				
			||||
 * @author LJF | 
				
			||||
 * @version 1.0 | 
				
			||||
 * @project TAD_Server | 
				
			||||
 * @description 任务定时 | 
				
			||||
 * @date 2023/7/3 15:34:11 | 
				
			||||
 */ | 
				
			||||
public class FutureTaskScheduler extends Thread{ | 
				
			||||
    private ConcurrentLinkedQueue<ExecuteTask> executeTaskQueue = | 
				
			||||
            new ConcurrentLinkedQueue<>(); | 
				
			||||
    private long sleepTime = 200; | 
				
			||||
    private ExecutorService pool = Executors.newFixedThreadPool(10); | 
				
			||||
    private static FutureTaskScheduler inst = new FutureTaskScheduler(); | 
				
			||||
    public FutureTaskScheduler() { | 
				
			||||
        this.start(); | 
				
			||||
    } | 
				
			||||
    //任务添加
 | 
				
			||||
    public static void add(ExecuteTask executeTask) { | 
				
			||||
        inst.executeTaskQueue.add(executeTask); | 
				
			||||
    } | 
				
			||||
 | 
				
			||||
    @Override | 
				
			||||
    public void run() { | 
				
			||||
        while (true) { | 
				
			||||
            handleTask(); | 
				
			||||
            //threadSleep(sleepTime);
 | 
				
			||||
        } | 
				
			||||
    } | 
				
			||||
 | 
				
			||||
    private void threadSleep(long sleepTime) { | 
				
			||||
        try { | 
				
			||||
            Thread.sleep(sleepTime); | 
				
			||||
        } catch (InterruptedException e) { | 
				
			||||
            e.printStackTrace(); | 
				
			||||
        } | 
				
			||||
    } | 
				
			||||
 | 
				
			||||
    //执行任务
 | 
				
			||||
    private void handleTask() { | 
				
			||||
        ExecuteTask executeTask; | 
				
			||||
        while (executeTaskQueue.peek() != null) { | 
				
			||||
            executeTask = executeTaskQueue.poll(); | 
				
			||||
            handleTask(executeTask); | 
				
			||||
        } | 
				
			||||
        //刷新心跳时间
 | 
				
			||||
    } | 
				
			||||
    private void handleTask(ExecuteTask executeTask) { | 
				
			||||
        pool.execute(new ExecuteRunnable(executeTask)); | 
				
			||||
    } | 
				
			||||
 | 
				
			||||
    class ExecuteRunnable implements Runnable { | 
				
			||||
        ExecuteTask executeTask; | 
				
			||||
        public ExecuteRunnable(ExecuteTask executeTask) { | 
				
			||||
            this.executeTask = executeTask; | 
				
			||||
        } | 
				
			||||
        @Override | 
				
			||||
        public void run() { | 
				
			||||
            executeTask.execute(); | 
				
			||||
        } | 
				
			||||
    } | 
				
			||||
} | 
				
			||||
@ -0,0 +1,152 @@
					 | 
				
			||||
package com.mh.quartz.util; | 
				
			||||
 | 
				
			||||
/** | 
				
			||||
 * @author LJF | 
				
			||||
 * @version 1.0 | 
				
			||||
 * @project EEMCS | 
				
			||||
 * @description 模糊PID控制算法 | 
				
			||||
 * @date 2025-06-04 09:47:16 | 
				
			||||
 */ | 
				
			||||
public class FuzzyPIDControlUtil { | 
				
			||||
    // PID参数
 | 
				
			||||
    private final double kp;   // 比例增益
 | 
				
			||||
    private final double ki;   // 积分增益
 | 
				
			||||
    private final double kd;   // 微分增益
 | 
				
			||||
 | 
				
			||||
    // 控制器状态
 | 
				
			||||
    private double prevError; | 
				
			||||
    private double integral; | 
				
			||||
    private double derivative; | 
				
			||||
 | 
				
			||||
    // 模糊规则参数
 | 
				
			||||
    private final double[] errorLevels = {-6, -3, -1, 0, 1, 3, 6};  // 温度误差级别(℃)
 | 
				
			||||
    private final double[] dErrorLevels = {-3, -1, 0, 1, 3};         // 误差变化率级别(℃/min)
 | 
				
			||||
    private final double[] kpAdjust = {1.5, 2.0, 2.5, 3.0, 4.0};    // Kp调整因子 (增强)
 | 
				
			||||
    private final double[] kiAdjust = {0.3, 0.7, 1.0, 1.3, 1.7};    // Ki调整因子
 | 
				
			||||
 | 
				
			||||
    // 阀门限制
 | 
				
			||||
    private static final double MIN_VALVE = 0.0;    // 最小开度(0%)
 | 
				
			||||
    private static final double MAX_VALVE = 100.0;   // 最大开度(100%)
 | 
				
			||||
 | 
				
			||||
    public FuzzyPIDControlUtil(String kp, String ki, String kd) { | 
				
			||||
        this.kp = Double.parseDouble(kp); | 
				
			||||
        this.ki = Double.parseDouble(ki); | 
				
			||||
        this.kd = Double.parseDouble(kd); | 
				
			||||
        this.prevError = 0; | 
				
			||||
        this.integral = 0; | 
				
			||||
        this.derivative = 0; | 
				
			||||
    } | 
				
			||||
 | 
				
			||||
    // 模糊推理计算PID参数调整因子
 | 
				
			||||
    private double[] fuzzyInference(double error, double dError) { | 
				
			||||
        // 模糊化:计算误差和误差变化率的隶属度
 | 
				
			||||
        double[] errorMembership = calculateMembership(error, errorLevels); | 
				
			||||
        double[] dErrorMembership = calculateMembership(dError, dErrorLevels); | 
				
			||||
 | 
				
			||||
        // 模糊规则库
 | 
				
			||||
        double kpAdjustSum = 0.0; | 
				
			||||
        double kiAdjustSum = 0.0; | 
				
			||||
        double weightSum = 0.0; | 
				
			||||
 | 
				
			||||
        // 应用模糊规则 (增强大误差时的响应)
 | 
				
			||||
        for (int i = 0; i < errorMembership.length; i++) { | 
				
			||||
            for (int j = 0; j < dErrorMembership.length; j++) { | 
				
			||||
                double weight = errorMembership[i] * dErrorMembership[j]; | 
				
			||||
                if (weight > 0) { | 
				
			||||
                    // 增强大误差时的响应
 | 
				
			||||
                    int kpIndex; | 
				
			||||
                    if (Math.abs(error) > 3) { // 大误差
 | 
				
			||||
                        kpIndex = Math.min(Math.max(i + j, 0), kpAdjust.length - 1); | 
				
			||||
                    } else { | 
				
			||||
                        kpIndex = Math.min(Math.max(i + j - 1, 0), kpAdjust.length - 1); | 
				
			||||
                    } | 
				
			||||
 | 
				
			||||
                    // Ki调整:小误差时增强积分作用
 | 
				
			||||
                    int kiIndex; | 
				
			||||
                    if (Math.abs(error) < 1) { // 小误差
 | 
				
			||||
                        kiIndex = Math.min(Math.max(3 + j, 0), kiAdjust.length - 1); | 
				
			||||
                    } else { | 
				
			||||
                        kiIndex = Math.min(Math.max(2 + j, 0), kiAdjust.length - 1); | 
				
			||||
                    } | 
				
			||||
 | 
				
			||||
                    kpAdjustSum += weight * kpAdjust[kpIndex]; | 
				
			||||
                    kiAdjustSum += weight * kiAdjust[kiIndex]; | 
				
			||||
                    weightSum += weight; | 
				
			||||
                } | 
				
			||||
            } | 
				
			||||
        } | 
				
			||||
 | 
				
			||||
        // 反模糊化 (加权平均)
 | 
				
			||||
        double kpFactor = weightSum > 0 ? kpAdjustSum / weightSum : 1.0; | 
				
			||||
        double kiFactor = weightSum > 0 ? kiAdjustSum / weightSum : 1.0; | 
				
			||||
 | 
				
			||||
        return new double[]{kpFactor, kiFactor, 1.0}; // Kd不调整
 | 
				
			||||
    } | 
				
			||||
 | 
				
			||||
    // 计算隶属度 (三角隶属函数)
 | 
				
			||||
    private double[] calculateMembership(double value, double[] levels) { | 
				
			||||
        double[] membership = new double[levels.length]; | 
				
			||||
 | 
				
			||||
        for (int i = 0; i < levels.length; i++) { | 
				
			||||
            if (i == 0) { | 
				
			||||
                membership[i] = (value <= levels[i]) ? 1.0 : | 
				
			||||
                        (value < levels[i+1]) ? (levels[i+1] - value) / (levels[i+1] - levels[i]) : 0.0; | 
				
			||||
            } else if (i == levels.length - 1) { | 
				
			||||
                membership[i] = (value >= levels[i]) ? 1.0 : | 
				
			||||
                        (value > levels[i-1]) ? (value - levels[i-1]) / (levels[i] - levels[i-1]) : 0.0; | 
				
			||||
            } else { | 
				
			||||
                if (value >= levels[i-1] && value <= levels[i]) { | 
				
			||||
                    membership[i] = (value - levels[i-1]) / (levels[i] - levels[i-1]); | 
				
			||||
                } else if (value >= levels[i] && value <= levels[i+1]) { | 
				
			||||
                    membership[i] = (levels[i+1] - value) / (levels[i+1] - levels[i]); | 
				
			||||
                } else { | 
				
			||||
                    membership[i] = 0.0; | 
				
			||||
                } | 
				
			||||
            } | 
				
			||||
        } | 
				
			||||
 | 
				
			||||
        return membership; | 
				
			||||
    } | 
				
			||||
 | 
				
			||||
    // 计算控制输出 (阀门开度) - 修复了符号问题
 | 
				
			||||
    public double calculate(double setpoint, double currentValue, double dt) { | 
				
			||||
        // 计算误差项 - 修复:当前值高于设定值需要冷却,误差应为正
 | 
				
			||||
        double error = currentValue - setpoint; | 
				
			||||
 | 
				
			||||
        // 计算微分项 (基于误差变化率)
 | 
				
			||||
        if (dt > 0) { | 
				
			||||
            derivative = (error - prevError) / dt; | 
				
			||||
        } | 
				
			||||
 | 
				
			||||
        // 模糊调整PID参数
 | 
				
			||||
        double[] adjustments = fuzzyInference(error, derivative); | 
				
			||||
        double adjKp = kp * adjustments[0]; | 
				
			||||
        double adjKi = ki * adjustments[1]; | 
				
			||||
        double adjKd = kd * adjustments[2]; | 
				
			||||
 | 
				
			||||
        // 计算积分项 (带抗饱和)
 | 
				
			||||
        integral += error * dt; | 
				
			||||
 | 
				
			||||
        // 抗饱和限制
 | 
				
			||||
        double maxIntegral = MAX_VALVE / (adjKi + 1e-5); | 
				
			||||
        if (Math.abs(integral) > maxIntegral) { | 
				
			||||
            integral = Math.signum(integral) * maxIntegral; | 
				
			||||
        } | 
				
			||||
 | 
				
			||||
        // PID计算 - 修复:误差为正时需要正输出打开阀门
 | 
				
			||||
        double output = adjKp * error + adjKi * integral + adjKd * derivative; | 
				
			||||
 | 
				
			||||
        // 保存误差用于下次计算
 | 
				
			||||
        prevError = error; | 
				
			||||
 | 
				
			||||
        // 阀门开度限制
 | 
				
			||||
        return Math.min(Math.max(output, MIN_VALVE), MAX_VALVE); | 
				
			||||
    } | 
				
			||||
 | 
				
			||||
    // 重置控制器状态
 | 
				
			||||
    public void reset() { | 
				
			||||
        integral = 0; | 
				
			||||
        prevError = 0; | 
				
			||||
        derivative = 0; | 
				
			||||
    } | 
				
			||||
} | 
				
			||||
					Loading…
					
					
				
		Reference in new issue