Browse Source

1、netty优化

dev
3067418132@qq.com 2 weeks ago
parent
commit
cdad2b721d
  1. 63
      user-service/src/main/java/com/mh/user/netty/ClientManage.java
  2. 47
      user-service/src/main/java/com/mh/user/netty/EchoServer.java
  3. 181
      user-service/src/main/java/com/mh/user/netty/EchoServerHandler.java

63
user-service/src/main/java/com/mh/user/netty/ClientManage.java

@ -6,12 +6,9 @@ import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author ljf
@ -23,29 +20,35 @@ import java.util.Map;
@Slf4j
public class ClientManage {
// 创建一个map容器,管理连接的客户端
public static Map<String, ChannelHandlerContext> clientMap = new HashMap<>();
// 创建一个map容器,管理创建的对象
public static Map<String, EchoServerHandler> echoServerHandlerMap = new HashMap<>();
// 创建一个容器,管理生成的报文
public static Map<String, List<String>> orderList = new HashMap<>();
// 创建一个map容器,管理连接的客户端(使用 ConcurrentHashMap 保证线程安全)
public static Map<String, ChannelHandlerContext> clientMap = new ConcurrentHashMap<>();
// 创建一个map容器,管理创建的对象(使用 ConcurrentHashMap 保证线程安全)
public static Map<String, EchoServerHandler> echoServerHandlerMap = new ConcurrentHashMap<>();
// 创建一个容器,管理生成的报文(使用 ConcurrentHashMap 保证线程安全)
public static Map<String, List<String>> orderList = new ConcurrentHashMap<>();
ApplicationContext applicationContext = SpringContextUtils.getApplicationContext();
Cache<String, Object> cache = (Cache<String, Object>) applicationContext.getBean("caffeineCache");
private static ClientManage instance = null;
// 使用双重检查锁定实现线程安全的单例模式
private static volatile ClientManage instance = null;
// 创建一个私有的构造函数
private ClientManage(){}
// 创建
/**
* 获取单例实例线程安全使用双重检查锁定
*/
public static ClientManage getInstance() {
if (null == instance) {
return new ClientManage();
} else {
return instance;
if (instance == null) {
synchronized (ClientManage.class) {
if (instance == null) {
instance = new ClientManage();
}
}
}
return instance;
}
/**
* 保存指令
@ -55,13 +58,13 @@ public class ClientManage {
public void saveOrderMap(String DTUId, List<String> list) {
try {
if (cache.getIfPresent(DTUId) != null) {
log.info("==> 已存在报文");
log.info("指令已存在于缓存中, DTUId:{}", DTUId);
} else {
cache.put(DTUId, list);
log.info("新指令已保存到缓存, DTUId:{}, 指令数量:{}", DTUId, list != null ? list.size() : 0);
}
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
log.error("保存指令到缓存异常, DTUId:{}", DTUId, e);
}
}
@ -76,16 +79,15 @@ public class ClientManage {
public void saveEchoServerHandlerMap(String DTUId, EchoServerHandler echoServerHandler) {
try {
if (echoServerHandlerMap.get(DTUId) != null && echoServerHandlerMap.get(DTUId).equals(echoServerHandler)) {
// TODO 已存在
log.info("==> 已存在报文");
EchoServerHandler existing = echoServerHandlerMap.get(DTUId);
if (existing != null && existing.equals(echoServerHandler)) {
log.info("EchoServerHandler已存在, DTUId:{}", DTUId);
} else {
//如果map中没有此ctx 将连接存入map中
echoServerHandlerMap.put(DTUId, echoServerHandler);
log.info("EchoServerHandler已保存, DTUId:{}, Handler:{}", DTUId, echoServerHandler.hashCode());
}
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
log.error("保存EchoServerHandler异常, DTUId:{}", DTUId, e);
}
}
@ -102,16 +104,15 @@ public class ClientManage {
public void saveChannelMap(String DTUId, ChannelHandlerContext ctx) {
try {
if (ClientManage.clientMap.get(DTUId) != null && ClientManage.clientMap.get(DTUId).equals(ctx)) {
// TODO 已存在
log.info("==> 已存在报文");
ChannelHandlerContext existing = ClientManage.clientMap.get(DTUId);
if (existing != null && existing.equals(ctx)) {
log.info("Channel已存在, DTUId:{}", DTUId);
} else {
//如果map中没有此ctx 将连接存入map中
ClientManage.clientMap.put(DTUId, ctx);
log.info("Channel已保存, DTUId:{}, Channel:{}", DTUId, ctx.channel().id().asShortText());
}
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
log.error("保存Channel异常, DTUId:{}", DTUId, e);
}
}

47
user-service/src/main/java/com/mh/user/netty/EchoServer.java

@ -11,15 +11,26 @@ import lombok.extern.slf4j.Slf4j;
public class EchoServer {
private final int port;
// 保存 EventLoopGroup 引用,用于资源管理
private NioEventLoopGroup bossGroup;
private NioEventLoopGroup workerGroup;
// 防止多次调用 start()
private volatile boolean isStarted = false;
public EchoServer(int port) {
this.port = port;
}
public void start() {
// 防止重复启动
if (isStarted) {
log.warn("服务器已经在运行中,端口:{}, 无需重复启动", port);
return;
}
// 创建Event-LoopGroup
NioEventLoopGroup bossGroup = new NioEventLoopGroup(); // bossGroup 用于监听客户端连接,专门负责与客户端创建连接,并把连接注册到workerGroup的Selector中。
NioEventLoopGroup workerGroup = new NioEventLoopGroup(); // workerGroup用于处理每一个连接发生的读写事件。
bossGroup = new NioEventLoopGroup(); // bossGroup 用于监听客户端连接,专门负责与客户端创建连接,并把连接注册到workerGroup的Selector中。
workerGroup = new NioEventLoopGroup(); // workerGroup用于处理每一个连接发生的读写事件。
try {
ServerBootstrap serverBootstrap = new ServerBootstrap(); // 创建Server-Bootstrap
@ -33,22 +44,18 @@ public class EchoServer {
ChannelFuture channelFuture = serverBootstrap.bind();
channelFuture.addListener(future -> {
if (future.isSuccess()) {
isStarted = true;
log.info("服务器启动成功,开始监听端口:{}", port);
} else {
log.error("服务器启动失败", future.cause());
shutdown();
}
});
// 添加JVM关闭钩子,确保优雅关闭
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
log.info("正在关闭服务器...");
try {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
log.info("服务器已关闭");
} catch (Exception e) {
log.error("关闭服务器时发生错误", e);
}
shutdown();
}));
// 不阻塞主线程,服务器在后台运行
@ -57,10 +64,30 @@ public class EchoServer {
} catch (Exception e) {
log.error("服务器启动过程中发生异常", e);
// 发生异常时关闭资源
shutdown();
}
// 注意:这里不再等待channel关闭,方法会立即返回
}
/**
* 关闭服务器释放资源
*/
public void shutdown() {
isStarted = false;
if (bossGroup != null) {
bossGroup.shutdownGracefully();
}
if (workerGroup != null) {
workerGroup.shutdownGracefully();
}
// 注意:这里不再等待channel关闭,方法会立即返回
log.info("服务器已关闭, 端口:{}", port);
}
/**
* 判断服务器是否已启动
*/
public boolean isStarted() {
return isStarted;
}
}

181
user-service/src/main/java/com/mh/user/netty/EchoServerHandler.java

@ -24,10 +24,25 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter {
List<DeviceCodeParamEntity> deviceCodeParamList;
// 调用service层的接口信息
// 调用service层的接口信息(懒加载,避免启动时Spring容器未就绪导致NPE)
private DeviceCodeParamService deviceCodeParamService;
private GatewayManageService gatewayManageService;
private DeviceCodeParamService getDeviceCodeParamService() {
if (deviceCodeParamService == null) {
ApplicationContext context = SpringBeanUtil.getApplicationContext();
deviceCodeParamService = context.getBean(DeviceCodeParamService.class);
}
return deviceCodeParamService;
}
private GatewayManageService getGatewayManageService() {
if (gatewayManageService == null) {
ApplicationContext context = SpringBeanUtil.getApplicationContext();
DeviceCodeParamService deviceCodeParamService = context.getBean(DeviceCodeParamService.class);
GatewayManageService gatewayManageService = context.getBean(GatewayManageService.class);
gatewayManageService = context.getBean(GatewayManageService.class);
}
return gatewayManageService;
}
/**
* 空闲次数
*/
@ -39,6 +54,8 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter {
private String IP;
private String port;
private String receiveStr = "";
// 限制最大接收字符串长度,防止内存溢出
private static final int MAX_RECEIVE_STR_LENGTH = 4096;
/**
* 客户端连接会触发
@ -59,6 +76,11 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter {
IdleStateEvent event = (IdleStateEvent) obj;
if (IdleState.READER_IDLE.equals(event.state())) { //如果读通道处于空闲状态,说明没有接收到心跳命令
log.info("第{}已经10秒没有接收到客户端的信息了", idleCount);
// 打印发送没有响应的日志
if (deviceCodeParamList != null && deviceCodeParamList.size() > 0 && num < deviceCodeParamList.size()) {
String sendStr = deviceCodeParamList.get(num).getStrData();
log.error("网关端口:{} 发送的指令无响应:{}", port, sendStr);
}
receiveStr = "";
num = num + 1;
if (num > size - 1) {
@ -95,10 +117,15 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter {
receiveStr = receiveStr + ExchangeStringUtil.bytesToHexString(bytes);//将接收到的数据转为字符串,此字符串就是客户端发送的字符串
receiveStr = receiveStr.replace("null", ""); //去null
receiveStr = receiveStr.replace(" ", ""); //去空格
// 检查接收字符串长度,超过限制则清空,防止内存溢出
if (receiveStr.length() > MAX_RECEIVE_STR_LENGTH) {
log.warn("接收到的数据超过最大长度限制,清空缓冲区, 当前长度:{}, 网关端口:{}", receiveStr.length(), port);
receiveStr = "";
}
//log.info("channelRead接收到的数据:" + receiveStr + ",length:" + receiveStr.length());
}
} catch (Exception e) {
log.error("channelRead异常", e);
log.error("channelRead异常, 网关端口:{}", port, e);
} finally {
ReferenceCountUtil.release(msg);
}
@ -119,12 +146,12 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter {
// 清空receiveStr
receiveStr = "";
// 更新对应的网关在线情况
gatewayManageService.updateGatewayManage2(port);
getGatewayManageService().updateGatewayManage2(port);
//根据端口或者IP或者心跳包查询网关对应的项目名称
String projectName = gatewayManageService.selectProjectName(port);
String projectName = getGatewayManageService().selectProjectName(port);
log.info("---------------------{}项目网关上线---------------------", projectName);
// 生成采集指令
deviceCodeParamList = deviceCodeParamService.queryCodeParam(port); //心跳包包含网关端口(自己定义返回心跳包)
deviceCodeParamList = getDeviceCodeParamService().queryCodeParam(port); //心跳包包含网关端口(自己定义返回心跳包)
size = deviceCodeParamList.size();
log.info("deviceCodeParam size ===> {}", size);
num = 0;
@ -205,11 +232,41 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter {
AnalysisReceiveOrder485 analysisReceiveOrder485 = new AnalysisReceiveOrder485();
analysisReceiveOrder485.analysisReceiveOrder485(receiveStr, deviceCodeParamEntity);
// 根据项目id更新网关在线
gatewayManageService.updateGatewayManage2(deviceCodeParamEntity.getDataPort());
getGatewayManageService().updateGatewayManage2(deviceCodeParamEntity.getDataPort());
});
}
/**
* 白色网关发送下一个采集指令减少重复代码
*/
private void whiteGatewaySendNextOrder(ChannelHandlerContext ctx, String snr, String ip) throws InterruptedException {
num = num + 1;
if (num > size - 1) {
num = 0;
Thread.sleep(200);
SendOrderUtils.sendAllOrder2(deviceCodeParamList.get(num), ctx, snr, ip, num, size);
log.info("------一轮采集完成,继续下一轮--------");
} else {
if (Constant.WEB_FLAG) {
log.info("not send code and close collection!");
num = 0;
receiveStr = "";
ctx.close();
} else {
Thread.sleep(200);
SendOrderUtils.sendAllOrder2(deviceCodeParamList.get(num), ctx, snr, ip, num, size);
}
}
}
private void whiteGateway(ChannelHandlerContext ctx) throws InterruptedException {
// 添加字符串长度检查,防止StringIndexOutOfBoundsException
if (receiveStr == null || receiveStr.length() < 8) {
log.warn("白色网关接收数据长度不足,跳过处理, 长度:{}, 网关端口:{}", receiveStr != null ? receiveStr.length() : 0, port);
receiveStr = "";
return;
}
if (receiveStr.substring(0, 2).equalsIgnoreCase("2b") && receiveStr.substring(6, 8).equalsIgnoreCase("7b")) {
receiveStr = receiveStr.substring(6);
}
@ -243,9 +300,9 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter {
Thread.sleep(200);
ctx.channel().writeAndFlush(buffer); //发送数据
// 更新对应的网关在线情况
gatewayManageService.updateGatewayManage2(port);
getGatewayManageService().updateGatewayManage2(port);
// 生成采集指令
deviceCodeParamList = deviceCodeParamService.queryCodeParam(port); //心跳包包含网关端口(自己定义返回心跳包)
deviceCodeParamList = getDeviceCodeParamService().queryCodeParam(port); //心跳包包含网关端口(自己定义返回心跳包)
size = deviceCodeParamList.size();
log.info("白色网关接收长度===> " + size);
num = 0;
@ -282,111 +339,30 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter {
// 解析采集的报文,并保存到数据库
AnalysisReceiveOrder485 analysisReceiveOrder485 = new AnalysisReceiveOrder485();
analysisReceiveOrder485.analysisMeterOrder485(dataStr, deviceCodeParamList.get(num)); //电表报文解析
// 清空dataStr
// 判断发送的下标,如果不等于指令数组大小
num = num + 1;
if (num > size - 1) {
num = 0;
Thread.sleep(200);
// 继续发送下一个采集指令
SendOrderUtils.sendAllOrder2(deviceCodeParamList.get(num), ctx, port, IP, num, size);
log.info("------一轮采集完成,继续下一轮--------");
} else {
// 添加一个状态值,判断是否继续发送指令 update by ljf on 2020-08-07
if (Constant.WEB_FLAG) {
num = 0;
// 关闭连接
dataStr = null;
ctx.close();
} else {
Thread.sleep(200);
// 继续发送下一个采集指令
SendOrderUtils.sendAllOrder2(deviceCodeParamList.get(num), ctx, port, IP, num, size);
}
}
dataStr = "";
// 使用提取的方法发送下一个指令
whiteGatewaySendNextOrder(ctx, port, IP);
} else if (dataStr.length() == 12 || dataStr.length() == 14) {
log.info("白色网关冷水机接收===>" + dataStr);
// 解析采集的报文,并保存到数据库
AnalysisReceiveOrder485 analysisReceiveOrder485 = new AnalysisReceiveOrder485();
analysisReceiveOrder485.analysisChillerOrder485(dataStr, deviceCodeParamList.get(0));
// 清空dataStr
dataStr = "";
// 判断发送的下标,如果不等于指令数组大小
num = num + 1;
if (num > size - 1) {
num = 0;
Thread.sleep(200);
// 继续发送下一个采集指令
SendOrderUtils.sendAllOrder2(deviceCodeParamList.get(num), ctx, port, IP, num, size);
log.info("------一轮采集完成,继续下一轮--------");
} else {
// 添加一个状态值,判断是否继续发送指令 update by ljf on 2020-08-07
if (Constant.WEB_FLAG) {
log.info("not send code and close collection!");
num = 0;
// 关闭连接
dataStr = null;
ctx.close();
} else {
Thread.sleep(200);
// 继续发送下一个采集指令
SendOrderUtils.sendAllOrder2(deviceCodeParamList.get(num), ctx, port, IP, num, size);
}
}
// 使用提取的方法发送下一个指令
whiteGatewaySendNextOrder(ctx, port, IP);
} else if (dataStr.length() == 18) {
// log.info("white gateway cloud receive message ===> " + dataStr);
log.info("白色网关冷量计接收===> " + dataStr);
// 解析采集的报文,并保存到数据库
AnalysisReceiveOrder485 analysisReceiveOrder485 = new AnalysisReceiveOrder485(); //冷量机报文解析
analysisReceiveOrder485.analysisCloudOrder485(dataStr, deviceCodeParamList.get(num));
// 清空dataStr
dataStr = "";
// 判断发送的下标,如果不等于指令数组大小
num = num + 1;
if (num > size - 1) {
num = 0;
Thread.sleep(200);
// 继续发送下一个采集指令
SendOrderUtils.sendAllOrder2(deviceCodeParamList.get(num), ctx, port, IP, num, size);
log.info("------一轮采集完成,继续下一轮--------");
} else {
// 添加一个状态值,判断是否继续发送指令 update by ljf on 2020-08-07
if (Constant.WEB_FLAG) {
log.info("not send code and close collection!");
num = 0;
// 关闭连接
dataStr = null;
ctx.close();
} else {
Thread.sleep(200);
// 继续发送下一个采集指令
SendOrderUtils.sendAllOrder2(deviceCodeParamList.get(num), ctx, port, IP, num, size);
}
}
// 使用提取的方法发送下一个指令
whiteGatewaySendNextOrder(ctx, port, IP);
} else { //if(dataStr.length() > 50)
// 清空dataStr
dataStr = null;
// 判断发送的下标,如果不等于指令数组大小
num = num + 1;
if (num > size - 1) {
num = 0;
Thread.sleep(200);
// 继续发送下一个采集指令
SendOrderUtils.sendAllOrder2(deviceCodeParamList.get(num), ctx, port, IP, num, size);
log.info("------一轮采集完成,继续下一轮--------");
} else {
// 添加一个状态值,判断是否继续发送指令 update by ljf on 2020-08-07
if (Constant.WEB_FLAG) {
num = 0;
// 关闭连接
dataStr = null;
ctx.close();
} else {
Thread.sleep(200);
// 继续发送下一个采集指令
SendOrderUtils.sendAllOrder2(deviceCodeParamList.get(num), ctx, port, IP, num, size);
}
}
dataStr = "";
// 使用提取的方法发送下一个指令
whiteGatewaySendNextOrder(ctx, port, IP);
}
}
}
@ -394,8 +370,7 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter {
// 异常捕捉
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.getCause().printStackTrace();
log.info("异常捕捉,执行ctx.close" + cause.getCause());
log.error("异常捕捉,执行ctx.close, 网关端口:{}", port, cause);
ctx.close(); // 关闭该Channel
}

Loading…
Cancel
Save