diff --git a/user-service/src/main/java/com/mh/user/netty/ClientManage.java b/user-service/src/main/java/com/mh/user/netty/ClientManage.java index ea5fbe6..ae7d7b0 100644 --- a/user-service/src/main/java/com/mh/user/netty/ClientManage.java +++ b/user-service/src/main/java/com/mh/user/netty/ClientManage.java @@ -6,12 +6,9 @@ import io.netty.channel.ChannelHandlerContext; import lombok.extern.slf4j.Slf4j; import org.springframework.context.ApplicationContext; -import java.lang.reflect.Field; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; /** * @author ljf @@ -23,28 +20,34 @@ import java.util.Map; @Slf4j public class ClientManage { - // 创建一个map容器,管理连接的客户端 - public static Map clientMap = new HashMap<>(); - // 创建一个map容器,管理创建的对象 - public static Map echoServerHandlerMap = new HashMap<>(); - // 创建一个容器,管理生成的报文 - public static Map> orderList = new HashMap<>(); + // 创建一个map容器,管理连接的客户端(使用 ConcurrentHashMap 保证线程安全) + public static Map clientMap = new ConcurrentHashMap<>(); + // 创建一个map容器,管理创建的对象(使用 ConcurrentHashMap 保证线程安全) + public static Map echoServerHandlerMap = new ConcurrentHashMap<>(); + // 创建一个容器,管理生成的报文(使用 ConcurrentHashMap 保证线程安全) + public static Map> orderList = new ConcurrentHashMap<>(); ApplicationContext applicationContext = SpringContextUtils.getApplicationContext(); Cache cache = (Cache) applicationContext.getBean("caffeineCache"); - private static ClientManage instance = null; + // 使用双重检查锁定实现线程安全的单例模式 + private static volatile ClientManage instance = null; // 创建一个私有的构造函数 private ClientManage(){} - // 创建 + /** + * 获取单例实例(线程安全,使用双重检查锁定) + */ public static ClientManage getInstance() { - if (null == instance) { - return new ClientManage(); - } else { - return instance; + if (instance == null) { + synchronized (ClientManage.class) { + if (instance == null) { + instance = new ClientManage(); + } + } } + return instance; } /** @@ -55,13 +58,13 @@ public class ClientManage { public void saveOrderMap(String DTUId, List list) { try { if (cache.getIfPresent(DTUId) != null) { - log.info("==> 已存在报文"); + log.info("指令已存在于缓存中, DTUId:{}", DTUId); } else { cache.put(DTUId, list); + log.info("新指令已保存到缓存, DTUId:{}, 指令数量:{}", DTUId, list != null ? list.size() : 0); } } catch (Exception e) { - // TODO Auto-generated catch block - e.printStackTrace(); + log.error("保存指令到缓存异常, DTUId:{}", DTUId, e); } } @@ -76,16 +79,15 @@ public class ClientManage { public void saveEchoServerHandlerMap(String DTUId, EchoServerHandler echoServerHandler) { try { - if (echoServerHandlerMap.get(DTUId) != null && echoServerHandlerMap.get(DTUId).equals(echoServerHandler)) { - // TODO 已存在 - log.info("==> 已存在报文"); + EchoServerHandler existing = echoServerHandlerMap.get(DTUId); + if (existing != null && existing.equals(echoServerHandler)) { + log.info("EchoServerHandler已存在, DTUId:{}", DTUId); } else { - //如果map中没有此ctx 将连接存入map中 echoServerHandlerMap.put(DTUId, echoServerHandler); + log.info("EchoServerHandler已保存, DTUId:{}, Handler:{}", DTUId, echoServerHandler.hashCode()); } } catch (Exception e) { - // TODO Auto-generated catch block - e.printStackTrace(); + log.error("保存EchoServerHandler异常, DTUId:{}", DTUId, e); } } @@ -102,16 +104,15 @@ public class ClientManage { public void saveChannelMap(String DTUId, ChannelHandlerContext ctx) { try { - if (ClientManage.clientMap.get(DTUId) != null && ClientManage.clientMap.get(DTUId).equals(ctx)) { - // TODO 已存在 - log.info("==> 已存在报文"); + ChannelHandlerContext existing = ClientManage.clientMap.get(DTUId); + if (existing != null && existing.equals(ctx)) { + log.info("Channel已存在, DTUId:{}", DTUId); } else { - //如果map中没有此ctx 将连接存入map中 ClientManage.clientMap.put(DTUId, ctx); + log.info("Channel已保存, DTUId:{}, Channel:{}", DTUId, ctx.channel().id().asShortText()); } } catch (Exception e) { - // TODO Auto-generated catch block - e.printStackTrace(); + log.error("保存Channel异常, DTUId:{}", DTUId, e); } } diff --git a/user-service/src/main/java/com/mh/user/netty/EchoServer.java b/user-service/src/main/java/com/mh/user/netty/EchoServer.java index ee589d1..f850b92 100644 --- a/user-service/src/main/java/com/mh/user/netty/EchoServer.java +++ b/user-service/src/main/java/com/mh/user/netty/EchoServer.java @@ -11,15 +11,26 @@ import lombok.extern.slf4j.Slf4j; public class EchoServer { private final int port; + // 保存 EventLoopGroup 引用,用于资源管理 + private NioEventLoopGroup bossGroup; + private NioEventLoopGroup workerGroup; + // 防止多次调用 start() + private volatile boolean isStarted = false; public EchoServer(int port) { this.port = port; } public void start() { + // 防止重复启动 + if (isStarted) { + log.warn("服务器已经在运行中,端口:{}, 无需重复启动", port); + return; + } + // 创建Event-LoopGroup - NioEventLoopGroup bossGroup = new NioEventLoopGroup(); // bossGroup 用于监听客户端连接,专门负责与客户端创建连接,并把连接注册到workerGroup的Selector中。 - NioEventLoopGroup workerGroup = new NioEventLoopGroup(); // workerGroup用于处理每一个连接发生的读写事件。 + bossGroup = new NioEventLoopGroup(); // bossGroup 用于监听客户端连接,专门负责与客户端创建连接,并把连接注册到workerGroup的Selector中。 + workerGroup = new NioEventLoopGroup(); // workerGroup用于处理每一个连接发生的读写事件。 try { ServerBootstrap serverBootstrap = new ServerBootstrap(); // 创建Server-Bootstrap @@ -33,22 +44,18 @@ public class EchoServer { ChannelFuture channelFuture = serverBootstrap.bind(); channelFuture.addListener(future -> { if (future.isSuccess()) { + isStarted = true; log.info("服务器启动成功,开始监听端口:{}", port); } else { log.error("服务器启动失败", future.cause()); + shutdown(); } }); // 添加JVM关闭钩子,确保优雅关闭 Runtime.getRuntime().addShutdownHook(new Thread(() -> { log.info("正在关闭服务器..."); - try { - bossGroup.shutdownGracefully(); - workerGroup.shutdownGracefully(); - log.info("服务器已关闭"); - } catch (Exception e) { - log.error("关闭服务器时发生错误", e); - } + shutdown(); })); // 不阻塞主线程,服务器在后台运行 @@ -57,10 +64,30 @@ public class EchoServer { } catch (Exception e) { log.error("服务器启动过程中发生异常", e); // 发生异常时关闭资源 + shutdown(); + } + // 注意:这里不再等待channel关闭,方法会立即返回 + } + + /** + * 关闭服务器,释放资源 + */ + public void shutdown() { + isStarted = false; + if (bossGroup != null) { bossGroup.shutdownGracefully(); + } + if (workerGroup != null) { workerGroup.shutdownGracefully(); } - // 注意:这里不再等待channel关闭,方法会立即返回 + log.info("服务器已关闭, 端口:{}", port); + } + + /** + * 判断服务器是否已启动 + */ + public boolean isStarted() { + return isStarted; } } diff --git a/user-service/src/main/java/com/mh/user/netty/EchoServerHandler.java b/user-service/src/main/java/com/mh/user/netty/EchoServerHandler.java index 9ee5bf0..7924017 100644 --- a/user-service/src/main/java/com/mh/user/netty/EchoServerHandler.java +++ b/user-service/src/main/java/com/mh/user/netty/EchoServerHandler.java @@ -24,10 +24,25 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter { List deviceCodeParamList; - // 调用service层的接口信息 - ApplicationContext context = SpringBeanUtil.getApplicationContext(); - DeviceCodeParamService deviceCodeParamService = context.getBean(DeviceCodeParamService.class); - GatewayManageService gatewayManageService = context.getBean(GatewayManageService.class); + // 调用service层的接口信息(懒加载,避免启动时Spring容器未就绪导致NPE) + private DeviceCodeParamService deviceCodeParamService; + private GatewayManageService gatewayManageService; + + private DeviceCodeParamService getDeviceCodeParamService() { + if (deviceCodeParamService == null) { + ApplicationContext context = SpringBeanUtil.getApplicationContext(); + deviceCodeParamService = context.getBean(DeviceCodeParamService.class); + } + return deviceCodeParamService; + } + + private GatewayManageService getGatewayManageService() { + if (gatewayManageService == null) { + ApplicationContext context = SpringBeanUtil.getApplicationContext(); + gatewayManageService = context.getBean(GatewayManageService.class); + } + return gatewayManageService; + } /** * 空闲次数 */ @@ -39,6 +54,8 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter { private String IP; private String port; private String receiveStr = ""; + // 限制最大接收字符串长度,防止内存溢出 + private static final int MAX_RECEIVE_STR_LENGTH = 4096; /** * 客户端连接会触发 @@ -59,6 +76,11 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter { IdleStateEvent event = (IdleStateEvent) obj; if (IdleState.READER_IDLE.equals(event.state())) { //如果读通道处于空闲状态,说明没有接收到心跳命令 log.info("第{}已经10秒没有接收到客户端的信息了", idleCount); + // 打印发送没有响应的日志 + if (deviceCodeParamList != null && deviceCodeParamList.size() > 0 && num < deviceCodeParamList.size()) { + String sendStr = deviceCodeParamList.get(num).getStrData(); + log.error("网关端口:{} 发送的指令无响应:{}", port, sendStr); + } receiveStr = ""; num = num + 1; if (num > size - 1) { @@ -95,10 +117,15 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter { receiveStr = receiveStr + ExchangeStringUtil.bytesToHexString(bytes);//将接收到的数据转为字符串,此字符串就是客户端发送的字符串 receiveStr = receiveStr.replace("null", ""); //去null receiveStr = receiveStr.replace(" ", ""); //去空格 + // 检查接收字符串长度,超过限制则清空,防止内存溢出 + if (receiveStr.length() > MAX_RECEIVE_STR_LENGTH) { + log.warn("接收到的数据超过最大长度限制,清空缓冲区, 当前长度:{}, 网关端口:{}", receiveStr.length(), port); + receiveStr = ""; + } //log.info("channelRead接收到的数据:" + receiveStr + ",length:" + receiveStr.length()); } } catch (Exception e) { - log.error("channelRead异常", e); + log.error("channelRead异常, 网关端口:{}", port, e); } finally { ReferenceCountUtil.release(msg); } @@ -119,12 +146,12 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter { // 清空receiveStr receiveStr = ""; // 更新对应的网关在线情况 - gatewayManageService.updateGatewayManage2(port); + getGatewayManageService().updateGatewayManage2(port); //根据端口或者IP或者心跳包查询网关对应的项目名称 - String projectName = gatewayManageService.selectProjectName(port); + String projectName = getGatewayManageService().selectProjectName(port); log.info("---------------------{}项目网关上线---------------------", projectName); // 生成采集指令 - deviceCodeParamList = deviceCodeParamService.queryCodeParam(port); //心跳包包含网关端口(自己定义返回心跳包) + deviceCodeParamList = getDeviceCodeParamService().queryCodeParam(port); //心跳包包含网关端口(自己定义返回心跳包) size = deviceCodeParamList.size(); log.info("deviceCodeParam size ===> {}", size); num = 0; @@ -205,11 +232,41 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter { AnalysisReceiveOrder485 analysisReceiveOrder485 = new AnalysisReceiveOrder485(); analysisReceiveOrder485.analysisReceiveOrder485(receiveStr, deviceCodeParamEntity); // 根据项目id更新网关在线 - gatewayManageService.updateGatewayManage2(deviceCodeParamEntity.getDataPort()); + getGatewayManageService().updateGatewayManage2(deviceCodeParamEntity.getDataPort()); }); } + /** + * 白色网关发送下一个采集指令(减少重复代码) + */ + private void whiteGatewaySendNextOrder(ChannelHandlerContext ctx, String snr, String ip) throws InterruptedException { + num = num + 1; + if (num > size - 1) { + num = 0; + Thread.sleep(200); + SendOrderUtils.sendAllOrder2(deviceCodeParamList.get(num), ctx, snr, ip, num, size); + log.info("------一轮采集完成,继续下一轮--------"); + } else { + if (Constant.WEB_FLAG) { + log.info("not send code and close collection!"); + num = 0; + receiveStr = ""; + ctx.close(); + } else { + Thread.sleep(200); + SendOrderUtils.sendAllOrder2(deviceCodeParamList.get(num), ctx, snr, ip, num, size); + } + } + } + private void whiteGateway(ChannelHandlerContext ctx) throws InterruptedException { + // 添加字符串长度检查,防止StringIndexOutOfBoundsException + if (receiveStr == null || receiveStr.length() < 8) { + log.warn("白色网关接收数据长度不足,跳过处理, 长度:{}, 网关端口:{}", receiveStr != null ? receiveStr.length() : 0, port); + receiveStr = ""; + return; + } + if (receiveStr.substring(0, 2).equalsIgnoreCase("2b") && receiveStr.substring(6, 8).equalsIgnoreCase("7b")) { receiveStr = receiveStr.substring(6); } @@ -243,9 +300,9 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter { Thread.sleep(200); ctx.channel().writeAndFlush(buffer); //发送数据 // 更新对应的网关在线情况 - gatewayManageService.updateGatewayManage2(port); + getGatewayManageService().updateGatewayManage2(port); // 生成采集指令 - deviceCodeParamList = deviceCodeParamService.queryCodeParam(port); //心跳包包含网关端口(自己定义返回心跳包) + deviceCodeParamList = getDeviceCodeParamService().queryCodeParam(port); //心跳包包含网关端口(自己定义返回心跳包) size = deviceCodeParamList.size(); log.info("白色网关接收长度===> " + size); num = 0; @@ -282,111 +339,30 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter { // 解析采集的报文,并保存到数据库 AnalysisReceiveOrder485 analysisReceiveOrder485 = new AnalysisReceiveOrder485(); analysisReceiveOrder485.analysisMeterOrder485(dataStr, deviceCodeParamList.get(num)); //电表报文解析 - // 清空dataStr - // 判断发送的下标,如果不等于指令数组大小 - num = num + 1; - if (num > size - 1) { - num = 0; - Thread.sleep(200); - // 继续发送下一个采集指令 - SendOrderUtils.sendAllOrder2(deviceCodeParamList.get(num), ctx, port, IP, num, size); - log.info("------一轮采集完成,继续下一轮--------"); - } else { - // 添加一个状态值,判断是否继续发送指令 update by ljf on 2020-08-07 - if (Constant.WEB_FLAG) { - num = 0; - // 关闭连接 - dataStr = null; - ctx.close(); - } else { - Thread.sleep(200); - // 继续发送下一个采集指令 - SendOrderUtils.sendAllOrder2(deviceCodeParamList.get(num), ctx, port, IP, num, size); - } - } + dataStr = ""; + // 使用提取的方法发送下一个指令 + whiteGatewaySendNextOrder(ctx, port, IP); } else if (dataStr.length() == 12 || dataStr.length() == 14) { log.info("白色网关冷水机接收===>" + dataStr); // 解析采集的报文,并保存到数据库 AnalysisReceiveOrder485 analysisReceiveOrder485 = new AnalysisReceiveOrder485(); analysisReceiveOrder485.analysisChillerOrder485(dataStr, deviceCodeParamList.get(0)); - // 清空dataStr dataStr = ""; - // 判断发送的下标,如果不等于指令数组大小 - num = num + 1; - if (num > size - 1) { - num = 0; - Thread.sleep(200); - // 继续发送下一个采集指令 - SendOrderUtils.sendAllOrder2(deviceCodeParamList.get(num), ctx, port, IP, num, size); - log.info("------一轮采集完成,继续下一轮--------"); - } else { - // 添加一个状态值,判断是否继续发送指令 update by ljf on 2020-08-07 - if (Constant.WEB_FLAG) { - log.info("not send code and close collection!"); - num = 0; - // 关闭连接 - dataStr = null; - ctx.close(); - } else { - Thread.sleep(200); - // 继续发送下一个采集指令 - SendOrderUtils.sendAllOrder2(deviceCodeParamList.get(num), ctx, port, IP, num, size); - } - } + // 使用提取的方法发送下一个指令 + whiteGatewaySendNextOrder(ctx, port, IP); } else if (dataStr.length() == 18) { -// log.info("white gateway cloud receive message ===> " + dataStr); log.info("白色网关冷量计接收===> " + dataStr); // 解析采集的报文,并保存到数据库 AnalysisReceiveOrder485 analysisReceiveOrder485 = new AnalysisReceiveOrder485(); //冷量机报文解析 analysisReceiveOrder485.analysisCloudOrder485(dataStr, deviceCodeParamList.get(num)); - // 清空dataStr dataStr = ""; - // 判断发送的下标,如果不等于指令数组大小 - num = num + 1; - if (num > size - 1) { - num = 0; - Thread.sleep(200); - // 继续发送下一个采集指令 - SendOrderUtils.sendAllOrder2(deviceCodeParamList.get(num), ctx, port, IP, num, size); - log.info("------一轮采集完成,继续下一轮--------"); - } else { - // 添加一个状态值,判断是否继续发送指令 update by ljf on 2020-08-07 - if (Constant.WEB_FLAG) { - log.info("not send code and close collection!"); - num = 0; - // 关闭连接 - dataStr = null; - ctx.close(); - } else { - Thread.sleep(200); - // 继续发送下一个采集指令 - SendOrderUtils.sendAllOrder2(deviceCodeParamList.get(num), ctx, port, IP, num, size); - } - } + // 使用提取的方法发送下一个指令 + whiteGatewaySendNextOrder(ctx, port, IP); } else { //if(dataStr.length() > 50) // 清空dataStr - dataStr = null; - // 判断发送的下标,如果不等于指令数组大小 - num = num + 1; - if (num > size - 1) { - num = 0; - Thread.sleep(200); - // 继续发送下一个采集指令 - SendOrderUtils.sendAllOrder2(deviceCodeParamList.get(num), ctx, port, IP, num, size); - log.info("------一轮采集完成,继续下一轮--------"); - } else { - // 添加一个状态值,判断是否继续发送指令 update by ljf on 2020-08-07 - if (Constant.WEB_FLAG) { - num = 0; - // 关闭连接 - dataStr = null; - ctx.close(); - } else { - Thread.sleep(200); - // 继续发送下一个采集指令 - SendOrderUtils.sendAllOrder2(deviceCodeParamList.get(num), ctx, port, IP, num, size); - } - } + dataStr = ""; + // 使用提取的方法发送下一个指令 + whiteGatewaySendNextOrder(ctx, port, IP); } } } @@ -394,8 +370,7 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter { // 异常捕捉 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - cause.getCause().printStackTrace(); - log.info("异常捕捉,执行ctx.close" + cause.getCause()); + log.error("异常捕捉,执行ctx.close, 网关端口:{}", port, cause); ctx.close(); // 关闭该Channel }