diff --git a/user-service/src/main/java/com/mh/user/job/CollectionLoopRunner.java b/user-service/src/main/java/com/mh/user/job/CollectionLoopRunner.java index a6888ca..66534b8 100644 --- a/user-service/src/main/java/com/mh/user/job/CollectionLoopRunner.java +++ b/user-service/src/main/java/com/mh/user/job/CollectionLoopRunner.java @@ -3,6 +3,7 @@ package com.mh.user.job; import com.mh.user.constants.Constant; import com.mh.user.entity.AddCronJobReq; import com.mh.user.manage.QuartzManager; +import com.mh.user.netty.NettyEchoServer; import com.mh.user.serialport.SerialPortListener; import com.mh.user.serialport.SerialPortUtil; import com.mh.user.serialport.SerialTool; @@ -51,6 +52,9 @@ public class CollectionLoopRunner implements ApplicationRunner { //simulationCollection(); // 获取天气数据 getWeatherInfoJob.getWeatherInfo(); + // 启动netty端口 + NettyEchoServer nettyEchoServer = new NettyEchoServer(); + nettyEchoServer.bind(8098); } private void simulationCollection() throws Exception { diff --git a/user-service/src/main/java/com/mh/user/mapper/AreaMapper.java b/user-service/src/main/java/com/mh/user/mapper/AreaMapper.java index 9ad1ea5..9628611 100644 --- a/user-service/src/main/java/com/mh/user/mapper/AreaMapper.java +++ b/user-service/src/main/java/com/mh/user/mapper/AreaMapper.java @@ -21,7 +21,7 @@ public interface AreaMapper extends BaseMapper { @Select("select count(*) from area where area_id=#{areaId} and area_name=#{areaName}") int selectCountByAreaIdAndAreaName(String areaId, String areaName); - @Select("select count(*) from area where id= #{areaId}") + @Select("select count(*) from area") int getCount(String areaId, Integer page, Integer limit); @Select({ diff --git a/user-service/src/main/java/com/mh/user/mapper/GatewayManageMapper.java b/user-service/src/main/java/com/mh/user/mapper/GatewayManageMapper.java index db66660..a7a80d5 100644 --- a/user-service/src/main/java/com/mh/user/mapper/GatewayManageMapper.java +++ b/user-service/src/main/java/com/mh/user/mapper/GatewayManageMapper.java @@ -24,6 +24,7 @@ public interface GatewayManageMapper { */ @SelectProvider(type = GatewayManageProvider.class, method = "queryByOther") @Results(id="rs",value = { + @Result(column = "id", property = "id"), @Result(column = "gateway_name", property = "gatewayName"), @Result(column = "gateway_ip", property = "gatewayIP"), @Result(column = "gateway_address", property = "gatewayAddress"), diff --git a/user-service/src/main/java/com/mh/user/netty/handle/DataUploadServerHandler.java b/user-service/src/main/java/com/mh/user/netty/handle/DataUploadServerHandler.java index c6311e2..9c190aa 100644 --- a/user-service/src/main/java/com/mh/user/netty/handle/DataUploadServerHandler.java +++ b/user-service/src/main/java/com/mh/user/netty/handle/DataUploadServerHandler.java @@ -1,18 +1,15 @@ package com.mh.user.netty.handle; -import com.github.benmanes.caffeine.cache.Cache; -import com.mh.user.constants.FourthGEnum; +import com.mh.common.utils.StringUtils; import com.mh.user.entity.GatewayManageEntity; import com.mh.user.netty.session.ServerSession; import com.mh.user.netty.session.SessionMap; import com.mh.user.utils.CacheUtil; import com.mh.user.utils.ExchangeStringUtil; import com.mh.user.utils.NettyTools; -import com.mh.user.utils.SpringBeanUtil; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import lombok.extern.slf4j.Slf4j; -import org.springframework.context.ApplicationContext; import java.util.List; @@ -22,27 +19,32 @@ public class DataUploadServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //将接收到的数据转为字符串,此字符串就是客户端发送的字符串 - String receiveStr = ExchangeStringUtil.bytesToHexString((byte[]) msg); + String receiveStr = ExchangeStringUtil.bytesToHexString(msg); // 判断当前报文是否是上报数据报文 if (receiveStr != null) { // 判断属于哪一个DTU网关上报的数据 CacheUtil instance = CacheUtil.getInstance(); List gwList = instance.getGatewayInfo(); + String deviceCode = ""; + log.info("接收数据报文>>>>>>>>>>>>>>{}", receiveStr); if (gwList != null && !gwList.isEmpty()) { for (GatewayManageEntity gw : gwList) { - if (receiveStr.startsWith(ExchangeStringUtil.str2HexStr(gw.getSn()))) { - // 直接设置对应值 - NettyTools.setReceiveMsg(gw.getHeartBeat(), receiveStr.substring(gw.getSn().length())); - } else { - // 判断是否登录,没有登录立马断开 - String deviceCode = gw.getHeartBeat(); - if (!SessionMap.inst().hasLogin(deviceCode+ctx.channel().remoteAddress())) { - ServerSession.closeSession(ctx); - return; + if (!StringUtils.isBlank(gw.getSn())) { + String snHexStr = ExchangeStringUtil.str2HexStr(gw.getSn()); + if (receiveStr.startsWith(snHexStr)) { + // 直接设置对应值 + deviceCode = gw.getHeartBeat(); + NettyTools.setReceiveMsg(gw.getHeartBeat(), receiveStr.substring(snHexStr.length())); + break; } } } } + // 判断是否登录,没有登录立马断开 + if (!SessionMap.inst().hasLogin(deviceCode + ctx.channel().remoteAddress())) { + ServerSession.closeSession(ctx); + return; + } } super.channelRead(ctx, msg); } diff --git a/user-service/src/main/java/com/mh/user/netty/handle/ExceptionServerHandler.java b/user-service/src/main/java/com/mh/user/netty/handle/ExceptionServerHandler.java index c794f71..b0a7447 100644 --- a/user-service/src/main/java/com/mh/user/netty/handle/ExceptionServerHandler.java +++ b/user-service/src/main/java/com/mh/user/netty/handle/ExceptionServerHandler.java @@ -1,9 +1,16 @@ package com.mh.user.netty.handle; +import com.mh.user.model.SysLog; import com.mh.user.netty.session.ServerSession; +import com.mh.user.service.GatewayManageService; +import com.mh.user.service.SysLogService; +import com.mh.user.utils.SpringContextUtils; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import lombok.extern.slf4j.Slf4j; +import org.springframework.context.ApplicationContext; + +import java.util.Date; /** * @author LJF @@ -14,8 +21,9 @@ import lombok.extern.slf4j.Slf4j; */ @Slf4j public class ExceptionServerHandler extends ChannelInboundHandlerAdapter { - //private final ApplicationContext applicationContext = ApplicationContextProvider.getApplicationContext(); - //private final MeterInfoService meterInfoService = applicationContext.getBean(MeterInfoService.class); + private final ApplicationContext applicationContext = SpringContextUtils.getApplicationContext(); + private final SysLogService sysLogService = applicationContext.getBean(SysLogService.class); + private final GatewayManageService gatewayManageService = applicationContext.getBean(GatewayManageService.class); @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { @@ -45,6 +53,16 @@ public class ExceptionServerHandler extends ChannelInboundHandlerAdapter { log.info("获得已知异常!{}", cause.getMessage()); log.info("获得已知异常!", cause); ServerSession.closeSession(ctx); + // 添加到日志表,下线了 + SysLog sysLog = new SysLog(); + sysLog.setOperation(ctx.channel().remoteAddress().toString() + "下线了"); + sysLog.setCreateBy("开发者"); + sysLog.setIp(ctx.channel().remoteAddress().toString()); + sysLog.setCreateTime(new Date()); + sysLog.setTime(0L); + sysLogService.insertLog(sysLog); + // 更新网关在线情况 + gatewayManageService.updateGatewayManageOnlineByHeartBeatCode(ctx.channel().remoteAddress().toString().substring(0, 8), 1); ctx.close(); } } diff --git a/user-service/src/main/java/com/mh/user/netty/handle/FourthChannelInitializer.java b/user-service/src/main/java/com/mh/user/netty/handle/FourthChannelInitializer.java index bca94d0..f2db48b 100644 --- a/user-service/src/main/java/com/mh/user/netty/handle/FourthChannelInitializer.java +++ b/user-service/src/main/java/com/mh/user/netty/handle/FourthChannelInitializer.java @@ -23,7 +23,7 @@ public class FourthChannelInitializer extends ChannelInitializer { // 处理登录 channel.pipeline().addLast(new LoginRequestHandler()); // 处理心跳 - channel.pipeline().addLast(new HeartBeatServerHandler()); +// channel.pipeline().addLast(new HeartBeatServerHandler()); // 处理数据上报数据 channel.pipeline().addLast(new DataUploadServerHandler()); // 异常处理 diff --git a/user-service/src/main/java/com/mh/user/netty/handle/HeartBeatServerHandler.java b/user-service/src/main/java/com/mh/user/netty/handle/HeartBeatServerHandler.java index 7f0a8d5..8df0350 100644 --- a/user-service/src/main/java/com/mh/user/netty/handle/HeartBeatServerHandler.java +++ b/user-service/src/main/java/com/mh/user/netty/handle/HeartBeatServerHandler.java @@ -31,7 +31,7 @@ public class HeartBeatServerHandler extends IdleStateHandler { private final GatewayManageService gatewayManageService = applicationContext.getBean(GatewayManageService.class); //目前网关默认心跳30秒,多长时间没有心跳,就关闭连接 - private static final int READ_IDLE_GAP = 30; + private static final int READ_IDLE_GAP = 60; /** * @param readerIdleTimeSeconds 最长 没有 read到心跳的时间 @@ -58,7 +58,7 @@ public class HeartBeatServerHandler extends IdleStateHandler { public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //将接收到的数据转为字符串,此字符串就是客户端发送的字符串 - String receiveStr = ExchangeStringUtil.bytesToHexString((byte[]) msg); + String receiveStr = ExchangeStringUtil.bytesToHexString(msg); // 判断当前报文是否是登录报文 if (receiveStr != null && receiveStr.startsWith("2400")) { if (receiveStr.length() != 8) { diff --git a/user-service/src/main/java/com/mh/user/netty/handle/LoginRequestHandler.java b/user-service/src/main/java/com/mh/user/netty/handle/LoginRequestHandler.java index 8b72933..920e71d 100644 --- a/user-service/src/main/java/com/mh/user/netty/handle/LoginRequestHandler.java +++ b/user-service/src/main/java/com/mh/user/netty/handle/LoginRequestHandler.java @@ -5,10 +5,14 @@ import com.mh.user.netty.session.ServerSession; import com.mh.user.netty.session.SessionMap; import com.mh.user.netty.task.CallbackTask; import com.mh.user.netty.task.CallbackTaskScheduler; +import com.mh.user.service.GatewayManageService; +import com.mh.user.service.SysLogService; import com.mh.user.utils.ExchangeStringUtil; +import com.mh.user.utils.SpringContextUtils; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import lombok.extern.slf4j.Slf4j; +import org.springframework.context.ApplicationContext; /** * @author LJF @@ -20,16 +24,21 @@ import lombok.extern.slf4j.Slf4j; @Slf4j public class LoginRequestHandler extends ChannelInboundHandlerAdapter { + private final ApplicationContext applicationContext = SpringContextUtils.getApplicationContext(); + private final GatewayManageService gatewayManageService = applicationContext.getBean(GatewayManageService.class); + @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //将接收到的数据转为字符串,此字符串就是客户端发送的字符串 - String receiveStr = ExchangeStringUtil.bytesToHexString((byte[]) msg); - if (StringUtils.isBlank(receiveStr)) { + String receiveStr = ExchangeStringUtil.bytesToHexString(msg); + if (StringUtils.isBlank(receiveStr) || !receiveStr.startsWith("2400")) { super.channelRead(ctx, msg); return; } // 判断当前报文是否是心跳包上线: 869530073040186 log.info("接收到的心跳报文 <== {}", receiveStr); + // 通过对应的心跳包码进行判断,然后更新网关在线情况 + gatewayManageService.updateGatewayManageOnlineByHeartBeatCode(receiveStr, 0); // 获取IMEI号 String deviceCode = receiveStr; String meterNum = deviceCode; diff --git a/user-service/src/main/java/com/mh/user/provide/GatewayManageProvider.java b/user-service/src/main/java/com/mh/user/provide/GatewayManageProvider.java index 5006395..ccd7a4a 100644 --- a/user-service/src/main/java/com/mh/user/provide/GatewayManageProvider.java +++ b/user-service/src/main/java/com/mh/user/provide/GatewayManageProvider.java @@ -61,7 +61,7 @@ public class GatewayManageProvider { public String queryByOther(Map params) { StringBuffer sqlStr = new StringBuffer(); sqlStr.append("select id, gateway_name, gateway_ip, gateway_address, data_com, " + - "create_date, connect_date, internet_card, operator, gateway_port, grade from gateway_manage where 1=1 "); + "create_date, connect_date, internet_card, operator, gateway_port, grade, remarks, heart_beat,imei,sn,community_type from gateway_manage where 1=1 "); if(params.get("grade") != null){ sqlStr.append(" and grade = #{grade}"); } diff --git a/user-service/src/main/java/com/mh/user/serialport/SerialPortSingle2.java b/user-service/src/main/java/com/mh/user/serialport/SerialPortSingle2.java index 9ebbb78..19c0ea1 100644 --- a/user-service/src/main/java/com/mh/user/serialport/SerialPortSingle2.java +++ b/user-service/src/main/java/com/mh/user/serialport/SerialPortSingle2.java @@ -46,8 +46,11 @@ public class SerialPortSingle2 { List gwList = instance.getGatewayInfo(); if (gwList != null && !gwList.isEmpty()) { for (GatewayManageEntity gw : gwList) { - if (gw.getDataCom().toUpperCase().equals(deviceCodeParamEntity.getDataCom().toUpperCase())) { + if (!StringUtils.isBlank(gw.getDataCom()) && gw.getDataCom().toUpperCase().equals(deviceCodeParamEntity.getDataCom().toUpperCase())) { String communityType = gw.getCommunityType(); + if (StringUtils.isBlank(communityType)) { + continue; + } if (Constant.COMMUNITY_TYPE_TCP.equals(communityType)) { TcpSingle tcpSingle = new TcpSingle(); return tcpSingle.serialPortSend(deviceCodeParamEntity, gw); diff --git a/user-service/src/main/java/com/mh/user/sqlmapper/SysLogMapper.xml b/user-service/src/main/java/com/mh/user/sqlmapper/SysLogMapper.xml index eeaee5b..4bd9491 100644 --- a/user-service/src/main/java/com/mh/user/sqlmapper/SysLogMapper.xml +++ b/user-service/src/main/java/com/mh/user/sqlmapper/SysLogMapper.xml @@ -32,11 +32,11 @@ where id = #{id,jdbcType=BIGINT} - insert into sys_log (id, user_name, operation, + insert into sys_log (user_name, operation, method, params, time, ip, create_by, create_time, last_update_by, last_update_time,login_time,login_state,opt_desc) - values (#{id,jdbcType=BIGINT}, #{userName,jdbcType=VARCHAR}, #{operation,jdbcType=VARCHAR}, + values (#{userName,jdbcType=VARCHAR}, #{operation,jdbcType=VARCHAR}, #{method,jdbcType=VARCHAR}, #{params,jdbcType=VARCHAR}, #{time,jdbcType=BIGINT}, #{ip,jdbcType=VARCHAR}, #{createBy,jdbcType=BIGINT}, #{createTime,jdbcType=TIMESTAMP}, #{lastUpdateBy,jdbcType=BIGINT}, #{lastUpdateTime,jdbcType=TIMESTAMP}, diff --git a/user-service/src/main/java/com/mh/user/utils/ExchangeStringUtil.java b/user-service/src/main/java/com/mh/user/utils/ExchangeStringUtil.java index 40d9479..dbf4950 100644 --- a/user-service/src/main/java/com/mh/user/utils/ExchangeStringUtil.java +++ b/user-service/src/main/java/com/mh/user/utils/ExchangeStringUtil.java @@ -1,6 +1,8 @@ package com.mh.user.utils; import com.mh.common.utils.StringUtils; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; //import io.netty.buffer.ByteBuf; //import io.netty.channel.ChannelHandlerContext; import java.io.*; @@ -386,6 +388,31 @@ public class ExchangeStringUtil { return sbf.toString().trim(); } + public static String bytesToHexString(Object msg){ + ByteBuf byteBuf = (ByteBuf)msg; + ByteBuf buf = Unpooled.copiedBuffer(byteBuf); + byte [] src = new byte[buf.readableBytes()]; + //复制内容到字节数组bytes + buf.readBytes(src); + StringBuilder stringBuilder = new StringBuilder(); + if (src == null || src.length <= 0) { + return null; + } + for (int i = 0; i < src.length; i++) { + int v = src[i] & 0xFF; + String hv = Integer.toHexString(v); + if (hv.length() < 2) { + stringBuilder.append(0); + } + stringBuilder.append(hv); + } + String rawMessage = stringBuilder.toString().toUpperCase(); +// int first68Index = rawMessage.indexOf("68"); +// if (first68Index == -1){ +// } + return rawMessage; + } + public static String bytesToHexString(byte[] src){ StringBuilder stringBuilder = new StringBuilder(""); if (src == null || src.length <= 0) { diff --git a/user-service/src/main/java/com/mh/user/utils/GetReadOrder485.java b/user-service/src/main/java/com/mh/user/utils/GetReadOrder485.java index 4a62a16..9f4164a 100644 --- a/user-service/src/main/java/com/mh/user/utils/GetReadOrder485.java +++ b/user-service/src/main/java/com/mh/user/utils/GetReadOrder485.java @@ -5,6 +5,7 @@ import com.mh.user.constants.Constant; import com.mh.user.entity.*; import com.mh.user.service.DeviceCodeParamService; import com.mh.user.service.DeviceInstallService; +import com.mh.user.service.GatewayManageService; import com.mh.user.service.impl.DeviceDisplayServiceImpl; import lombok.extern.slf4j.Slf4j; import org.springframework.context.ApplicationContext; @@ -24,7 +25,6 @@ public class GetReadOrder485 { // 调用service ApplicationContext context = SpringBeanUtil.getApplicationContext(); - DeviceDisplayServiceImpl.GatewayManageService gatewayManageService = context.getBean(DeviceDisplayServiceImpl.GatewayManageService.class); DeviceInstallService deviceInstallService = context.getBean(DeviceInstallService.class); DeviceCodeParamService deviceCodeParamService = context.getBean(DeviceCodeParamService.class);