Browse Source

1、测试优化Netty-TCP通信;

dev
25604 2 weeks ago
parent
commit
6533353635
  1. 4
      user-service/src/main/java/com/mh/user/job/CollectionLoopRunner.java
  2. 2
      user-service/src/main/java/com/mh/user/mapper/AreaMapper.java
  3. 1
      user-service/src/main/java/com/mh/user/mapper/GatewayManageMapper.java
  4. 26
      user-service/src/main/java/com/mh/user/netty/handle/DataUploadServerHandler.java
  5. 22
      user-service/src/main/java/com/mh/user/netty/handle/ExceptionServerHandler.java
  6. 2
      user-service/src/main/java/com/mh/user/netty/handle/FourthChannelInitializer.java
  7. 4
      user-service/src/main/java/com/mh/user/netty/handle/HeartBeatServerHandler.java
  8. 13
      user-service/src/main/java/com/mh/user/netty/handle/LoginRequestHandler.java
  9. 2
      user-service/src/main/java/com/mh/user/provide/GatewayManageProvider.java
  10. 5
      user-service/src/main/java/com/mh/user/serialport/SerialPortSingle2.java
  11. 4
      user-service/src/main/java/com/mh/user/sqlmapper/SysLogMapper.xml
  12. 27
      user-service/src/main/java/com/mh/user/utils/ExchangeStringUtil.java
  13. 2
      user-service/src/main/java/com/mh/user/utils/GetReadOrder485.java

4
user-service/src/main/java/com/mh/user/job/CollectionLoopRunner.java

