16 changed files with 832 additions and 808 deletions
@ -1,95 +1,95 @@
|
||||
package com.mh.user.netty; |
||||
|
||||
import io.netty.bootstrap.Bootstrap; |
||||
import io.netty.channel.*; |
||||
import io.netty.channel.nio.NioEventLoopGroup; |
||||
import io.netty.channel.socket.SocketChannel; |
||||
import io.netty.channel.socket.nio.NioSocketChannel; |
||||
import io.netty.handler.timeout.IdleStateHandler; |
||||
import io.netty.handler.timeout.ReadTimeoutHandler; |
||||
import io.netty.handler.timeout.WriteTimeoutHandler; |
||||
import lombok.Getter; |
||||
import lombok.Setter; |
||||
import lombok.extern.slf4j.Slf4j; |
||||
|
||||
import java.util.concurrent.TimeUnit; |
||||
|
||||
/** |
||||
* @author ljf |
||||
* @title : |
||||
* @description :Netty冷水机组客户端 |
||||
* @updateTime 2020-05-13 |
||||
* @throws : |
||||
*/ |
||||
@Slf4j |
||||
@Setter |
||||
@Getter |
||||
public class NettyChillerClient { |
||||
|
||||
private volatile static NettyChillerDDCClient nettyChillerDDCClient = new NettyChillerDDCClient(); |
||||
|
||||
private int port; |
||||
private String host; |
||||
|
||||
// 构造函数传递值 继承Thread时需要
|
||||
// public NettyClient(int port, String host) {
|
||||
// this.port = port;
|
||||
// this.host = host;
|
||||
// }
|
||||
|
||||
public static void connect(int port, String host) throws InterruptedException { |
||||
// 配置客户端NIO线程组
|
||||
EventLoopGroup group = new NioEventLoopGroup(1); |
||||
try { |
||||
Bootstrap bootstrap = new Bootstrap(); |
||||
bootstrap.group(group).channel(NioSocketChannel.class) |
||||
.option(ChannelOption.TCP_NODELAY, true) |
||||
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000) |
||||
.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(1024*1024)) |
||||
.handler(new ChannelInitializer<SocketChannel>() { |
||||
@Override |
||||
protected void initChannel(SocketChannel socketChannel) { |
||||
// 基于换行符号
|
||||
// socketChannel.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,4,4,-8,0));
|
||||
// // 解码转String,注意调整自己的编码格式GBK、UTF-8
|
||||
// socketChannel.pipeline().addLast(new StringDecoder(StandardCharsets.UTF_8));
|
||||
// // 解码转String,注意调整自己的编码格式GBK、UTF-8
|
||||
// socketChannel.pipeline().addLast(new StringEncoder(StandardCharsets.UTF_8));
|
||||
// socketChannel.pipeline().addLast(new LengthFieldPrepender(4));
|
||||
socketChannel.pipeline().addLast(new IdleStateHandler(10,10,10, TimeUnit.SECONDS)); |
||||
// 在管道中添加我们自己的接收数据实现方法
|
||||
socketChannel.pipeline().addLast(new NettyChillerClientHandler()); |
||||
// socketChannel.pipeline().addLast(new NettyMeterClientHandler());
|
||||
} |
||||
}); |
||||
// 发起异步连接操作
|
||||
ChannelFuture channelFuture = bootstrap.connect(host, port).sync(); |
||||
if (channelFuture.isSuccess()) { |
||||
log.info("connect server 成功---------"); |
||||
} else { |
||||
log.info("连接失败!"); |
||||
log.info("准备重连!"); |
||||
// connect(port, host);
|
||||
} |
||||
|
||||
// 等待客户端连接链路关闭future.channel().closeFuture().sync(); // 阻塞main线程
|
||||
channelFuture.channel().closeFuture().sync(); |
||||
} catch (Exception e) { |
||||
log.error("error>>>>>>" + e.getMessage()); |
||||
} finally { |
||||
group.shutdownGracefully(); |
||||
// try {
|
||||
// TimeUnit.SECONDS.sleep(5);
|
||||
// connect(port, host); // 断线重连
|
||||
// } catch (InterruptedException e) {
|
||||
// e.printStackTrace();
|
||||
//package com.mh.user.netty;
|
||||
//
|
||||
//import io.netty.bootstrap.Bootstrap;
|
||||
//import io.netty.channel.*;
|
||||
//import io.netty.channel.nio.NioEventLoopGroup;
|
||||
//import io.netty.channel.socket.SocketChannel;
|
||||
//import io.netty.channel.socket.nio.NioSocketChannel;
|
||||
//import io.netty.handler.timeout.IdleStateHandler;
|
||||
//import io.netty.handler.timeout.ReadTimeoutHandler;
|
||||
//import io.netty.handler.timeout.WriteTimeoutHandler;
|
||||
//import lombok.Getter;
|
||||
//import lombok.Setter;
|
||||
//import lombok.extern.slf4j.Slf4j;
|
||||
//
|
||||
//import java.util.concurrent.TimeUnit;
|
||||
//
|
||||
///**
|
||||
// * @author ljf
|
||||
// * @title :
|
||||
// * @description :Netty冷水机组客户端
|
||||
// * @updateTime 2020-05-13
|
||||
// * @throws :
|
||||
// */
|
||||
//@Slf4j
|
||||
//@Setter
|
||||
//@Getter
|
||||
//public class NettyChillerClient {
|
||||
//
|
||||
// private volatile static NettyChillerDDCClient nettyChillerDDCClient = new NettyChillerDDCClient();
|
||||
//
|
||||
// private int port;
|
||||
// private String host;
|
||||
//
|
||||
// // 构造函数传递值 继承Thread时需要
|
||||
//// public NettyClient(int port, String host) {
|
||||
//// this.port = port;
|
||||
//// this.host = host;
|
||||
//// }
|
||||
//
|
||||
// public static void connect(int port, String host) throws InterruptedException {
|
||||
// // 配置客户端NIO线程组
|
||||
// EventLoopGroup group = new NioEventLoopGroup(1);
|
||||
// try {
|
||||
// Bootstrap bootstrap = new Bootstrap();
|
||||
// bootstrap.group(group).channel(NioSocketChannel.class)
|
||||
// .option(ChannelOption.TCP_NODELAY, true)
|
||||
// .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
|
||||
// .option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(1024*1024))
|
||||
// .handler(new ChannelInitializer<SocketChannel>() {
|
||||
// @Override
|
||||
// protected void initChannel(SocketChannel socketChannel) {
|
||||
// // 基于换行符号
|
||||
//// socketChannel.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,4,4,-8,0));
|
||||
//// // 解码转String,注意调整自己的编码格式GBK、UTF-8
|
||||
//// socketChannel.pipeline().addLast(new StringDecoder(StandardCharsets.UTF_8));
|
||||
//// // 解码转String,注意调整自己的编码格式GBK、UTF-8
|
||||
//// socketChannel.pipeline().addLast(new StringEncoder(StandardCharsets.UTF_8));
|
||||
//// socketChannel.pipeline().addLast(new LengthFieldPrepender(4));
|
||||
// socketChannel.pipeline().addLast(new IdleStateHandler(10,10,10, TimeUnit.SECONDS));
|
||||
// // 在管道中添加我们自己的接收数据实现方法
|
||||
// socketChannel.pipeline().addLast(new NettyChillerClientHandler());
|
||||
//// socketChannel.pipeline().addLast(new NettyMeterClientHandler());
|
||||
// }
|
||||
// });
|
||||
// // 发起异步连接操作
|
||||
// ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
|
||||
// if (channelFuture.isSuccess()) {
|
||||
// log.info("connect server 成功---------");
|
||||
// } else {
|
||||
// log.info("连接失败!");
|
||||
// log.info("准备重连!");
|
||||
//// connect(port, host);
|
||||
// }
|
||||
} |
||||
} |
||||
|
||||
// @SneakyThrows
|
||||
// @Override
|
||||
// public void run() {
|
||||
// connect(port, host);
|
||||
//
|
||||
// // 等待客户端连接链路关闭future.channel().closeFuture().sync(); // 阻塞main线程
|
||||
// channelFuture.channel().closeFuture().sync();
|
||||
// } catch (Exception e) {
|
||||
// log.error("error>>>>>>" + e.getMessage());
|
||||
// } finally {
|
||||
// group.shutdownGracefully();
|
||||
//// try {
|
||||
//// TimeUnit.SECONDS.sleep(5);
|
||||
//// connect(port, host); // 断线重连
|
||||
//// } catch (InterruptedException e) {
|
||||
//// e.printStackTrace();
|
||||
//// }
|
||||
// }
|
||||
// }
|
||||
} |
||||
//
|
||||
//// @SneakyThrows
|
||||
//// @Override
|
||||
//// public void run() {
|
||||
//// connect(port, host);
|
||||
//// }
|
||||
//}
|
||||
|
@ -1,315 +1,315 @@
|
||||
package com.mh.user.netty; |
||||
|
||||
import com.mh.user.constants.Constant; |
||||
import com.mh.user.entity.ChillersEntity; |
||||
import com.mh.user.entity.DeviceCodeParamEntity; |
||||
import com.mh.user.service.DeviceCodeParamService; |
||||
import com.mh.user.service.chillers.ChillersService; |
||||
import com.mh.user.service.chillers.GatewayManageService; |
||||
import com.mh.user.utils.AnalysisReceiveOrder485; |
||||
import com.mh.user.utils.ExchangeStringUtil; |
||||
import com.mh.user.utils.GetReadOrder485; |
||||
import com.mh.user.utils.SpringBeanUtil; |
||||
import io.netty.buffer.ByteBuf; |
||||
import io.netty.channel.Channel; |
||||
import io.netty.channel.ChannelHandlerAdapter; |
||||
import io.netty.channel.ChannelHandlerContext; |
||||
import io.netty.handler.timeout.IdleState; |
||||
import io.netty.handler.timeout.IdleStateEvent; |
||||
import io.netty.util.ReferenceCountUtil; |
||||
import lombok.extern.slf4j.Slf4j; |
||||
import org.springframework.context.ApplicationContext; |
||||
|
||||
import java.text.SimpleDateFormat; |
||||
import java.util.Date; |
||||
import java.util.List; |
||||
|
||||
/** |
||||
* @author ljf |
||||
* @title : |
||||
* @description :客户端异步消息处理机制,采集冷水机组 |
||||
* @updateTime 2020-05-13 |
||||
* @throws : |
||||
*/ |
||||
@Slf4j |
||||
public class NettyChillerClientHandler extends ChannelHandlerAdapter { |
||||
|
||||
private int num = 0; |
||||
private int size = 0; |
||||
private int idle_count = 0; |
||||
private String receiveStr = ""; |
||||
List<DeviceCodeParamEntity> deviceCodeParamList; |
||||
|
||||
// 调用service
|
||||
ApplicationContext context = SpringBeanUtil.getApplicationContext(); |
||||
DeviceCodeParamService deviceCodeParamService = context.getBean(DeviceCodeParamService.class); |
||||
GatewayManageService gatewayManageService = context.getBean(GatewayManageService.class); |
||||
|
||||
/** |
||||
* 超时处理 |
||||
* 如果120秒没有接受客户端的心跳,就触发; |
||||
* 如果超过3次,则直接关闭; |
||||
*/ |
||||
@Override |
||||
public void userEventTriggered(ChannelHandlerContext ctx, Object obj) throws Exception { |
||||
if (obj instanceof IdleStateEvent) { |
||||
IdleStateEvent event = (IdleStateEvent) obj; |
||||
if (IdleState.READER_IDLE.equals(event.state())) { //如果读通道处于空闲状态,说明没有接收到心跳命令
|
||||
System.out.println("第" + idle_count + "已经10秒没有接收到服务器的信息了,发送第" + num + "条数据"); |
||||
if (deviceCodeParamList.get(num) == null) { |
||||
System.out.println("关闭这个不活跃的channel"); |
||||
ctx.channel().close(); |
||||
} else { |
||||
if ((num > size - 1) || (idle_count > 3)) { |
||||
System.out.println("关闭这个不活跃的channel"); |
||||
ctx.channel().close(); |
||||
} |
||||
GetReadOrder485 getReadOrder485 = new GetReadOrder485(); |
||||
String sendStr = getReadOrder485.createChillersOrder(deviceCodeParamList.get(num)); |
||||
ctx.channel().writeAndFlush(ExchangeStringUtil.getByteBuf(ctx, sendStr)); |
||||
idle_count++; |
||||
} |
||||
} |
||||
} else { |
||||
super.userEventTriggered(ctx, obj); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { |
||||
log.info("当前channel从EventLoop取消注册"); |
||||
ctx.close(); |
||||
super.channelUnregistered(ctx); |
||||
} |
||||
|
||||
@Override |
||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { |
||||
// super.exceptionCaught(ctx, cause);
|
||||
log.info("通信异常!!"); |
||||
// // 发送采集冷水机组指令
|
||||
// GetReadOrder485 getReadOrder485 = new GetReadOrder485();
|
||||
// String sendStr = getReadOrder485.createChillersOrder(chillersEntityList.get(num));
|
||||
// // 获取采集参数个数
|
||||
// ByteBuf buffer = getByteBuf(ctx, sendStr);
|
||||
// // 2.发送数据
|
||||
// ctx.channel().writeAndFlush(buffer);
|
||||
// receiveStr = null;
|
||||
cause.printStackTrace(); |
||||
ctx.close(); |
||||
// Channel incoming = ctx.channel();
|
||||
// if (incoming.isActive()) {
|
||||
// log.info("SimpleClient: " + incoming.remoteAddress() + "异常");
|
||||
// cause.printStackTrace();
|
||||
// ctx.close();
|
||||
// receiveStr = null;
|
||||
// try {
|
||||
// TimeUnit.SECONDS.sleep(5);
|
||||
// SocketAddress remoteAddress = ctx.channel().remoteAddress();
|
||||
// String port = ExchangeStringUtil.endData(remoteAddress.toString(),":");
|
||||
// String host = ExchangeStringUtil.splitData(remoteAddress.toString(),"/",":");
|
||||
// NettyClient nettyClient = new NettyClient();
|
||||
// nettyClient.connect(Integer.parseInt(port), host); // 断线重连
|
||||
// } catch (InterruptedException e) {
|
||||
// e.printStackTrace();
|
||||
// }
|
||||
// }
|
||||
} |
||||
|
||||
|
||||
@Override |
||||
public void channelActive(ChannelHandlerContext ctx) throws Exception { |
||||
super.channelActive(ctx); |
||||
// 添加一个状态值,判断是否继续发送指令 update by ljf on 2020-08-07
|
||||
if (Constant.WEB_FLAG) { |
||||
num = 0; |
||||
// 关闭连接
|
||||
receiveStr = null; |
||||
ctx.close(); |
||||
} else { |
||||
SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-mm-dd HH:mm:ss"); |
||||
Date date = new Date(); |
||||
log.info(ctx.channel().remoteAddress() + " " + sdf1.format(date) + "链接服务端成功!"); |
||||
// 截取IP地址
|
||||
String IP = ExchangeStringUtil.getMidString(ctx.channel().remoteAddress() + "", "/", ":"); |
||||
// 截取端口号
|
||||
String port = ExchangeStringUtil.getMidString(ctx.channel().remoteAddress() + "", ":", ""); |
||||
log.info("IP: " + IP + ",端口号: " + port); |
||||
// 更新对应的网关在线情况
|
||||
gatewayManageService.updateGatewayManage(IP, port); |
||||
// 生成采集指令
|
||||
deviceCodeParamList = deviceCodeParamService.queryCodeParam(port); |
||||
size = deviceCodeParamList.size(); |
||||
|
||||
// 发送采集冷水机组指令
|
||||
GetReadOrder485 getReadOrder485 = new GetReadOrder485(); |
||||
String sendStr = getReadOrder485.createChillersOrder(deviceCodeParamList.get(num)); |
||||
// 获取采集参数个数
|
||||
ByteBuf buffer = getByteBuf(ctx, sendStr); |
||||
// 发送数据
|
||||
ctx.channel().writeAndFlush(buffer); |
||||
} |
||||
} |
||||
|
||||
private ByteBuf getByteBuf(ChannelHandlerContext ctx, String sendStr) { |
||||
// byte类型的数据
|
||||
// byte[] bytes = "这里是将要写往服务端的数据".getBytes(Charset.forName("utf-8"));
|
||||
// String sendStr = "5803004900021914"; // 冷量计
|
||||
// 申请一个数据结构存储信息
|
||||
ByteBuf buffer = ctx.alloc().buffer(); |
||||
// 将信息放入数据结构中
|
||||
buffer.writeBytes(ExchangeStringUtil.hexStrToBinaryStr(sendStr));//对接需要16进制
|
||||
return buffer; |
||||
} |
||||
|
||||
@Override |
||||
public void channelInactive(ChannelHandlerContext ctx) throws Exception { |
||||
Thread.sleep(100); |
||||
ctx.close(); |
||||
log.info(ctx.channel().localAddress() + "退出链接!!"); |
||||
} |
||||
|
||||
@Override |
||||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { |
||||
try { |
||||
ByteBuf buf = (ByteBuf) msg; |
||||
byte[] bytes = new byte[buf.readableBytes()]; |
||||
buf.readBytes(bytes);//复制内容到字节数组bytes
|
||||
buf.clear(); |
||||
log.info("获取到的值: " + ExchangeStringUtil.bytesToHexString(bytes)); |
||||
if (bytes.length <= 36) { |
||||
// receiveStr = receiveStr.replace("null", "");
|
||||
// receiveStr = receiveStr + ExchangeStringUtil.bytesToHexString(bytes);//将接收到的数据转为字符串,此字符串就是客户端发送的字符串
|
||||
// log.info(ctx.channel().remoteAddress() + " " + ctx.channel().localAddress() + " 接受服务器数据:" + receiveStr + ",大小: " + receiveStr.length());
|
||||
receiveStr = receiveStr + ExchangeStringUtil.bytesToHexString(bytes);//将接收到的数据转为字符串,此字符串就是客户端发送的字符串
|
||||
receiveStr = receiveStr.replace("null", ""); |
||||
log.info("接受服务器数据:" + receiveStr + ",大小: " + receiveStr.length()); |
||||
} |
||||
} catch (Exception e) { |
||||
e.printStackTrace(); |
||||
} finally { |
||||
ReferenceCountUtil.release(msg); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { |
||||
log.info("冷水机组--数据读取接收完成: " + receiveStr); |
||||
if (receiveStr.length() == 30) { |
||||
log.info("采集完整的报文: " + receiveStr); |
||||
// 解析采集的报文,并保存到数据库
|
||||
AnalysisReceiveOrder485 analysisReceiveOrder485 = new AnalysisReceiveOrder485(); |
||||
analysisReceiveOrder485.analysisChillerOrder485(receiveStr,deviceCodeParamList.get(num)); |
||||
// 清空receiveStr
|
||||
receiveStr = ""; |
||||
// 判断发送的下标,如果不等于指令数组大小
|
||||
num = num + 1; |
||||
if (num > size - 1) { |
||||
num = 0; |
||||
// 关闭连接
|
||||
receiveStr = null; |
||||
ctx.close(); |
||||
// // 继续发送下一个采集冷水机设备指令
|
||||
// GetReadOrder485 getReadOrder485 = new GetReadOrder485();
|
||||
// String sendStr = getReadOrder485.createChillersOrder(chillersEntityList.get(num));
|
||||
// ByteBuf buffer = getByteBuf(ctx, sendStr);
|
||||
// // 发送数据
|
||||
// ctx.channel().writeAndFlush(buffer);
|
||||
// log.info("客户端再次往服务端发送数据" + num + " 数据条数:" + size);
|
||||
} else { |
||||
// 添加一个状态值,判断是否继续发送指令 update by ljf on 2020-08-07
|
||||
if (Constant.WEB_FLAG) { |
||||
num = 0; |
||||
// 关闭连接
|
||||
receiveStr = null; |
||||
ctx.close(); |
||||
} else { |
||||
Thread.sleep(1000); |
||||
// 继续发送下一个采集冷水机设备指令
|
||||
GetReadOrder485 getReadOrder485 = new GetReadOrder485(); |
||||
String sendStr = getReadOrder485.createChillersOrder(deviceCodeParamList.get(num)); |
||||
ByteBuf buffer = getByteBuf(ctx, sendStr); |
||||
// 发送数据
|
||||
ctx.channel().writeAndFlush(buffer); |
||||
log.info("客户端再次往服务端发送数据" + num + " 数据条数:" + size); |
||||
} |
||||
} |
||||
} else if (receiveStr.length() == 32) { |
||||
log.info("采集完整的报文: " + receiveStr); |
||||
// 解析采集的报文,并保存到数据库
|
||||
AnalysisReceiveOrder485 analysisReceiveOrder485 = new AnalysisReceiveOrder485(); |
||||
analysisReceiveOrder485.analysisChillerOrder485(receiveStr,deviceCodeParamList.get(num)); |
||||
// 清空receiveStr
|
||||
receiveStr = ""; |
||||
// 判断发送的下标,如果不等于指令数组大小
|
||||
num = num + 1; |
||||
if (num > size - 1) { |
||||
num = 0; |
||||
// 关闭连接
|
||||
receiveStr = null; |
||||
ctx.close(); |
||||
// // 继续发送下一个采集冷水机设备指令
|
||||
// GetReadOrder485 getReadOrder485 = new GetReadOrder485();
|
||||
// String sendStr = getReadOrder485.createChillersOrder(chillersEntityList.get(num));
|
||||
// ByteBuf buffer = getByteBuf(ctx, sendStr);
|
||||
// // 发送数据
|
||||
// ctx.channel().writeAndFlush(buffer);
|
||||
// log.info("客户端再次往服务端发送数据" + num + " 数据条数:" + size);
|
||||
} else { |
||||
// 添加一个状态值,判断是否继续发送指令 update by ljf on 2020-08-07
|
||||
if (Constant.WEB_FLAG) { |
||||
log.info("有指令下发退出定时采集冷水机组参数"); |
||||
num = 0; |
||||
// 关闭连接
|
||||
receiveStr = null; |
||||
ctx.close(); |
||||
} else { |
||||
Thread.sleep(1000); |
||||
// 继续发送下一个采集冷水机设备指令
|
||||
GetReadOrder485 getReadOrder485 = new GetReadOrder485(); |
||||
String sendStr = getReadOrder485.createChillersOrder(deviceCodeParamList.get(num)); |
||||
ByteBuf buffer = getByteBuf(ctx, sendStr); |
||||
// 2.发送数据
|
||||
ctx.channel().writeAndFlush(buffer); |
||||
log.info("客户端再次往服务端发送数据" + num + " 数据条数:" + size); |
||||
} |
||||
} |
||||
} else if (receiveStr.length() > 36) { |
||||
// 清空receiveStr
|
||||
receiveStr = null; |
||||
// 判断发送的下标,如果不等于指令数组大小
|
||||
num = num + 1; |
||||
if (num > size - 1) { |
||||
num = 0; |
||||
// 关闭连接
|
||||
receiveStr = null; |
||||
ctx.close(); |
||||
// // 继续发送下一个采集冷水机设备指令
|
||||
// GetReadOrder485 getReadOrder485 = new GetReadOrder485();
|
||||
// String sendStr = getReadOrder485.createChillersOrder(chillersEntityList.get(num));
|
||||
// ByteBuf buffer = getByteBuf(ctx, sendStr);
|
||||
// // 发送数据
|
||||
// ctx.channel().writeAndFlush(buffer);
|
||||
// log.info("客户端再次往服务端发送数据" + num + " 数据条数:" + size);
|
||||
} else { |
||||
// 添加一个状态值,判断是否继续发送指令 update by ljf on 2020-08-07
|
||||
if (Constant.WEB_FLAG) { |
||||
num = 0; |
||||
// 关闭连接
|
||||
receiveStr = null; |
||||
ctx.close(); |
||||
} else { |
||||
Thread.sleep(1000); |
||||
// 继续发送下一个采集冷水机设备指令
|
||||
GetReadOrder485 getReadOrder485 = new GetReadOrder485(); |
||||
String sendStr = getReadOrder485.createChillersOrder(deviceCodeParamList.get(num)); |
||||
ByteBuf buffer = getByteBuf(ctx, sendStr); |
||||
// 发送数据
|
||||
ctx.channel().writeAndFlush(buffer); |
||||
log.info("客户端再次往服务端发送数据" + num + " 数据条数:" + size); |
||||
} |
||||
} |
||||
} |
||||
ctx.flush(); |
||||
} |
||||
|
||||
} |
||||
//package com.mh.user.netty;
|
||||
//
|
||||
//import com.mh.user.constants.Constant;
|
||||
//import com.mh.user.entity.ChillersEntity;
|
||||
//import com.mh.user.entity.DeviceCodeParamEntity;
|
||||
//import com.mh.user.service.DeviceCodeParamService;
|
||||
//import com.mh.user.service.chillers.ChillersService;
|
||||
//import com.mh.user.service.chillers.GatewayManageService;
|
||||
//import com.mh.user.utils.AnalysisReceiveOrder485;
|
||||
//import com.mh.user.utils.ExchangeStringUtil;
|
||||
//import com.mh.user.utils.GetReadOrder485;
|
||||
//import com.mh.user.utils.SpringBeanUtil;
|
||||
//import io.netty.buffer.ByteBuf;
|
||||
//import io.netty.channel.Channel;
|
||||
//import io.netty.channel.ChannelHandlerAdapter;
|
||||
//import io.netty.channel.ChannelHandlerContext;
|
||||
//import io.netty.handler.timeout.IdleState;
|
||||
//import io.netty.handler.timeout.IdleStateEvent;
|
||||
//import io.netty.util.ReferenceCountUtil;
|
||||
//import lombok.extern.slf4j.Slf4j;
|
||||
//import org.springframework.context.ApplicationContext;
|
||||
//
|
||||
//import java.text.SimpleDateFormat;
|
||||
//import java.util.Date;
|
||||
//import java.util.List;
|
||||
//
|
||||
///**
|
||||
// * @author ljf
|
||||
// * @title :
|
||||
// * @description :客户端异步消息处理机制,采集冷水机组
|
||||
// * @updateTime 2020-05-13
|
||||
// * @throws :
|
||||
// */
|
||||
//@Slf4j
|
||||
//public class NettyChillerClientHandler extends ChannelHandlerAdapter {
|
||||
//
|
||||
// private int num = 0;
|
||||
// private int size = 0;
|
||||
// private int idle_count = 0;
|
||||
// private String receiveStr = "";
|
||||
// List<DeviceCodeParamEntity> deviceCodeParamList;
|
||||
//
|
||||
// // 调用service
|
||||
// ApplicationContext context = SpringBeanUtil.getApplicationContext();
|
||||
// DeviceCodeParamService deviceCodeParamService = context.getBean(DeviceCodeParamService.class);
|
||||
// GatewayManageService gatewayManageService = context.getBean(GatewayManageService.class);
|
||||
//
|
||||
// /**
|
||||
// * 超时处理
|
||||
// * 如果120秒没有接受客户端的心跳,就触发;
|
||||
// * 如果超过3次,则直接关闭;
|
||||
// */
|
||||
// @Override
|
||||
// public void userEventTriggered(ChannelHandlerContext ctx, Object obj) throws Exception {
|
||||
// if (obj instanceof IdleStateEvent) {
|
||||
// IdleStateEvent event = (IdleStateEvent) obj;
|
||||
// if (IdleState.READER_IDLE.equals(event.state())) { //如果读通道处于空闲状态,说明没有接收到心跳命令
|
||||
// System.out.println("第" + idle_count + "已经10秒没有接收到服务器的信息了,发送第" + num + "条数据");
|
||||
// if (deviceCodeParamList.get(num) == null) {
|
||||
// System.out.println("关闭这个不活跃的channel");
|
||||
// ctx.channel().close();
|
||||
// } else {
|
||||
// if ((num > size - 1) || (idle_count > 3)) {
|
||||
// System.out.println("关闭这个不活跃的channel");
|
||||
// ctx.channel().close();
|
||||
// }
|
||||
// GetReadOrder485 getReadOrder485 = new GetReadOrder485();
|
||||
// String sendStr = getReadOrder485.createChillersOrder(deviceCodeParamList.get(num));
|
||||
// ctx.channel().writeAndFlush(ExchangeStringUtil.getByteBuf(ctx, sendStr));
|
||||
// idle_count++;
|
||||
// }
|
||||
// }
|
||||
// } else {
|
||||
// super.userEventTriggered(ctx, obj);
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// @Override
|
||||
// public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
|
||||
// log.info("当前channel从EventLoop取消注册");
|
||||
// ctx.close();
|
||||
// super.channelUnregistered(ctx);
|
||||
// }
|
||||
//
|
||||
// @Override
|
||||
// public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
||||
//// super.exceptionCaught(ctx, cause);
|
||||
// log.info("通信异常!!");
|
||||
//// // 发送采集冷水机组指令
|
||||
//// GetReadOrder485 getReadOrder485 = new GetReadOrder485();
|
||||
//// String sendStr = getReadOrder485.createChillersOrder(chillersEntityList.get(num));
|
||||
//// // 获取采集参数个数
|
||||
//// ByteBuf buffer = getByteBuf(ctx, sendStr);
|
||||
//// // 2.发送数据
|
||||
//// ctx.channel().writeAndFlush(buffer);
|
||||
//// receiveStr = null;
|
||||
// cause.printStackTrace();
|
||||
// ctx.close();
|
||||
//// Channel incoming = ctx.channel();
|
||||
//// if (incoming.isActive()) {
|
||||
//// log.info("SimpleClient: " + incoming.remoteAddress() + "异常");
|
||||
//// cause.printStackTrace();
|
||||
//// ctx.close();
|
||||
//// receiveStr = null;
|
||||
//// try {
|
||||
//// TimeUnit.SECONDS.sleep(5);
|
||||
//// SocketAddress remoteAddress = ctx.channel().remoteAddress();
|
||||
//// String port = ExchangeStringUtil.endData(remoteAddress.toString(),":");
|
||||
//// String host = ExchangeStringUtil.splitData(remoteAddress.toString(),"/",":");
|
||||
//// NettyClient nettyClient = new NettyClient();
|
||||
//// nettyClient.connect(Integer.parseInt(port), host); // 断线重连
|
||||
//// } catch (InterruptedException e) {
|
||||
//// e.printStackTrace();
|
||||
//// }
|
||||
//// }
|
||||
// }
|
||||
//
|
||||
//
|
||||
// @Override
|
||||
// public void channelActive(ChannelHandlerContext ctx) throws Exception {
|
||||
// super.channelActive(ctx);
|
||||
// // 添加一个状态值,判断是否继续发送指令 update by ljf on 2020-08-07
|
||||
// if (Constant.WEB_FLAG) {
|
||||
// num = 0;
|
||||
// // 关闭连接
|
||||
// receiveStr = null;
|
||||
// ctx.close();
|
||||
// } else {
|
||||
// SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-mm-dd HH:mm:ss");
|
||||
// Date date = new Date();
|
||||
// log.info(ctx.channel().remoteAddress() + " " + sdf1.format(date) + "链接服务端成功!");
|
||||
// // 截取IP地址
|
||||
// String IP = ExchangeStringUtil.getMidString(ctx.channel().remoteAddress() + "", "/", ":");
|
||||
// // 截取端口号
|
||||
// String port = ExchangeStringUtil.getMidString(ctx.channel().remoteAddress() + "", ":", "");
|
||||
// log.info("IP: " + IP + ",端口号: " + port);
|
||||
// // 更新对应的网关在线情况
|
||||
// gatewayManageService.updateGatewayManage(IP, port);
|
||||
// // 生成采集指令
|
||||
// deviceCodeParamList = deviceCodeParamService.queryCodeParam(port);
|
||||
// size = deviceCodeParamList.size();
|
||||
//
|
||||
// // 发送采集冷水机组指令
|
||||
// GetReadOrder485 getReadOrder485 = new GetReadOrder485();
|
||||
// String sendStr = getReadOrder485.createChillersOrder(deviceCodeParamList.get(num));
|
||||
// // 获取采集参数个数
|
||||
// ByteBuf buffer = getByteBuf(ctx, sendStr);
|
||||
// // 发送数据
|
||||
// ctx.channel().writeAndFlush(buffer);
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// private ByteBuf getByteBuf(ChannelHandlerContext ctx, String sendStr) {
|
||||
// // byte类型的数据
|
||||
//// byte[] bytes = "这里是将要写往服务端的数据".getBytes(Charset.forName("utf-8"));
|
||||
//// String sendStr = "5803004900021914"; // 冷量计
|
||||
// // 申请一个数据结构存储信息
|
||||
// ByteBuf buffer = ctx.alloc().buffer();
|
||||
// // 将信息放入数据结构中
|
||||
// buffer.writeBytes(ExchangeStringUtil.hexStrToBinaryStr(sendStr));//对接需要16进制
|
||||
// return buffer;
|
||||
// }
|
||||
//
|
||||
// @Override
|
||||
// public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
||||
// Thread.sleep(100);
|
||||
// ctx.close();
|
||||
// log.info(ctx.channel().localAddress() + "退出链接!!");
|
||||
// }
|
||||
//
|
||||
// @Override
|
||||
// public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||
// try {
|
||||
// ByteBuf buf = (ByteBuf) msg;
|
||||
// byte[] bytes = new byte[buf.readableBytes()];
|
||||
// buf.readBytes(bytes);//复制内容到字节数组bytes
|
||||
// buf.clear();
|
||||
// log.info("获取到的值: " + ExchangeStringUtil.bytesToHexString(bytes));
|
||||
// if (bytes.length <= 36) {
|
||||
//// receiveStr = receiveStr.replace("null", "");
|
||||
//// receiveStr = receiveStr + ExchangeStringUtil.bytesToHexString(bytes);//将接收到的数据转为字符串,此字符串就是客户端发送的字符串
|
||||
//// log.info(ctx.channel().remoteAddress() + " " + ctx.channel().localAddress() + " 接受服务器数据:" + receiveStr + ",大小: " + receiveStr.length());
|
||||
// receiveStr = receiveStr + ExchangeStringUtil.bytesToHexString(bytes);//将接收到的数据转为字符串,此字符串就是客户端发送的字符串
|
||||
// receiveStr = receiveStr.replace("null", "");
|
||||
// log.info("接受服务器数据:" + receiveStr + ",大小: " + receiveStr.length());
|
||||
// }
|
||||
// } catch (Exception e) {
|
||||
// e.printStackTrace();
|
||||
// } finally {
|
||||
// ReferenceCountUtil.release(msg);
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// @Override
|
||||
// public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
||||
// log.info("冷水机组--数据读取接收完成: " + receiveStr);
|
||||
// if (receiveStr.length() == 30) {
|
||||
// log.info("采集完整的报文: " + receiveStr);
|
||||
// // 解析采集的报文,并保存到数据库
|
||||
// AnalysisReceiveOrder485 analysisReceiveOrder485 = new AnalysisReceiveOrder485();
|
||||
// analysisReceiveOrder485.analysisChillerOrder485(receiveStr,deviceCodeParamList.get(num));
|
||||
// // 清空receiveStr
|
||||
// receiveStr = "";
|
||||
// // 判断发送的下标,如果不等于指令数组大小
|
||||
// num = num + 1;
|
||||
// if (num > size - 1) {
|
||||
// num = 0;
|
||||
// // 关闭连接
|
||||
// receiveStr = null;
|
||||
// ctx.close();
|
||||
//// // 继续发送下一个采集冷水机设备指令
|
||||
//// GetReadOrder485 getReadOrder485 = new GetReadOrder485();
|
||||
//// String sendStr = getReadOrder485.createChillersOrder(chillersEntityList.get(num));
|
||||
//// ByteBuf buffer = getByteBuf(ctx, sendStr);
|
||||
//// // 发送数据
|
||||
//// ctx.channel().writeAndFlush(buffer);
|
||||
//// log.info("客户端再次往服务端发送数据" + num + " 数据条数:" + size);
|
||||
// } else {
|
||||
// // 添加一个状态值,判断是否继续发送指令 update by ljf on 2020-08-07
|
||||
// if (Constant.WEB_FLAG) {
|
||||
// num = 0;
|
||||
// // 关闭连接
|
||||
// receiveStr = null;
|
||||
// ctx.close();
|
||||
// } else {
|
||||
// Thread.sleep(1000);
|
||||
// // 继续发送下一个采集冷水机设备指令
|
||||
// GetReadOrder485 getReadOrder485 = new GetReadOrder485();
|
||||
// String sendStr = getReadOrder485.createChillersOrder(deviceCodeParamList.get(num));
|
||||
// ByteBuf buffer = getByteBuf(ctx, sendStr);
|
||||
// // 发送数据
|
||||
// ctx.channel().writeAndFlush(buffer);
|
||||
// log.info("客户端再次往服务端发送数据" + num + " 数据条数:" + size);
|
||||
// }
|
||||
// }
|
||||
// } else if (receiveStr.length() == 32) {
|
||||
// log.info("采集完整的报文: " + receiveStr);
|
||||
// // 解析采集的报文,并保存到数据库
|
||||
// AnalysisReceiveOrder485 analysisReceiveOrder485 = new AnalysisReceiveOrder485();
|
||||
// analysisReceiveOrder485.analysisChillerOrder485(receiveStr,deviceCodeParamList.get(num));
|
||||
// // 清空receiveStr
|
||||
// receiveStr = "";
|
||||
// // 判断发送的下标,如果不等于指令数组大小
|
||||
// num = num + 1;
|
||||
// if (num > size - 1) {
|
||||
// num = 0;
|
||||
// // 关闭连接
|
||||
// receiveStr = null;
|
||||
// ctx.close();
|
||||
//// // 继续发送下一个采集冷水机设备指令
|
||||
//// GetReadOrder485 getReadOrder485 = new GetReadOrder485();
|
||||
//// String sendStr = getReadOrder485.createChillersOrder(chillersEntityList.get(num));
|
||||
//// ByteBuf buffer = getByteBuf(ctx, sendStr);
|
||||
//// // 发送数据
|
||||
//// ctx.channel().writeAndFlush(buffer);
|
||||
//// log.info("客户端再次往服务端发送数据" + num + " 数据条数:" + size);
|
||||
// } else {
|
||||
// // 添加一个状态值,判断是否继续发送指令 update by ljf on 2020-08-07
|
||||
// if (Constant.WEB_FLAG) {
|
||||
// log.info("有指令下发退出定时采集冷水机组参数");
|
||||
// num = 0;
|
||||
// // 关闭连接
|
||||
// receiveStr = null;
|
||||
// ctx.close();
|
||||
// } else {
|
||||
// Thread.sleep(1000);
|
||||
// // 继续发送下一个采集冷水机设备指令
|
||||
// GetReadOrder485 getReadOrder485 = new GetReadOrder485();
|
||||
// String sendStr = getReadOrder485.createChillersOrder(deviceCodeParamList.get(num));
|
||||
// ByteBuf buffer = getByteBuf(ctx, sendStr);
|
||||
// // 2.发送数据
|
||||
// ctx.channel().writeAndFlush(buffer);
|
||||
// log.info("客户端再次往服务端发送数据" + num + " 数据条数:" + size);
|
||||
// }
|
||||
// }
|
||||
// } else if (receiveStr.length() > 36) {
|
||||
// // 清空receiveStr
|
||||
// receiveStr = null;
|
||||
// // 判断发送的下标,如果不等于指令数组大小
|
||||
// num = num + 1;
|
||||
// if (num > size - 1) {
|
||||
// num = 0;
|
||||
// // 关闭连接
|
||||
// receiveStr = null;
|
||||
// ctx.close();
|
||||
//// // 继续发送下一个采集冷水机设备指令
|
||||
//// GetReadOrder485 getReadOrder485 = new GetReadOrder485();
|
||||
//// String sendStr = getReadOrder485.createChillersOrder(chillersEntityList.get(num));
|
||||
//// ByteBuf buffer = getByteBuf(ctx, sendStr);
|
||||
//// // 发送数据
|
||||
//// ctx.channel().writeAndFlush(buffer);
|
||||
//// log.info("客户端再次往服务端发送数据" + num + " 数据条数:" + size);
|
||||
// } else {
|
||||
// // 添加一个状态值,判断是否继续发送指令 update by ljf on 2020-08-07
|
||||
// if (Constant.WEB_FLAG) {
|
||||
// num = 0;
|
||||
// // 关闭连接
|
||||
// receiveStr = null;
|
||||
// ctx.close();
|
||||
// } else {
|
||||
// Thread.sleep(1000);
|
||||
// // 继续发送下一个采集冷水机设备指令
|
||||
// GetReadOrder485 getReadOrder485 = new GetReadOrder485();
|
||||
// String sendStr = getReadOrder485.createChillersOrder(deviceCodeParamList.get(num));
|
||||
// ByteBuf buffer = getByteBuf(ctx, sendStr);
|
||||
// // 发送数据
|
||||
// ctx.channel().writeAndFlush(buffer);
|
||||
// log.info("客户端再次往服务端发送数据" + num + " 数据条数:" + size);
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// ctx.flush();
|
||||
// }
|
||||
//
|
||||
//}
|
||||
|
Loading…
Reference in new issue