You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
295 lines
13 KiB
295 lines
13 KiB
package com.mh.user.netty; |
|
|
|
import com.mh.user.constants.Constant; |
|
import com.mh.user.entity.DeviceManageEntity; |
|
import com.mh.user.service.DeviceManageService; |
|
import com.mh.user.utils.*; |
|
import io.netty.buffer.ByteBuf; |
|
import io.netty.channel.Channel; |
|
import io.netty.channel.ChannelHandlerAdapter; |
|
import io.netty.channel.ChannelHandlerContext; |
|
import io.netty.util.ReferenceCountUtil; |
|
import lombok.extern.slf4j.Slf4j; |
|
import org.springframework.context.ApplicationContext; |
|
|
|
import java.text.SimpleDateFormat; |
|
import java.util.Date; |
|
import java.util.List; |
|
|
|
/** |
|
* @author ljf |
|
* @title : |
|
* @description :客户端异步消息处理机制 |
|
* @updateTime 2020-05-13 |
|
* @throws : |
|
*/ |
|
@Slf4j |
|
public class NettyClientHandler extends ChannelHandlerAdapter { |
|
|
|
private int num = 0; |
|
private int size = 0; |
|
private String receiveStr = null; |
|
private String IP = null; |
|
private String port = null; |
|
List<DeviceManageEntity> deviceManageEntityList; |
|
|
|
// 调用service |
|
ApplicationContext context = SpringBeanUtil.getApplicationContext(); |
|
DeviceManageService deviceManageService = context.getBean(DeviceManageService.class); |
|
|
|
|
|
@Override |
|
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { |
|
log.info("当前channel从EventLoop取消注册"); |
|
super.channelUnregistered(ctx); |
|
} |
|
|
|
@Override |
|
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { |
|
// super.exceptionCaught(ctx, cause); |
|
log.info("通信异常!!"); |
|
receiveStr = null; |
|
Channel incoming = ctx.channel(); |
|
if (incoming.isActive()){ |
|
log.info("SimpleClient: " + incoming.remoteAddress() + "异常"); |
|
cause.printStackTrace(); |
|
ctx.close(); |
|
// receiveStr = null; |
|
// try { |
|
// TimeUnit.SECONDS.sleep(5); |
|
// SocketAddress remoteAddress = ctx.channel().remoteAddress(); |
|
// String port = ExchangeStringUtil.endData(remoteAddress.toString(),":"); |
|
// String host = ExchangeStringUtil.splitData(remoteAddress.toString(),"/",":"); |
|
// NettyClient nettyClient = new NettyClient(); |
|
// nettyClient.connect(Integer.parseInt(port), host); // 断线重连 |
|
// } catch (InterruptedException e) { |
|
// e.printStackTrace(); |
|
// } |
|
} |
|
} |
|
|
|
|
|
@Override |
|
public void channelActive(ChannelHandlerContext ctx) { |
|
// 添加一个状态值,判断是否继续发送指令 update by ljf on 2020-08-07 |
|
if (Constant.WEB_FLAG) { |
|
num = 0; |
|
// 关闭连接 |
|
receiveStr = null; |
|
ctx.close(); |
|
} else { |
|
ctx.channel().read(); |
|
SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-mm-dd HH:mm:ss"); |
|
Date date = new Date(); |
|
log.info(ctx.channel().remoteAddress() + " " + sdf1.format(date) + "链接服务端成功!"); |
|
// 截取IP地址 |
|
IP = ExchangeStringUtil.getMidString(ctx.channel().remoteAddress() + "", "/", ":"); |
|
// 截取端口号 |
|
port = ExchangeStringUtil.getMidString(ctx.channel().remoteAddress() + "", ":", ""); |
|
log.info("IP: " + IP + ",端口号: " + port); |
|
// 生成对应采集冷量计的命令 |
|
// 生成对应的采集指令 |
|
deviceManageEntityList = deviceManageService.queryDevicesByType("3"); |
|
size = deviceManageEntityList.size(); |
|
|
|
// 封装工具类进行采集,update by ljf on 2021-01-26 |
|
SendOrderUtils.sendCloudOrder(deviceManageEntityList.get(0),0,IP,port,ctx); |
|
// // 1.创建将要写出的数据 |
|
// String collectionNum = deviceManageEntityList.get(0).getCollectionNum(); |
|
// String sendStr = GetReadOrder485.createCloudOrder(IP, port, |
|
// deviceManageEntityList.get(0).getDataCom(), |
|
// collectionNum, "34"); |
|
//// String sendStr = "5803004900021914"; |
|
// ByteBuf buffer = getByteBuf(ctx, sendStr); |
|
// // 2.发送数据 |
|
// ctx.channel().writeAndFlush(buffer); |
|
} |
|
|
|
// String sendStr = "5803004900021914"; // 冷量计 |
|
// // 申请一个数据结构存储信息 |
|
// ByteBuf buffer = ctx.alloc().buffer(); |
|
// // 将信息放入数据结构中 |
|
// buffer.writeBytes(ExchangeStringUtil.hexStrToBinaryStr(sendStr));//对接需要16进制 |
|
// ctx.writeAndFlush(buffer, ctx.newProgressivePromise()); |
|
} |
|
|
|
private ByteBuf getByteBuf(ChannelHandlerContext ctx, String sendStr) { |
|
// byte类型的数据 |
|
// byte[] bytes = "这里是将要写往服务端的数据".getBytes(Charset.forName("utf-8")); |
|
// String sendStr = "5803004900021914"; // 冷量计 |
|
// 申请一个数据结构存储信息 |
|
ByteBuf buffer = ctx.alloc().buffer(); |
|
// 将信息放入数据结构中 |
|
buffer.writeBytes(ExchangeStringUtil.hexStrToBinaryStr(sendStr));//对接需要16进制 |
|
return buffer; |
|
} |
|
|
|
@Override |
|
public void channelInactive(ChannelHandlerContext ctx) throws Exception { |
|
Thread.sleep(100); |
|
ctx.close(); |
|
log.info(ctx.channel().localAddress() + "退出链接!!"); |
|
} |
|
|
|
@Override |
|
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { |
|
try { |
|
ByteBuf buf = (ByteBuf)msg; |
|
byte [] bytes = new byte[buf.readableBytes()]; |
|
buf.readBytes(bytes);//复制内容到字节数组bytes |
|
buf.clear(); |
|
log.info("获取到的值: " + ExchangeStringUtil.bytesToHexString(bytes)); |
|
if (bytes.length <= 36) { |
|
// receiveStr = receiveStr.replace("null", ""); |
|
// receiveStr = receiveStr + ExchangeStringUtil.bytesToHexString(bytes);//将接收到的数据转为字符串,此字符串就是客户端发送的字符串 |
|
// log.info(ctx.channel().remoteAddress() + " " + ctx.channel().localAddress() + " 接受服务器数据:" + receiveStr + ",大小: " + receiveStr.length()); |
|
receiveStr = receiveStr + ExchangeStringUtil.bytesToHexString(bytes);//将接收到的数据转为字符串,此字符串就是客户端发送的字符串 |
|
receiveStr = receiveStr.replace("null", ""); |
|
log.info("接受服务器数据:" + receiveStr + ",大小: " + receiveStr.length()); |
|
} |
|
} catch (Exception e) { |
|
e.printStackTrace(); |
|
} finally { |
|
ReferenceCountUtil.release(msg); |
|
} |
|
} |
|
|
|
@Override |
|
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { |
|
log.info("采集冷量计-数据读取接收完成: " + receiveStr); |
|
// A9 FE C2 C7 1F 90 01 58 03 04 4A 30 00 53 65 1C C4 06 |
|
if (receiveStr.length() == 36) { |
|
// 接收到的报文 |
|
log.info("接收完整报文: " + receiveStr); |
|
// 解析报文 |
|
// AnalysisReceiveOrder485 analysisReceiveOrder485 = new AnalysisReceiveOrder485(); |
|
// analysisReceiveOrder485.analysisCloudOrder485(receiveStr); // 解析冷量计 |
|
receiveStr = ""; |
|
// 1.创建将要写出的数据 |
|
// String sendStr = "5803004900021914"; |
|
num = num + 1; |
|
Thread.sleep(500); |
|
if (num > size-1) { |
|
num = 0; |
|
// 关闭连接 |
|
receiveStr = null; |
|
ctx.close(); |
|
// 保持长连接 |
|
// // 封装工具类进行采集,update by ljf on 2021-01-26 |
|
// SendOrderUtils.sendCloudOrder(deviceManageEntityList.get(num),num,IP,port,ctx); |
|
} else { |
|
// 添加一个状态值,判断是否继续发送指令 update by ljf on 2020-08-07 |
|
if (Constant.WEB_FLAG) { |
|
log.info("有指令下发退出定时采集DDC参数"); |
|
num = 0; |
|
// 关闭连接 |
|
receiveStr = null; |
|
ctx.close(); |
|
} else { |
|
// 封装工具类进行采集,update by ljf on 2021-01-26 |
|
SendOrderUtils.sendCloudOrder(deviceManageEntityList.get(num),num,IP,port,ctx); |
|
// // 1.创建将要写出的数据 |
|
// String collectionNum = deviceManageEntityList.get(num).getCollectionNum(); |
|
// String sendStr = GetReadOrder485.createCloudOrder(IP, port, |
|
// deviceManageEntityList.get(num).getDataCom(), |
|
// collectionNum, "34"); |
|
//// String sendStr = "5803004900021914"; |
|
// ByteBuf buffer = getByteBuf(ctx, sendStr); |
|
// // 2.发送数据 |
|
// ctx.channel().writeAndFlush(buffer); |
|
// log.info("客户端再次往服务端发送数据" + sendStr); |
|
} |
|
} |
|
|
|
} else { |
|
log.info(receiveStr); |
|
receiveStr = null; |
|
ctx.flush(); |
|
ctx.close(); |
|
} |
|
|
|
// if (receiveStr.contains("c0a801fc")) { // 冷量计 |
|
// |
|
// // 生成对应的采集指令 |
|
// deviceManageEntityList = deviceManageService.queryDevicesByType("3"); |
|
// size = deviceManageEntityList.size(); |
|
// |
|
// log.info("初始连接报文: " + receiveStr); |
|
// IP = receiveStr; |
|
// receiveStr = ""; |
|
// // 1.创建将要写出的数据 |
|
// String collectionNum = deviceManageEntityList.get(0).getCollectionNum(); |
|
// String sendStr = GetReadOrder485.createCloudOrder(collectionNum,"34"); |
|
//// String sendStr = "5803004900021914"; |
|
// ByteBuf buffer = getByteBuf(ctx,sendStr); |
|
// // 2.发送数据 |
|
// ctx.channel().writeAndFlush(buffer); |
|
// } else if (receiveStr.contains("c0a801f0")) { // 电表 |
|
// |
|
// // 生成对应的采集指令 |
|
// deviceManageEntityList = deviceManageService.queryDevicesByType("1"); |
|
// size = deviceManageEntityList.size(); |
|
// |
|
// log.info("初始连接报文: " + receiveStr); |
|
// IP = receiveStr; |
|
// receiveStr = ""; |
|
// // 1.创建将要写出的数据 |
|
//// String sendStr = "6830043080000068110432326536C816"; // 网络单相电表 |
|
//// String sendStr = "FEFEFEFE6880025007000068010243C3B216"; // 广仪三相电表 |
|
// String collectionNum = deviceManageEntityList.get(0).getCollectionNum(); |
|
// String sendStr = GetReadOrder485.createMeterOrder(collectionNum,"1"); |
|
// ByteBuf buffer = getByteBuf(ctx,sendStr); |
|
// // 2.发送数据 |
|
// ctx.channel().writeAndFlush(buffer); |
|
// } else if ((receiveStr.length() == 18) && (IP.contains("c0a801fc"))) { |
|
// analysisReceiveOrder485.analysisCloudOrder485(receiveStr); // 解析冷量计 |
|
// receiveStr = ""; |
|
// // 1.创建将要写出的数据 |
|
//// String sendStr = "5803004900021914"; |
|
// num = num + 1; |
|
// Thread.sleep(1000); |
|
// if (num >= size-1) { |
|
// num = 0; |
|
// // 关闭连接 |
|
// receiveStr = null; |
|
// ctx.close(); |
|
// } else { |
|
// String collectionNum = deviceManageEntityList.get(num).getCollectionNum(); |
|
// String sendStr = GetReadOrder485.createCloudOrder(collectionNum, "34"); |
|
// ByteBuf buffer = getByteBuf(ctx, sendStr); |
|
// // 2.发送数据 |
|
// ctx.channel().writeAndFlush(buffer); |
|
// log.info("客户端再次往服务端发送数据" + num); |
|
// } |
|
// |
|
// } else if ((receiveStr.length() == 44) && (IP.contains("c0a801f0"))) { |
|
// analysisReceiveOrder485.analysisMeterOrder485(receiveStr); // 解析电表 |
|
// receiveStr = ""; |
|
// num = num + 1; |
|
// Thread.sleep(1000); |
|
// if (num >= size-1) { |
|
// num = 0; |
|
// receiveStr = null; |
|
// // 关闭连接 |
|
// ctx.close(); |
|
// } else { |
|
// // 1.创建将要写出的数据 |
|
// // fe fe fe fe 68 80 02 50 07 00 00 68 81 06 43 c3 8c 34 33 33 5c 16 |
|
//// String sendStr = "FEFEFE6880025007000068010243C3B216"; |
|
// String collectionNum = deviceManageEntityList.get(num).getCollectionNum(); |
|
// String sendStr = GetReadOrder485.createMeterOrder(collectionNum, "1"); |
|
// ByteBuf buffer = getByteBuf(ctx, sendStr); |
|
// // 2.发送数据 |
|
// ctx.channel().writeAndFlush(buffer); |
|
// log.info("客户端再次往服务端发送数据" + num); |
|
// } |
|
// } else if ((receiveStr.length() > 44)) { |
|
// log.info(receiveStr); |
|
// receiveStr = null; |
|
// ctx.flush(); |
|
// ctx.close(); |
|
// } |
|
ctx.flush(); |
|
} |
|
|
|
}
|
|
|