@ -3,6 +3,7 @@ package com.mh.user.job;
import com.mh.user.constants.Constant; import com.mh.user.constants.Constant;
import com.mh.user.entity.AddCronJobReq; import com.mh.user.entity.AddCronJobReq;
import com.mh.user.manage.QuartzManager; import com.mh.user.manage.QuartzManager;
import com.mh.user.netty.NettyEchoServer;
import com.mh.user.serialport.SerialPortListener; import com.mh.user.serialport.SerialPortListener;
import com.mh.user.serialport.SerialPortUtil; import com.mh.user.serialport.SerialPortUtil;
import com.mh.user.serialport.SerialTool; import com.mh.user.serialport.SerialTool;
@ -51,6 +52,9 @@ public class CollectionLoopRunner implements ApplicationRunner {
//simulationCollection(); //simulationCollection();
// 获取天气数据 // 获取天气数据
getWeatherInfoJob.getWeatherInfo(); getWeatherInfoJob.getWeatherInfo();
// 启动netty端口
NettyEchoServer nettyEchoServer = new NettyEchoServer();
nettyEchoServer.bind(8098);
} }
private void simulationCollection() throws Exception { private void simulationCollection() throws Exception {

2
user-service/src/main/java/com/mh/user/mapper/AreaMapper.java

@ -21,7 +21,7 @@ public interface AreaMapper extends BaseMapper<AreaEntity> {
@Select("select count(*) from area where area_id=#{areaId} and area_name=#{areaName}") @Select("select count(*) from area where area_id=#{areaId} and area_name=#{areaName}")
int selectCountByAreaIdAndAreaName(String areaId, String areaName); int selectCountByAreaIdAndAreaName(String areaId, String areaName);
@Select("select count(*) from area where id= #{areaId}") @Select("select count(*) from area")
int getCount(String areaId, Integer page, Integer limit); int getCount(String areaId, Integer page, Integer limit);
@Select({ @Select({

1
user-service/src/main/java/com/mh/user/mapper/GatewayManageMapper.java

@ -24,6 +24,7 @@ public interface GatewayManageMapper {
*/ */
@SelectProvider(type = GatewayManageProvider.class, method = "queryByOther") @SelectProvider(type = GatewayManageProvider.class, method = "queryByOther")
@Results(id="rs",value = { @Results(id="rs",value = {
@Result(column = "id", property = "id"),
@Result(column = "gateway_name", property = "gatewayName"), @Result(column = "gateway_name", property = "gatewayName"),
@Result(column = "gateway_ip", property = "gatewayIP"), @Result(column = "gateway_ip", property = "gatewayIP"),
@Result(column = "gateway_address", property = "gatewayAddress"), @Result(column = "gateway_address", property = "gatewayAddress"),

26
user-service/src/main/java/com/mh/user/netty/handle/DataUploadServerHandler.java

@ -1,18 +1,15 @@
package com.mh.user.netty.handle; package com.mh.user.netty.handle;
import com.github.benmanes.caffeine.cache.Cache; import com.mh.common.utils.StringUtils;
import com.mh.user.constants.FourthGEnum;
import com.mh.user.entity.GatewayManageEntity; import com.mh.user.entity.GatewayManageEntity;
import com.mh.user.netty.session.ServerSession; import com.mh.user.netty.session.ServerSession;
import com.mh.user.netty.session.SessionMap; import com.mh.user.netty.session.SessionMap;
import com.mh.user.utils.CacheUtil; import com.mh.user.utils.CacheUtil;
import com.mh.user.utils.ExchangeStringUtil; import com.mh.user.utils.ExchangeStringUtil;
import com.mh.user.utils.NettyTools; import com.mh.user.utils.NettyTools;
import com.mh.user.utils.SpringBeanUtil;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import java.util.List; import java.util.List;
@ -22,28 +19,33 @@ public class DataUploadServerHandler extends ChannelInboundHandlerAdapter {
@Override @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//将接收到的数据转为字符串,此字符串就是客户端发送的字符串 //将接收到的数据转为字符串,此字符串就是客户端发送的字符串
String receiveStr = ExchangeStringUtil.bytesToHexString((byte[]) msg); String receiveStr = ExchangeStringUtil.bytesToHexString(msg);
// 判断当前报文是否是上报数据报文 // 判断当前报文是否是上报数据报文
if (receiveStr != null) { if (receiveStr != null) {
// 判断属于哪一个DTU网关上报的数据 // 判断属于哪一个DTU网关上报的数据
CacheUtil instance = CacheUtil.getInstance(); CacheUtil instance = CacheUtil.getInstance();
List<GatewayManageEntity> gwList = instance.getGatewayInfo(); List<GatewayManageEntity> gwList = instance.getGatewayInfo();
String deviceCode = "";
log.info("接收数据报文>>>>>>>>>>>>>>{}", receiveStr);
if (gwList != null && !gwList.isEmpty()) { if (gwList != null && !gwList.isEmpty()) {
for (GatewayManageEntity gw : gwList) { for (GatewayManageEntity gw : gwList) {
if (receiveStr.startsWith(ExchangeStringUtil.str2HexStr(gw.getSn()))) { if (!StringUtils.isBlank(gw.getSn())) {
String snHexStr = ExchangeStringUtil.str2HexStr(gw.getSn());
if (receiveStr.startsWith(snHexStr)) {
// 直接设置对应值 // 直接设置对应值
NettyTools.setReceiveMsg(gw.getHeartBeat(), receiveStr.substring(gw.getSn().length())); deviceCode = gw.getHeartBeat();
} else { NettyTools.setReceiveMsg(gw.getHeartBeat(), receiveStr.substring(snHexStr.length()));
break;
}
}
}
}
// 判断是否登录,没有登录立马断开 // 判断是否登录,没有登录立马断开
String deviceCode = gw.getHeartBeat();
if (!SessionMap.inst().hasLogin(deviceCode + ctx.channel().remoteAddress())) { if (!SessionMap.inst().hasLogin(deviceCode + ctx.channel().remoteAddress())) {
ServerSession.closeSession(ctx); ServerSession.closeSession(ctx);
return; return;
} }
} }
}
}
}
super.channelRead(ctx, msg); super.channelRead(ctx, msg);
} }

22
user-service/src/main/java/com/mh/user/netty/handle/ExceptionServerHandler.java

@ -1,9 +1,16 @@
package com.mh.user.netty.handle; package com.mh.user.netty.handle;
import com.mh.user.model.SysLog;
import com.mh.user.netty.session.ServerSession; import com.mh.user.netty.session.ServerSession;
import com.mh.user.service.GatewayManageService;
import com.mh.user.service.SysLogService;
import com.mh.user.utils.SpringContextUtils;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import java.util.Date;
/** /**
* @author LJF * @author LJF
@ -14,8 +21,9 @@ import lombok.extern.slf4j.Slf4j;
*/ */
@Slf4j @Slf4j
public class ExceptionServerHandler extends ChannelInboundHandlerAdapter { public class ExceptionServerHandler extends ChannelInboundHandlerAdapter {
//private final ApplicationContext applicationContext = ApplicationContextProvider.getApplicationContext(); private final ApplicationContext applicationContext = SpringContextUtils.getApplicationContext();
//private final MeterInfoService meterInfoService = applicationContext.getBean(MeterInfoService.class); private final SysLogService sysLogService = applicationContext.getBean(SysLogService.class);
private final GatewayManageService gatewayManageService = applicationContext.getBean(GatewayManageService.class);
@Override @Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
@ -45,6 +53,16 @@ public class ExceptionServerHandler extends ChannelInboundHandlerAdapter {
log.info("获得已知异常!{}", cause.getMessage()); log.info("获得已知异常!{}", cause.getMessage());
log.info("获得已知异常!", cause); log.info("获得已知异常!", cause);
ServerSession.closeSession(ctx); ServerSession.closeSession(ctx);
// 添加到日志表,下线了
SysLog sysLog = new SysLog();
sysLog.setOperation(ctx.channel().remoteAddress().toString() + "下线了");
sysLog.setCreateBy("开发者");
sysLog.setIp(ctx.channel().remoteAddress().toString());
sysLog.setCreateTime(new Date());
sysLog.setTime(0L);
sysLogService.insertLog(sysLog);
// 更新网关在线情况
gatewayManageService.updateGatewayManageOnlineByHeartBeatCode(ctx.channel().remoteAddress().toString().substring(0, 8), 1);
ctx.close(); ctx.close();
} }
} }

2
user-service/src/main/java/com/mh/user/netty/handle/FourthChannelInitializer.java

@ -23,7 +23,7 @@ public class FourthChannelInitializer extends ChannelInitializer<Channel> {
// 处理登录 // 处理登录
channel.pipeline().addLast(new LoginRequestHandler()); channel.pipeline().addLast(new LoginRequestHandler());
// 处理心跳 // 处理心跳
channel.pipeline().addLast(new HeartBeatServerHandler()); // channel.pipeline().addLast(new HeartBeatServerHandler());
// 处理数据上报数据 // 处理数据上报数据
channel.pipeline().addLast(new DataUploadServerHandler()); channel.pipeline().addLast(new DataUploadServerHandler());
// 异常处理 // 异常处理

4
user-service/src/main/java/com/mh/user/netty/handle/HeartBeatServerHandler.java

@ -31,7 +31,7 @@ public class HeartBeatServerHandler extends IdleStateHandler {
private final GatewayManageService gatewayManageService = applicationContext.getBean(GatewayManageService.class); private final GatewayManageService gatewayManageService = applicationContext.getBean(GatewayManageService.class);
//目前网关默认心跳30秒,多长时间没有心跳,就关闭连接 //目前网关默认心跳30秒,多长时间没有心跳,就关闭连接
private static final int READ_IDLE_GAP = 30; private static final int READ_IDLE_GAP = 60;
/** /**
* @param readerIdleTimeSeconds 最长 没有 read到心跳的时间 * @param readerIdleTimeSeconds 最长 没有 read到心跳的时间
@ -58,7 +58,7 @@ public class HeartBeatServerHandler extends IdleStateHandler {
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//将接收到的数据转为字符串,此字符串就是客户端发送的字符串 //将接收到的数据转为字符串,此字符串就是客户端发送的字符串
String receiveStr = ExchangeStringUtil.bytesToHexString((byte[]) msg); String receiveStr = ExchangeStringUtil.bytesToHexString(msg);
// 判断当前报文是否是登录报文 // 判断当前报文是否是登录报文
if (receiveStr != null && receiveStr.startsWith("2400")) { if (receiveStr != null && receiveStr.startsWith("2400")) {
if (receiveStr.length() != 8) { if (receiveStr.length() != 8) {

13
user-service/src/main/java/com/mh/user/netty/handle/LoginRequestHandler.java

@ -5,10 +5,14 @@ import com.mh.user.netty.session.ServerSession;
import com.mh.user.netty.session.SessionMap; import com.mh.user.netty.session.SessionMap;
import com.mh.user.netty.task.CallbackTask; import com.mh.user.netty.task.CallbackTask;
import com.mh.user.netty.task.CallbackTaskScheduler; import com.mh.user.netty.task.CallbackTaskScheduler;
import com.mh.user.service.GatewayManageService;
import com.mh.user.service.SysLogService;
import com.mh.user.utils.ExchangeStringUtil; import com.mh.user.utils.ExchangeStringUtil;
import com.mh.user.utils.SpringContextUtils;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
/** /**
* @author LJF * @author LJF
@ -20,16 +24,21 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j @Slf4j
public class LoginRequestHandler extends ChannelInboundHandlerAdapter { public class LoginRequestHandler extends ChannelInboundHandlerAdapter {
private final ApplicationContext applicationContext = SpringContextUtils.getApplicationContext();
private final GatewayManageService gatewayManageService = applicationContext.getBean(GatewayManageService.class);
@Override @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//将接收到的数据转为字符串,此字符串就是客户端发送的字符串 //将接收到的数据转为字符串,此字符串就是客户端发送的字符串
String receiveStr = ExchangeStringUtil.bytesToHexString((byte[]) msg); String receiveStr = ExchangeStringUtil.bytesToHexString(msg);
if (StringUtils.isBlank(receiveStr)) { if (StringUtils.isBlank(receiveStr) || !receiveStr.startsWith("2400")) {
super.channelRead(ctx, msg); super.channelRead(ctx, msg);
return; return;
} }
// 判断当前报文是否是心跳包上线: 869530073040186 // 判断当前报文是否是心跳包上线: 869530073040186
log.info("接收到的心跳报文 <== {}", receiveStr); log.info("接收到的心跳报文 <== {}", receiveStr);
// 通过对应的心跳包码进行判断,然后更新网关在线情况
gatewayManageService.updateGatewayManageOnlineByHeartBeatCode(receiveStr, 0);
// 获取IMEI号 // 获取IMEI号
String deviceCode = receiveStr; String deviceCode = receiveStr;
String meterNum = deviceCode; String meterNum = deviceCode;

2
user-service/src/main/java/com/mh/user/provide/GatewayManageProvider.java

@ -61,7 +61,7 @@ public class GatewayManageProvider {
public String queryByOther(Map<?,?> params) { public String queryByOther(Map<?,?> params) {
StringBuffer sqlStr = new StringBuffer(); StringBuffer sqlStr = new StringBuffer();
sqlStr.append("select id, gateway_name, gateway_ip, gateway_address, data_com, " + sqlStr.append("select id, gateway_name, gateway_ip, gateway_address, data_com, " +
"create_date, connect_date, internet_card, operator, gateway_port, grade from gateway_manage where 1=1 "); "create_date, connect_date, internet_card, operator, gateway_port, grade, remarks, heart_beat,imei,sn,community_type from gateway_manage where 1=1 ");
if(params.get("grade") != null){ if(params.get("grade") != null){
sqlStr.append(" and grade = #{grade}"); sqlStr.append(" and grade = #{grade}");
} }

5
user-service/src/main/java/com/mh/user/serialport/SerialPortSingle2.java

@ -46,8 +46,11 @@ public class SerialPortSingle2 {
List<GatewayManageEntity> gwList = instance.getGatewayInfo(); List<GatewayManageEntity> gwList = instance.getGatewayInfo();
if (gwList != null && !gwList.isEmpty()) { if (gwList != null && !gwList.isEmpty()) {
for (GatewayManageEntity gw : gwList) { for (GatewayManageEntity gw : gwList) {
if (gw.getDataCom().toUpperCase().equals(deviceCodeParamEntity.getDataCom().toUpperCase())) { if (!StringUtils.isBlank(gw.getDataCom()) && gw.getDataCom().toUpperCase().equals(deviceCodeParamEntity.getDataCom().toUpperCase())) {
String communityType = gw.getCommunityType(); String communityType = gw.getCommunityType();
if (StringUtils.isBlank(communityType)) {
continue;
}
if (Constant.COMMUNITY_TYPE_TCP.equals(communityType)) { if (Constant.COMMUNITY_TYPE_TCP.equals(communityType)) {
TcpSingle tcpSingle = new TcpSingle(); TcpSingle tcpSingle = new TcpSingle();
return tcpSingle.serialPortSend(deviceCodeParamEntity, gw); return tcpSingle.serialPortSend(deviceCodeParamEntity, gw);

4
user-service/src/main/java/com/mh/user/sqlmapper/SysLogMapper.xml

@ -32,11 +32,11 @@
where id = #{id,jdbcType=BIGINT} where id = #{id,jdbcType=BIGINT}
</delete> </delete>
<insert id="insert" parameterType="com.mh.user.model.SysLog"> <insert id="insert" parameterType="com.mh.user.model.SysLog">
insert into sys_log (id, user_name, operation, insert into sys_log (user_name, operation,
method, params, time, method, params, time,
ip, create_by, create_time, ip, create_by, create_time,
last_update_by, last_update_time,login_time,login_state,opt_desc) last_update_by, last_update_time,login_time,login_state,opt_desc)
values (#{id,jdbcType=BIGINT}, #{userName,jdbcType=VARCHAR}, #{operation,jdbcType=VARCHAR}, values (#{userName,jdbcType=VARCHAR}, #{operation,jdbcType=VARCHAR},
#{method,jdbcType=VARCHAR}, #{params,jdbcType=VARCHAR}, #{time,jdbcType=BIGINT}, #{method,jdbcType=VARCHAR}, #{params,jdbcType=VARCHAR}, #{time,jdbcType=BIGINT},
#{ip,jdbcType=VARCHAR}, #{createBy,jdbcType=BIGINT}, #{createTime,jdbcType=TIMESTAMP}, #{ip,jdbcType=VARCHAR}, #{createBy,jdbcType=BIGINT}, #{createTime,jdbcType=TIMESTAMP},
#{lastUpdateBy,jdbcType=BIGINT}, #{lastUpdateTime,jdbcType=TIMESTAMP}, #{lastUpdateBy,jdbcType=BIGINT}, #{lastUpdateTime,jdbcType=TIMESTAMP},

27
user-service/src/main/java/com/mh/user/utils/ExchangeStringUtil.java

@ -1,6 +1,8 @@
package com.mh.user.utils; package com.mh.user.utils;
import com.mh.common.utils.StringUtils; import com.mh.common.utils.StringUtils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
//import io.netty.buffer.ByteBuf; //import io.netty.buffer.ByteBuf;
//import io.netty.channel.ChannelHandlerContext; //import io.netty.channel.ChannelHandlerContext;
import java.io.*; import java.io.*;
@ -386,6 +388,31 @@ public class ExchangeStringUtil {
return sbf.toString().trim(); return sbf.toString().trim();
} }
public static String bytesToHexString(Object msg){
ByteBuf byteBuf = (ByteBuf)msg;
ByteBuf buf = Unpooled.copiedBuffer(byteBuf);
byte [] src = new byte[buf.readableBytes()];
//复制内容到字节数组bytes
buf.readBytes(src);
StringBuilder stringBuilder = new StringBuilder();
if (src == null || src.length <= 0) {
return null;
}
for (int i = 0; i < src.length; i++) {
int v = src[i] & 0xFF;
String hv = Integer.toHexString(v);
if (hv.length() < 2) {
stringBuilder.append(0);
}
stringBuilder.append(hv);
}
String rawMessage = stringBuilder.toString().toUpperCase();
// int first68Index = rawMessage.indexOf("68");
// if (first68Index == -1){
// }
return rawMessage;
}
public static String bytesToHexString(byte[] src){ public static String bytesToHexString(byte[] src){
StringBuilder stringBuilder = new StringBuilder(""); StringBuilder stringBuilder = new StringBuilder("");
if (src == null || src.length <= 0) { if (src == null || src.length <= 0) {

2
user-service/src/main/java/com/mh/user/utils/GetReadOrder485.java

@ -5,6 +5,7 @@ import com.mh.user.constants.Constant;
import com.mh.user.entity.*; import com.mh.user.entity.*;
import com.mh.user.service.DeviceCodeParamService; import com.mh.user.service.DeviceCodeParamService;
import com.mh.user.service.DeviceInstallService; import com.mh.user.service.DeviceInstallService;
import com.mh.user.service.GatewayManageService;
import com.mh.user.service.impl.DeviceDisplayServiceImpl; import com.mh.user.service.impl.DeviceDisplayServiceImpl;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContext;
@ -24,7 +25,6 @@ public class GetReadOrder485 {
// 调用service // 调用service
ApplicationContext context = SpringBeanUtil.getApplicationContext(); ApplicationContext context = SpringBeanUtil.getApplicationContext();
DeviceDisplayServiceImpl.GatewayManageService gatewayManageService = context.getBean(DeviceDisplayServiceImpl.GatewayManageService.class);
DeviceInstallService deviceInstallService = context.getBean(DeviceInstallService.class); DeviceInstallService deviceInstallService = context.getBean(DeviceInstallService.class);
DeviceCodeParamService deviceCodeParamService = context.getBean(DeviceCodeParamService.class); DeviceCodeParamService deviceCodeParamService = context.getBean(DeviceCodeParamService.class);

Loading…
Cancel
Save