|
|
|
@ -59,7 +59,7 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter { |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public void channelActive(ChannelHandlerContext ctx) throws Exception { |
|
|
|
public void channelActive(ChannelHandlerContext ctx) throws Exception { |
|
|
|
log.info("Channel active......"); |
|
|
|
// log.info("Channel active......");
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
/** |
|
|
|
@ -72,7 +72,7 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter { |
|
|
|
if (obj instanceof IdleStateEvent) { |
|
|
|
if (obj instanceof IdleStateEvent) { |
|
|
|
IdleStateEvent event = (IdleStateEvent) obj; |
|
|
|
IdleStateEvent event = (IdleStateEvent) obj; |
|
|
|
if (IdleState.READER_IDLE.equals(event.state())) { //如果读通道处于空闲状态,说明没有接收到心跳命令
|
|
|
|
if (IdleState.READER_IDLE.equals(event.state())) { //如果读通道处于空闲状态,说明没有接收到心跳命令
|
|
|
|
log.info("第{}已经40秒没有接收到客户端的信息了", idleCount); |
|
|
|
// log.info("第{}已经40秒没有接收到客户端的信息了", idleCount);
|
|
|
|
receiveStr = ""; |
|
|
|
receiveStr = ""; |
|
|
|
num = num + 1; |
|
|
|
num = num + 1; |
|
|
|
if (num > size - 1) { |
|
|
|
if (num > size - 1) { |
|
|
|
@ -109,7 +109,7 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter { |
|
|
|
receiveStr = receiveStr + ExchangeStringUtil.bytesToHexString(bytes);//将接收到的数据转为字符串,此字符串就是客户端发送的字符串
|
|
|
|
receiveStr = receiveStr + ExchangeStringUtil.bytesToHexString(bytes);//将接收到的数据转为字符串,此字符串就是客户端发送的字符串
|
|
|
|
receiveStr = receiveStr.replace("null", ""); //去null
|
|
|
|
receiveStr = receiveStr.replace("null", ""); //去null
|
|
|
|
receiveStr = receiveStr.replace(" ", ""); //去空格
|
|
|
|
receiveStr = receiveStr.replace(" ", ""); //去空格
|
|
|
|
//log.info("channelRead接收到的数据:" + receiveStr + ",length:" + receiveStr.length());
|
|
|
|
//// log.info("channelRead接收到的数据:" + receiveStr + ",length:" + receiveStr.length());
|
|
|
|
} |
|
|
|
} |
|
|
|
} catch (Exception e) { |
|
|
|
} catch (Exception e) { |
|
|
|
log.error("channelRead异常", e); |
|
|
|
log.error("channelRead异常", e); |
|
|
|
@ -123,11 +123,11 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter { |
|
|
|
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { |
|
|
|
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { |
|
|
|
//心跳包报文: 24 00 60 95
|
|
|
|
//心跳包报文: 24 00 60 95
|
|
|
|
receiveStr = receiveStr.toUpperCase();//返回值全部变成大写
|
|
|
|
receiveStr = receiveStr.toUpperCase();//返回值全部变成大写
|
|
|
|
log.info("channelReadComplete接收到的数据{}, 长度: ===> {}", receiveStr, receiveStr.length()); |
|
|
|
// log.info("channelReadComplete接收到的数据{}, 长度: ===> {}", receiveStr, receiveStr.length());
|
|
|
|
//心跳包处理
|
|
|
|
//心跳包处理
|
|
|
|
if ((receiveStr.length() == 8) && receiveStr.startsWith("24")) { |
|
|
|
if ((receiveStr.length() == 8) && receiveStr.startsWith("24")) { |
|
|
|
// if ((receiveStr.length() == 8) && receiveStr.startsWith("C0A801FE")) {
|
|
|
|
// if ((receiveStr.length() == 8) && receiveStr.startsWith("C0A801FE")) {
|
|
|
|
log.info("接收到心跳包 ===> {}", receiveStr); |
|
|
|
// log.info("接收到心跳包 ===> {}", receiveStr);
|
|
|
|
// 开始进行会话保存
|
|
|
|
// 开始进行会话保存
|
|
|
|
dealSession(ctx); |
|
|
|
dealSession(ctx); |
|
|
|
idleCount = 1; |
|
|
|
idleCount = 1; |
|
|
|
@ -144,7 +144,7 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter { |
|
|
|
deviceCodeParamList = arrayCache.toList(CollectionParamsManage.class); |
|
|
|
deviceCodeParamList = arrayCache.toList(CollectionParamsManage.class); |
|
|
|
} |
|
|
|
} |
|
|
|
size = deviceCodeParamList.size(); |
|
|
|
size = deviceCodeParamList.size(); |
|
|
|
// log.info("deviceCodeParam size ===> {}", size);
|
|
|
|
// // log.info("deviceCodeParam size ===> {}", size);
|
|
|
|
// 清空receiveStr
|
|
|
|
// 清空receiveStr
|
|
|
|
receiveStr = ""; |
|
|
|
receiveStr = ""; |
|
|
|
num = 0; |
|
|
|
num = 0; |
|
|
|
@ -158,17 +158,17 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter { |
|
|
|
ctx.channel().close(); |
|
|
|
ctx.channel().close(); |
|
|
|
} |
|
|
|
} |
|
|
|
} else { |
|
|
|
} else { |
|
|
|
log.info("gateway not find deviceCodeParam!"); |
|
|
|
// log.info("gateway not find deviceCodeParam!");
|
|
|
|
} |
|
|
|
} |
|
|
|
} else if (receiveStr.length() == 18) { |
|
|
|
} else if (receiveStr.length() == 18) { |
|
|
|
// 水电表返回数据解析
|
|
|
|
// 水电表返回数据解析
|
|
|
|
idleCount = 1; |
|
|
|
idleCount = 1; |
|
|
|
log.info("水电表、热泵设置接收==>{},长度:{}", receiveStr, receiveStr.length()); |
|
|
|
// log.info("水电表、热泵设置接收==>{},长度:{}", receiveStr, receiveStr.length());
|
|
|
|
nextSendOrder(ctx); |
|
|
|
nextSendOrder(ctx); |
|
|
|
} else if (receiveStr.length() == 12 || receiveStr.length() == 14) { |
|
|
|
} else if (receiveStr.length() == 12 || receiveStr.length() == 14) { |
|
|
|
// 热泵返回数据解析
|
|
|
|
// 热泵返回数据解析
|
|
|
|
idleCount = 1; |
|
|
|
idleCount = 1; |
|
|
|
log.info("热泵读取接收===>{},长度:{},是否存在order_send_read: {}", receiveStr, receiveStr.length(), redisCache.hasKey("order_send_read")); |
|
|
|
// log.info("热泵读取接收===>{},长度:{},是否存在order_send_read: {}", receiveStr, receiveStr.length(), redisCache.hasKey("order_send_read"));
|
|
|
|
if (redisCache.hasKey("order_send_read")) { |
|
|
|
if (redisCache.hasKey("order_send_read")) { |
|
|
|
log.error("order_send_read存在,接收到指令是{}", receiveStr); |
|
|
|
log.error("order_send_read存在,接收到指令是{}", receiveStr); |
|
|
|
if (redisCache.hasKey("order_send_register")) { |
|
|
|
if (redisCache.hasKey("order_send_register")) { |
|
|
|
@ -284,12 +284,12 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter { |
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public void onBack(Boolean result) { |
|
|
|
public void onBack(Boolean result) { |
|
|
|
if(result) { |
|
|
|
if(result) { |
|
|
|
log.info("设备保存会话: 设备号 = " + session.getSessionId()); |
|
|
|
// log.info("设备保存会话: 设备号 = " + session.getSessionId());
|
|
|
|
//ctx.pipeline().remove(LoginRequestHandler.class); //压测需要放开
|
|
|
|
//ctx.pipeline().remove(LoginRequestHandler.class); //压测需要放开
|
|
|
|
} else { |
|
|
|
} else { |
|
|
|
log.info("设备刷新会话: 设备号 = " + session.getSessionId()); |
|
|
|
// log.info("设备刷新会话: 设备号 = " + session.getSessionId());
|
|
|
|
SessionMap.inst().updateSession(finalDeviceCode ,session, meterNum); |
|
|
|
SessionMap.inst().updateSession(finalDeviceCode ,session, meterNum); |
|
|
|
//log.info("设备登录失败: 设备号 = " + session.getSessionId());
|
|
|
|
//// log.info("设备登录失败: 设备号 = " + session.getSessionId());
|
|
|
|
//ServerSession.closeSession(ctx);
|
|
|
|
//ServerSession.closeSession(ctx);
|
|
|
|
// 假如说已经在会话中了,直接断开连接
|
|
|
|
// 假如说已经在会话中了,直接断开连接
|
|
|
|
//ctx.close();
|
|
|
|
//ctx.close();
|
|
|
|
@ -298,7 +298,7 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter { |
|
|
|
//有异常的话,我们进行处理
|
|
|
|
//有异常的话,我们进行处理
|
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public void onException(Throwable t) { |
|
|
|
public void onException(Throwable t) { |
|
|
|
log.info("设备登录异常: 设备号 = " + session.getSessionId()); |
|
|
|
// log.info("设备登录异常: 设备号 = " + session.getSessionId());
|
|
|
|
ServerSession.closeSession(ctx); |
|
|
|
ServerSession.closeSession(ctx); |
|
|
|
} |
|
|
|
} |
|
|
|
}); |
|
|
|
}); |
|
|
|
@ -324,7 +324,7 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter { |
|
|
|
Thread.sleep(1000); |
|
|
|
Thread.sleep(1000); |
|
|
|
// 继续发送下一个采集指令
|
|
|
|
// 继续发送下一个采集指令
|
|
|
|
SendOrderUtils.sendAllOrder(deviceCodeParamList.get(num), ctx, num, size); |
|
|
|
SendOrderUtils.sendAllOrder(deviceCodeParamList.get(num), ctx, num, size); |
|
|
|
log.info("------一轮采集完成,继续下一轮--------"); |
|
|
|
// log.info("------一轮采集完成,继续下一轮--------");
|
|
|
|
} else { |
|
|
|
} else { |
|
|
|
// 添加一个状态值,判断是否继续发送指令 update by ljf on 2020-08-07
|
|
|
|
// 添加一个状态值,判断是否继续发送指令 update by ljf on 2020-08-07
|
|
|
|
if (Constants.WEB_FLAG) { |
|
|
|
if (Constants.WEB_FLAG) { |
|
|
|
@ -369,12 +369,12 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter { |
|
|
|
// 液位计
|
|
|
|
// 液位计
|
|
|
|
analysisData = analysisReceiveOrder485.analysisLiquidOrder485(receiveStr, deviceCodeParamEntity); |
|
|
|
analysisData = analysisReceiveOrder485.analysisLiquidOrder485(receiveStr, deviceCodeParamEntity); |
|
|
|
default -> { |
|
|
|
default -> { |
|
|
|
log.info("设备类型错误"); |
|
|
|
// log.info("设备类型错误");
|
|
|
|
return; |
|
|
|
return; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
if (analysisData.isEmpty()) { |
|
|
|
if (analysisData.isEmpty()) { |
|
|
|
log.info("解析数据为空"); |
|
|
|
// log.info("解析数据为空");
|
|
|
|
return; |
|
|
|
return; |
|
|
|
} |
|
|
|
} |
|
|
|
// 格式化数据,配置成研华网关 AdvantechReceiver
|
|
|
|
// 格式化数据,配置成研华网关 AdvantechReceiver
|
|
|
|
@ -387,6 +387,7 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter { |
|
|
|
datas.setQuality(0); |
|
|
|
datas.setQuality(0); |
|
|
|
advantechDatas.add(datas); |
|
|
|
advantechDatas.add(datas); |
|
|
|
advantechReceiver.setD(advantechDatas); |
|
|
|
advantechReceiver.setD(advantechDatas); |
|
|
|
|
|
|
|
// log.error("接收到的指令==》{},发送数据到MQTT==》{}", receiveStr, JSONObject.toJSONString(advantechReceiver));
|
|
|
|
sendMsgByTopic.sendToDeviceMQ(JSONObject.toJSONString(advantechReceiver)); |
|
|
|
sendMsgByTopic.sendToDeviceMQ(JSONObject.toJSONString(advantechReceiver)); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@ -394,7 +395,7 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter { |
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { |
|
|
|
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { |
|
|
|
cause.getCause().printStackTrace(); |
|
|
|
cause.getCause().printStackTrace(); |
|
|
|
log.info("异常捕捉,执行ctx.close" + cause.getCause()); |
|
|
|
// log.info("异常捕捉,执行ctx.close" + cause.getCause());
|
|
|
|
ctx.close(); // 关闭该Channel
|
|
|
|
ctx.close(); // 关闭该Channel
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@ -402,7 +403,7 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter { |
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public void channelInactive(ChannelHandlerContext ctx) throws Exception { |
|
|
|
public void channelInactive(ChannelHandlerContext ctx) throws Exception { |
|
|
|
ctx.close();// 关闭流
|
|
|
|
ctx.close();// 关闭流
|
|
|
|
log.info("客户端断开,执行ctx.close()......"); |
|
|
|
// log.info("客户端断开,执行ctx.close()......");
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
private boolean action(ServerSession session, String deviceCode, ChannelHandlerContext ctx) { |
|
|
|
private boolean action(ServerSession session, String deviceCode, ChannelHandlerContext ctx) { |
|
|
|
@ -415,7 +416,7 @@ public class EchoServerHandler extends ChannelInboundHandlerAdapter { |
|
|
|
private boolean checkUser(String deviceCode,ServerSession session) { |
|
|
|
private boolean checkUser(String deviceCode,ServerSession session) { |
|
|
|
//当前用户已经登录
|
|
|
|
//当前用户已经登录
|
|
|
|
if(SessionMap.inst().hasLogin(deviceCode)) { |
|
|
|
if(SessionMap.inst().hasLogin(deviceCode)) { |
|
|
|
log.info("设备已经登录: 设备号 = " + deviceCode); |
|
|
|
// log.info("设备已经登录: 设备号 = " + deviceCode);
|
|
|
|
return false; |
|
|
|
return false; |
|
|
|
} |
|
|
|
} |
|
|
|
//一般情况下,我们会将 user存储到 DB中,然后对user的用户名和密码进行校验
|
|
|
|
//一般情况下,我们会将 user存储到 DB中,然后对user的用户名和密码进行校验
|
|
|
|
|