From 5ea52c5c822aba0064357cc9cbe8179c1fc852e5 Mon Sep 17 00:00:00 2001 From: 25604 Date: Wed, 12 Nov 2025 18:31:26 +0800 Subject: [PATCH] =?UTF-8?q?1=E3=80=81=E4=B8=AD=E5=A4=AE=E7=83=AD=E6=B0=B4?= =?UTF-8?q?=E6=8E=A5=E5=85=A5=E9=80=9A=E8=BF=87Netty-TCPServer=E9=80=9A?= =?UTF-8?q?=E4=BF=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- user-service/pom.xml | 7 + .../java/com/mh/user/constants/Constant.java | 2 + .../com/mh/user/constants/FourthGEnum.java | 111 +++++++ .../controller/GatewayManageController.java | 3 +- .../mh/user/entity/GatewayManageEntity.java | 6 + .../com/mh/user/job/JobCloudAndMeter.java | 306 +++++++++--------- .../mh/user/mapper/GatewayManageMapper.java | 14 + .../com/mh/user/netty/NettyEchoServer.java | 53 +++ .../netty/decoder/CustomFrameEncoder.java | 63 ++++ .../netty/handle/DataUploadServerHandler.java | 50 +++ .../netty/handle/ExceptionServerHandler.java | 50 +++ .../handle/FourthChannelInitializer.java | 34 ++ .../netty/handle/HeartBeatServerHandler.java | 102 ++++++ .../netty/handle/LoginRequestHandler.java | 101 ++++++ .../mh/user/netty/session/ServerSession.java | 66 ++++ .../com/mh/user/netty/session/SessionMap.java | 96 ++++++ .../com/mh/user/netty/task/CallbackTask.java | 20 ++ .../netty/task/CallbackTaskScheduler.java | 80 +++++ .../com/mh/user/netty/task/ExecuteTask.java | 6 + .../user/netty/task/FutureTaskScheduler.java | 67 ++++ .../user/serialport/SendAndReceiveByCom.java | 19 +- .../serialport/SerialPortSendReceive2.java | 15 +- .../mh/user/serialport/SerialPortSingle2.java | 22 ++ .../mh/user/serialport/SerialPortThread.java | 42 ++- .../mh/user/service/GatewayManageService.java | 3 + .../com/mh/user/service/SysLogService.java | 2 + .../impl/DeviceDisplayServiceImpl.java | 92 +++--- .../impl/GatewayManageServiceImpl.java | 13 +- .../user/service/impl/SysLogServiceImpl.java | 5 + .../com/mh/user/tcp/SendAndReceiveByTcp.java | 204 ++++++++++++ .../main/java/com/mh/user/tcp/TcpSingle.java | 126 ++++++++ .../java/com/mh/user/utils/CacheUtil.java | 25 ++ .../java/com/mh/user/utils/NettyTools.java | 101 ++++++ 33 files changed, 1670 insertions(+), 236 deletions(-) create mode 100644 user-service/src/main/java/com/mh/user/constants/FourthGEnum.java create mode 100644 user-service/src/main/java/com/mh/user/netty/NettyEchoServer.java create mode 100644 user-service/src/main/java/com/mh/user/netty/decoder/CustomFrameEncoder.java create mode 100644 user-service/src/main/java/com/mh/user/netty/handle/DataUploadServerHandler.java create mode 100644 user-service/src/main/java/com/mh/user/netty/handle/ExceptionServerHandler.java create mode 100644 user-service/src/main/java/com/mh/user/netty/handle/FourthChannelInitializer.java create mode 100644 user-service/src/main/java/com/mh/user/netty/handle/HeartBeatServerHandler.java create mode 100644 user-service/src/main/java/com/mh/user/netty/handle/LoginRequestHandler.java create mode 100644 user-service/src/main/java/com/mh/user/netty/session/ServerSession.java create mode 100644 user-service/src/main/java/com/mh/user/netty/session/SessionMap.java create mode 100644 user-service/src/main/java/com/mh/user/netty/task/CallbackTask.java create mode 100644 user-service/src/main/java/com/mh/user/netty/task/CallbackTaskScheduler.java create mode 100644 user-service/src/main/java/com/mh/user/netty/task/ExecuteTask.java create mode 100644 user-service/src/main/java/com/mh/user/netty/task/FutureTaskScheduler.java create mode 100644 user-service/src/main/java/com/mh/user/tcp/SendAndReceiveByTcp.java create mode 100644 user-service/src/main/java/com/mh/user/tcp/TcpSingle.java create mode 100644 user-service/src/main/java/com/mh/user/utils/NettyTools.java diff --git a/user-service/pom.xml b/user-service/pom.xml index 1903c1f..c706553 100644 --- a/user-service/pom.xml +++ b/user-service/pom.xml @@ -173,6 +173,13 @@ 1.0.0 + + + io.netty + netty-all + 4.1.86.Final + + diff --git a/user-service/src/main/java/com/mh/user/constants/Constant.java b/user-service/src/main/java/com/mh/user/constants/Constant.java index b878034..b35f342 100644 --- a/user-service/src/main/java/com/mh/user/constants/Constant.java +++ b/user-service/src/main/java/com/mh/user/constants/Constant.java @@ -13,6 +13,8 @@ public class Constant { public static final CharSequence CUSTOM_NAME_GUANGSHANG = "广州商学院"; public static final CharSequence CUSTOM_NAME_HUARUAN = "广州软件学院"; public static final String WEATHER_DATA = "weather_data"; + public static final String COMMUNITY_TYPE_REAL_COM = "realCom"; + public static final String COMMUNITY_TYPE_TCP = "tcp"; public static boolean CONTROL_WEB_FLAG = false; public static boolean SEND_STATUS = false; // 指令发送状态 public static volatile boolean FLAG = false; diff --git a/user-service/src/main/java/com/mh/user/constants/FourthGEnum.java b/user-service/src/main/java/com/mh/user/constants/FourthGEnum.java new file mode 100644 index 0000000..add3d7f --- /dev/null +++ b/user-service/src/main/java/com/mh/user/constants/FourthGEnum.java @@ -0,0 +1,111 @@ +package com.mh.user.constants; + +/** + * @author LJF + * @version 1.0 + * @project TAD_Server + * @description 4G设备常量值 + * @date 2023/7/3 18:14:12 + */ +public enum FourthGEnum { + + LOGIN_HEART_RESPONSE("0D", "登录和心跳确认控制码"), + + LOGIN_HEART_REQUEST("8D", "登录和心跳请求控制码"), + + LOGIN_HEART_EXCEPTION("CD", "登录和心跳异常应答控制码"), + + LOGIN_TYPE("02", "登录类型"), + + HEART_TYPE("01", "心跳类型"), + + CLOSE_CONTROL_TYPE("1C", "合闸指令"), + + OPEN_CONTROL_TYPE("1A", "跳闸指令"), + + DATA_ACTIVE_UPLOAD("8E", "数据主动上报控制码"), + + PULL_AND_CLOSE_DEVICE_REQUEST("1C", "跳合闸请求控制码"), + + PULL_AND_CLOSE_DEVICE_RESPONSE("9C", "跳合闸应答控制码"), + + PULL_AND_CLOSE_DEVICE_EXCEPTION("DC", "跳合闸应答异常控制码"), + + CLEAR_DEVICE_REQUEST("1A", "电表清零请求控制码"), + + CLEAR_DEVICE_RESPONSE("9A", "电表清零应答控制码"), + + CLEAR_DEVICE_EXCEPTION("DA", "电表清零应答异常控制码"), + + READ_DEVICE_REQUEST("11", "读电表请求控制码"), + + READ_DEVICE_RESPONSE("91", "读电表应答控制码"), + + READ_DEVICE_EXCEPTION("D1", "读电表应答异常控制码"), + + WRITE_DEVICE_REQUEST("14", "写电表请求控制码"), + + WRITE_DEVICE_RESPONSE("94", "读电表应答控制码"), + WRITE_DEVICE_IP("94", "写电表IP应答控制码"), + + WRITE_DEVICE_EXCEPTION("D4", "读电表应答异常控制码"), + + CHARGE_DEVICE_REQUEST("0F", "充值指令"), + + CHARGE_DEVICE_RESPONSE("8F", "充值正确应答控制码"), + + CHARGE_DEVICE_EXCEPTION("CF", "充值异常应答控制码"), + + READ_TOTAL_CHARGE_IDENTIFY_CODE("00000000", "当前组合有功总电能"), + + READ_STATUS_IDENTIFY_CODE("03050004", "读电表运行状态"), + READ_UPLOAD_CODE("02008104", "读取上报报文"), + + TY_UPLOAD_CODE("55C2", "腾越水表上报标识"), + + WRITE_CHARGE_TIME_IDENTIFY_CODE("07008104", "写入充值次数"), + + WRITE_LIMIT_POWER("10008104","写入限容功率"), + READ_LIMIT_POWER("10008104","读取限容功率"), + READ_SOFT_VERSION("01008004","读软件版本"), + READ_HARD_VERSION("02008004","读硬件版本"), + READ_ICCID("05008104","读取ICCID"), + READ_IMEI("04008104","读取IMEI"), + WRITE_IP("0B008104","设置IP端口号"), + READ_IP("0B008104","读取IP端口号"), + UPGRADES("13008104","触发升级,写1触发升级") + ; + + private String code; + + private String desc; + + FourthGEnum(String code, String desc) { + this.code = code; + this.desc = desc; + } + + public String getCode() { + return code; + } + + public void setCode(String code) { + this.code = code; + } + + public String getDesc() { + return desc; + } + + public void setDesc(String desc) { + this.desc = desc; + } + + @Override + public String toString() { + return "FourthGEnum{" + + "code='" + code + '\'' + + ", desc='" + desc + '\'' + + '}'; + } +} diff --git a/user-service/src/main/java/com/mh/user/controller/GatewayManageController.java b/user-service/src/main/java/com/mh/user/controller/GatewayManageController.java index 99176aa..de416a6 100644 --- a/user-service/src/main/java/com/mh/user/controller/GatewayManageController.java +++ b/user-service/src/main/java/com/mh/user/controller/GatewayManageController.java @@ -2,6 +2,7 @@ package com.mh.user.controller; import com.mh.common.http.HttpResult; import com.mh.user.entity.GatewayManageEntity; +import com.mh.user.service.GatewayManageService; import com.mh.user.service.impl.DeviceDisplayServiceImpl; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; @@ -13,7 +14,7 @@ import java.util.List; public class GatewayManageController { @Autowired - private DeviceDisplayServiceImpl.GatewayManageService gatewayManageService; + private GatewayManageService gatewayManageService; //保存 @PostMapping(value="/save") diff --git a/user-service/src/main/java/com/mh/user/entity/GatewayManageEntity.java b/user-service/src/main/java/com/mh/user/entity/GatewayManageEntity.java index 7b1788e..764826c 100644 --- a/user-service/src/main/java/com/mh/user/entity/GatewayManageEntity.java +++ b/user-service/src/main/java/com/mh/user/entity/GatewayManageEntity.java @@ -29,4 +29,10 @@ public class GatewayManageEntity { private String connectDate; //最新上线连接时间 private String remarks; //备注 private String type; //操作类型 + private String heartBeat; // 心跳包 + private String imei; // 设备IMEI + private String sn; // 设备SN + private String dataCom; // 通讯com口 + private String thread; // 线程名称 + private String communityType; // 通讯 类型:realCom,tcp } diff --git a/user-service/src/main/java/com/mh/user/job/JobCloudAndMeter.java b/user-service/src/main/java/com/mh/user/job/JobCloudAndMeter.java index 63d78eb..6ff9ace 100644 --- a/user-service/src/main/java/com/mh/user/job/JobCloudAndMeter.java +++ b/user-service/src/main/java/com/mh/user/job/JobCloudAndMeter.java @@ -1,159 +1,159 @@ -package com.mh.user.job; - -import com.mh.user.constants.SocketMessage; -import com.mh.user.entity.GatewayManageEntity; -import com.mh.user.serialport.SerialPortThread; -import com.mh.user.service.DeviceCodeParamService; -import com.mh.user.service.impl.DeviceDisplayServiceImpl; -import com.mh.user.utils.GetReadOrder485; -import lombok.SneakyThrows; -import lombok.extern.slf4j.Slf4j; -import org.quartz.DisallowConcurrentExecution; -import org.quartz.Job; -import org.quartz.JobExecutionContext; -import org.quartz.JobExecutionException; -import org.springframework.beans.factory.annotation.Autowired; -import java.util.ArrayList; -import java.util.List; - -/** - * @author ljf - * @title : - * @description : 定时采集冷量计任务 - * @updateTime 2020-05-18 - * @throws : - */ -/** - * :@DisallowConcurrentExecution : 此标记用在实现Job的类上面,意思是不允许并发执行. - * :注意org.quartz.threadPool.threadCount线程池中线程的数量至少要多个,否则@DisallowConcurrentExecution不生效 - * :假如Job的设置时间间隔为3秒,但Job执行时间是5秒,设置@DisallowConcurrentExecution以后程序会等任务执行完毕以后再去执行, - * 否则会在3秒时再启用新的线程执行 - */ -@DisallowConcurrentExecution -@Slf4j -public class JobCloudAndMeter implements Job { - - @Autowired - private SocketMessage socketMessage; - - @Autowired - DeviceDisplayServiceImpl.GatewayManageService gatewayManageService; - - @Autowired - DeviceCodeParamService deviceCodeParamService; - - private static int taskTimes = 1; - List gatewayDtoList=new ArrayList<>(); - GetReadOrder485 getReadOrder485=new GetReadOrder485(); - - @SneakyThrows - @Override - public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { - log.info("定时采集开始>>>>>>"); - if(taskTimes==2){//2 - int r=deviceCodeParamService.queryCount2(); //查询记录数 - if (r==0){ - getReadOrder485.createOrderParam2(); //生成采集参数 - } - - SerialPortThread myThread = new SerialPortThread(); - Thread thread = new Thread(myThread); - myThread.setName("2","1"); - thread.start(); - System.out.println("当前活动线程数:"+Thread.activeCount()); - Thread.currentThread().sleep(3000);//毫秒 - -// SerialPortThread myThread2 = new SerialPortThread(); -// Thread thread2 = new Thread(myThread2); -// myThread2.setName("2","2"); -// thread2.start(); -// System.out.println("当前活动线程数:"+Thread.activeCount()); -// Thread.currentThread().sleep(3000);//毫秒 +//package com.mh.user.job; // -// SerialPortThread myThread3 = new SerialPortThread(); -// Thread thread3 = new Thread(myThread3); -// myThread3.setName("2","3"); -// thread3.start(); -// System.out.println("当前活动线程数:"+Thread.activeCount()); -// Thread.currentThread().sleep(3000);//毫秒 +//import com.mh.user.constants.SocketMessage; +//import com.mh.user.entity.GatewayManageEntity; +//import com.mh.user.serialport.SerialPortThread; +//import com.mh.user.service.DeviceCodeParamService; +//import com.mh.user.service.GatewayManageService; +//import com.mh.user.utils.GetReadOrder485; +//import lombok.SneakyThrows; +//import lombok.extern.slf4j.Slf4j; +//import org.quartz.DisallowConcurrentExecution; +//import org.quartz.Job; +//import org.quartz.JobExecutionContext; +//import org.quartz.JobExecutionException; +//import org.springframework.beans.factory.annotation.Autowired; +//import java.util.ArrayList; +//import java.util.List; // -// SerialPortThread myThread4= new SerialPortThread(); -// Thread thread4 = new Thread(myThread4); -// myThread4.setName("2","4"); -// thread4.start(); -// System.out.println("当前活动线程数:"+Thread.activeCount()); -// Thread.currentThread().sleep(3000);//毫秒 - System.out.println("采集水、电、运行状态!"+taskTimes); - taskTimes++; - }else if(taskTimes==3){//5 - int r=deviceCodeParamService.queryCount3();//查询记录数 - if (r==0){ - getReadOrder485.createOrderParam3(); //生成采集参数 - } - - SerialPortThread myThread = new SerialPortThread(); - Thread thread = new Thread(myThread); - myThread.setName("3","1"); - thread.start(); - System.out.println("当前活动线程数:"+Thread.activeCount()); - Thread.currentThread().sleep(3000);//毫秒 - -// SerialPortThread myThread2 = new SerialPortThread(); -// Thread thread2 = new Thread(myThread2); -// myThread2.setName("3","2"); -// thread2.start(); -// System.out.println("当前活动线程数:"+Thread.activeCount()); -// Thread.currentThread().sleep(3000);//毫秒 +///** +// * @author ljf +// * @title : +// * @description : 定时采集冷量计任务 +// * @updateTime 2020-05-18 +// * @throws : +// */ +///** +// * :@DisallowConcurrentExecution : 此标记用在实现Job的类上面,意思是不允许并发执行. +// * :注意org.quartz.threadPool.threadCount线程池中线程的数量至少要多个,否则@DisallowConcurrentExecution不生效 +// * :假如Job的设置时间间隔为3秒,但Job执行时间是5秒,设置@DisallowConcurrentExecution以后程序会等任务执行完毕以后再去执行, +// * 否则会在3秒时再启用新的线程执行 +// */ +//@DisallowConcurrentExecution +//@Slf4j +//public class JobCloudAndMeter implements Job { // -// SerialPortThread myThread3 = new SerialPortThread(); -// Thread thread3 = new Thread(myThread3); -// myThread3.setName("3","3"); -// thread3.start(); -// System.out.println("当前活动线程数:"+Thread.activeCount()); -// Thread.currentThread().sleep(3000);//毫秒 +// @Autowired +// private SocketMessage socketMessage; // -// SerialPortThread myThread4= new SerialPortThread(); -// Thread thread4 = new Thread(myThread4); -// myThread4.setName("3","4"); -// thread4.start(); -// System.out.println("当前活动线程数:"+Thread.activeCount()); -// Thread.currentThread().sleep(3000);//毫秒 - System.out.println("采集设定温度、设定水位、故障状态!"+taskTimes); - taskTimes=1; - }else if(taskTimes==1){ - int r=deviceCodeParamService.queryCount(); //查询记录数 - if (r==0){ - getReadOrder485.createOrderParam(); //生成采集参数 - } - SerialPortThread myThread = new SerialPortThread(); - Thread thread = new Thread(myThread); - myThread.setName("1","1"); - thread.start(); - System.out.println("当前活动线程数:"+Thread.activeCount()); - Thread.currentThread().sleep(3000);//毫秒 - -// SerialPortThread myThread2 = new SerialPortThread(); -// Thread thread2 = new Thread(myThread2); -// myThread2.setName("1","2"); -// thread2.start(); -// System.out.println("当前活动线程数:"+Thread.activeCount()); -// Thread.currentThread().sleep(3000);//毫秒 +// @Autowired +// GatewayManageService gatewayManageService; // -// SerialPortThread myThread3 = new SerialPortThread(); -// Thread thread3 = new Thread(myThread3); -// myThread3.setName("1","3"); -// thread3.start(); -// System.out.println("当前活动线程数:"+Thread.activeCount()); -// Thread.currentThread().sleep(3000);//毫秒 +// @Autowired +// DeviceCodeParamService deviceCodeParamService; // -// SerialPortThread myThread4= new SerialPortThread(); -// Thread thread4 = new Thread(myThread4); -// myThread4.setName("1","4"); -// thread4.start(); -// System.out.println("当前活动线程数:"+Thread.activeCount()); -// Thread.currentThread().sleep(3000);//毫秒 - System.out.println("采集水位、水温!"+taskTimes); - taskTimes++; - } - } -} +// private static int taskTimes = 1; +// List gatewayDtoList=new ArrayList<>(); +// GetReadOrder485 getReadOrder485=new GetReadOrder485(); +// +// @SneakyThrows +// @Override +// public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { +// log.info("定时采集开始>>>>>>"); +// if(taskTimes==2){//2 +// int r=deviceCodeParamService.queryCount2(); //查询记录数 +// if (r==0){ +// getReadOrder485.createOrderParam2(); //生成采集参数 +// } +// +// SerialPortThread myThread = new SerialPortThread(); +// Thread thread = new Thread(myThread); +// myThread.setName("2","1"); +// thread.start(); +// System.out.println("当前活动线程数:"+Thread.activeCount()); +// Thread.currentThread().sleep(3000);//毫秒 +// +//// SerialPortThread myThread2 = new SerialPortThread(); +//// Thread thread2 = new Thread(myThread2); +//// myThread2.setName("2","2"); +//// thread2.start(); +//// System.out.println("当前活动线程数:"+Thread.activeCount()); +//// Thread.currentThread().sleep(3000);//毫秒 +//// +//// SerialPortThread myThread3 = new SerialPortThread(); +//// Thread thread3 = new Thread(myThread3); +//// myThread3.setName("2","3"); +//// thread3.start(); +//// System.out.println("当前活动线程数:"+Thread.activeCount()); +//// Thread.currentThread().sleep(3000);//毫秒 +//// +//// SerialPortThread myThread4= new SerialPortThread(); +//// Thread thread4 = new Thread(myThread4); +//// myThread4.setName("2","4"); +//// thread4.start(); +//// System.out.println("当前活动线程数:"+Thread.activeCount()); +//// Thread.currentThread().sleep(3000);//毫秒 +// System.out.println("采集水、电、运行状态!"+taskTimes); +// taskTimes++; +// }else if(taskTimes==3){//5 +// int r=deviceCodeParamService.queryCount3();//查询记录数 +// if (r==0){ +// getReadOrder485.createOrderParam3(); //生成采集参数 +// } +// +// SerialPortThread myThread = new SerialPortThread(); +// Thread thread = new Thread(myThread); +// myThread.setName("3","1"); +// thread.start(); +// System.out.println("当前活动线程数:"+Thread.activeCount()); +// Thread.currentThread().sleep(3000);//毫秒 +// +//// SerialPortThread myThread2 = new SerialPortThread(); +//// Thread thread2 = new Thread(myThread2); +//// myThread2.setName("3","2"); +//// thread2.start(); +//// System.out.println("当前活动线程数:"+Thread.activeCount()); +//// Thread.currentThread().sleep(3000);//毫秒 +//// +//// SerialPortThread myThread3 = new SerialPortThread(); +//// Thread thread3 = new Thread(myThread3); +//// myThread3.setName("3","3"); +//// thread3.start(); +//// System.out.println("当前活动线程数:"+Thread.activeCount()); +//// Thread.currentThread().sleep(3000);//毫秒 +//// +//// SerialPortThread myThread4= new SerialPortThread(); +//// Thread thread4 = new Thread(myThread4); +//// myThread4.setName("3","4"); +//// thread4.start(); +//// System.out.println("当前活动线程数:"+Thread.activeCount()); +//// Thread.currentThread().sleep(3000);//毫秒 +// System.out.println("采集设定温度、设定水位、故障状态!"+taskTimes); +// taskTimes=1; +// }else if(taskTimes==1){ +// int r=deviceCodeParamService.queryCount(); //查询记录数 +// if (r==0){ +// getReadOrder485.createOrderParam(); //生成采集参数 +// } +// SerialPortThread myThread = new SerialPortThread(); +// Thread thread = new Thread(myThread); +// myThread.setName("1","1"); +// thread.start(); +// System.out.println("当前活动线程数:"+Thread.activeCount()); +// Thread.currentThread().sleep(3000);//毫秒 +// +//// SerialPortThread myThread2 = new SerialPortThread(); +//// Thread thread2 = new Thread(myThread2); +//// myThread2.setName("1","2"); +//// thread2.start(); +//// System.out.println("当前活动线程数:"+Thread.activeCount()); +//// Thread.currentThread().sleep(3000);//毫秒 +//// +//// SerialPortThread myThread3 = new SerialPortThread(); +//// Thread thread3 = new Thread(myThread3); +//// myThread3.setName("1","3"); +//// thread3.start(); +//// System.out.println("当前活动线程数:"+Thread.activeCount()); +//// Thread.currentThread().sleep(3000);//毫秒 +//// +//// SerialPortThread myThread4= new SerialPortThread(); +//// Thread thread4 = new Thread(myThread4); +//// myThread4.setName("1","4"); +//// thread4.start(); +//// System.out.println("当前活动线程数:"+Thread.activeCount()); +//// Thread.currentThread().sleep(3000);//毫秒 +// System.out.println("采集水位、水温!"+taskTimes); +// taskTimes++; +// } +// } +//} diff --git a/user-service/src/main/java/com/mh/user/mapper/GatewayManageMapper.java b/user-service/src/main/java/com/mh/user/mapper/GatewayManageMapper.java index d263efc..db66660 100644 --- a/user-service/src/main/java/com/mh/user/mapper/GatewayManageMapper.java +++ b/user-service/src/main/java/com/mh/user/mapper/GatewayManageMapper.java @@ -34,6 +34,12 @@ public interface GatewayManageMapper { @Result(column = "create_date", property = "createDate"), @Result(column = "connect_date", property = "connectDate"), @Result(column = "remarks", property = "remarks"), + @Result(column = "heart_beat", property = "heartBeat"), + @Result(column = "imei", property = "imei"), + @Result(column = "sn", property = "sn"), + @Result(column = "data_com", property = "dataCom"), + @Result(column = "thread", property = "thread"), + @Result(column = "community_type", property = "communityType") }) List queryByOther(@Param("grade") Integer grade, @Param("operator") Integer operator); @@ -97,4 +103,12 @@ public interface GatewayManageMapper { // 根据设备类型查询数据库,找出对应的详细信息 @Select("select id, gateway_name, gateway_ip, gateway_address, data_com, connect_date, internet_card, operator, gateway_port, grade from gateway_manage where grade = #{grade}") GatewayManageEntity queryGatewayByGrade(@Param("grade") Long grade); + + // 根据设备心跳包更新设备在线状态 + @Update("update gateway_manage set grade = #{status} where heart_beat = #{heartBeat}") + void updateGatewayManageOnlineByHeartBeatCode(String heartBeat, int status); + + // 根据设备IMEI更新设备在线状态 + @Update("update gateway_manage set grade = #{status} where imei = #{imei}") + void updateGatewayManageOnlineByImei(String imei, int status); } diff --git a/user-service/src/main/java/com/mh/user/netty/NettyEchoServer.java b/user-service/src/main/java/com/mh/user/netty/NettyEchoServer.java new file mode 100644 index 0000000..5829428 --- /dev/null +++ b/user-service/src/main/java/com/mh/user/netty/NettyEchoServer.java @@ -0,0 +1,53 @@ +package com.mh.user.netty; + +import com.mh.user.netty.handle.FourthChannelInitializer; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; +import lombok.extern.slf4j.Slf4j; + + +/** + * @author LJF + * @title :Netty + * @description :netty 使用 + * @updateTime 2020-04-21 + * @throws : + */ +@Slf4j +public class NettyEchoServer { + + public void bind(int port) throws Exception { + // accept线程组,用来接收连接 + EventLoopGroup bossGroup = new NioEventLoopGroup(2); + // IO 线程组,用来处理业务逻辑 + EventLoopGroup workerGroup = new NioEventLoopGroup(4); + + try { + // 服务端启动引导 + ServerBootstrap serverBootstrap = new ServerBootstrap(); + serverBootstrap.group(bossGroup,workerGroup) // 绑定两个线程 + .channel(NioServerSocketChannel.class) // 指定通道类型 + .option(ChannelOption.SO_BACKLOG, 1024) // 设置TCP连接的缓冲区 + .option(ChannelOption.SO_KEEPALIVE, true) + .handler(new LoggingHandler(LogLevel.INFO)) // 设置日志级别 + .childHandler(new FourthChannelInitializer()); // 初始化handle + // 通过bind启动服务 + ChannelFuture f = serverBootstrap.bind(port).sync(); + // 阻塞主线程,知道网络服务被关闭 + //f.channel().closeFuture().sync(); + f.channel().closeFuture().addListener(future -> { + log.info("Netty服务器已关闭"); + workerGroup.shutdownGracefully(); + bossGroup.shutdownGracefully(); + }); + } catch (Exception e){ + log.error("netty服务器异常", e); + workerGroup.shutdownGracefully(); + bossGroup.shutdownGracefully(); + } + } +} diff --git a/user-service/src/main/java/com/mh/user/netty/decoder/CustomFrameEncoder.java b/user-service/src/main/java/com/mh/user/netty/decoder/CustomFrameEncoder.java new file mode 100644 index 0000000..4ef4b62 --- /dev/null +++ b/user-service/src/main/java/com/mh/user/netty/decoder/CustomFrameEncoder.java @@ -0,0 +1,63 @@ +package com.mh.user.netty.decoder; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToByteEncoder; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class CustomFrameEncoder extends MessageToByteEncoder { + private static final byte FRAME_START = 0x68; + private static final byte FRAME_END = 0x16; + + @Override + protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception { + log.info("数据到达:"+msg); + // Write first frame start + out.writeByte(FRAME_START); + + // Write meter number + byte[] meterNumberBytes = hexStringToByteArray(msg.substring(2, 14)); // Extract meter number from message + out.writeBytes(meterNumberBytes); + + // Write second frame start + out.writeByte(FRAME_START); + + // Write control code + byte controlCode = hexStringToByteArray(msg.substring(14, 16))[0]; // Extract control code from message + out.writeByte(controlCode); + + // Write data length + int dataLength = Integer.parseInt(msg.substring(16, 18), 16); // Extract data length from message + out.writeByte(dataLength); + + // Write data + byte[] dataBytes = hexStringToByteArray(msg.substring(18, 18 + (dataLength * 2))); // Extract data from message + out.writeBytes(dataBytes); + + // Calculate and write checksum + byte checksum = calculateChecksum(msg.substring(14, 18 + (dataLength * 2))); // Calculate checksum + out.writeByte(checksum); + + // Write frame end + out.writeByte(FRAME_END); + } + + private byte[] hexStringToByteArray(String hexString) { + int len = hexString.length(); + byte[] data = new byte[len / 2]; + for (int i = 0; i < len; i += 2) { + data[i / 2] = (byte) ((Character.digit(hexString.charAt(i), 16) << 4) + + Character.digit(hexString.charAt(i + 1), 16)); + } + return data; + } + + private byte calculateChecksum(String message) { + byte checksum = 0; + for (int i = 0; i < message.length(); i += 2) { + checksum += Integer.parseInt(message.substring(i, i + 2), 16); + } + return (byte) (checksum & 0xFF); + } +} \ No newline at end of file diff --git a/user-service/src/main/java/com/mh/user/netty/handle/DataUploadServerHandler.java b/user-service/src/main/java/com/mh/user/netty/handle/DataUploadServerHandler.java new file mode 100644 index 0000000..c6311e2 --- /dev/null +++ b/user-service/src/main/java/com/mh/user/netty/handle/DataUploadServerHandler.java @@ -0,0 +1,50 @@ +package com.mh.user.netty.handle; + +import com.github.benmanes.caffeine.cache.Cache; +import com.mh.user.constants.FourthGEnum; +import com.mh.user.entity.GatewayManageEntity; +import com.mh.user.netty.session.ServerSession; +import com.mh.user.netty.session.SessionMap; +import com.mh.user.utils.CacheUtil; +import com.mh.user.utils.ExchangeStringUtil; +import com.mh.user.utils.NettyTools; +import com.mh.user.utils.SpringBeanUtil; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.ApplicationContext; + +import java.util.List; + +@Slf4j +public class DataUploadServerHandler extends ChannelInboundHandlerAdapter { + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + //将接收到的数据转为字符串,此字符串就是客户端发送的字符串 + String receiveStr = ExchangeStringUtil.bytesToHexString((byte[]) msg); + // 判断当前报文是否是上报数据报文 + if (receiveStr != null) { + // 判断属于哪一个DTU网关上报的数据 + CacheUtil instance = CacheUtil.getInstance(); + List gwList = instance.getGatewayInfo(); + if (gwList != null && !gwList.isEmpty()) { + for (GatewayManageEntity gw : gwList) { + if (receiveStr.startsWith(ExchangeStringUtil.str2HexStr(gw.getSn()))) { + // 直接设置对应值 + NettyTools.setReceiveMsg(gw.getHeartBeat(), receiveStr.substring(gw.getSn().length())); + } else { + // 判断是否登录,没有登录立马断开 + String deviceCode = gw.getHeartBeat(); + if (!SessionMap.inst().hasLogin(deviceCode+ctx.channel().remoteAddress())) { + ServerSession.closeSession(ctx); + return; + } + } + } + } + } + super.channelRead(ctx, msg); + } + +} diff --git a/user-service/src/main/java/com/mh/user/netty/handle/ExceptionServerHandler.java b/user-service/src/main/java/com/mh/user/netty/handle/ExceptionServerHandler.java new file mode 100644 index 0000000..c794f71 --- /dev/null +++ b/user-service/src/main/java/com/mh/user/netty/handle/ExceptionServerHandler.java @@ -0,0 +1,50 @@ +package com.mh.user.netty.handle; + +import com.mh.user.netty.session.ServerSession; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import lombok.extern.slf4j.Slf4j; + +/** + * @author LJF + * @version 1.0 + * @project TAD_Server + * @description 异常处理 + * @date 2023/7/5 09:02:03 + */ +@Slf4j +public class ExceptionServerHandler extends ChannelInboundHandlerAdapter { + //private final ApplicationContext applicationContext = ApplicationContextProvider.getApplicationContext(); + //private final MeterInfoService meterInfoService = applicationContext.getBean(MeterInfoService.class); + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + ctx.flush(); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + log.info("{}客户端断开连接!", ctx.channel().remoteAddress()); + //String meterNumIp = ServerSession.closeSession(ctx); + //String[] split = meterNumIp.split("/"); + //String meterNum = split[0]; + //if (meterNum != null) { + // meterInfoService.insertTestDetail(meterNum,"offline"); + //} + ServerSession.closeSession(ctx); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + if (cause instanceof UnsupportedOperationException) { + ServerSession.closeSession(ctx); + } else { + log.info("获得已知异常!",cause); + ctx.close(); + } + log.info("获得已知异常!{}", cause.getMessage()); + log.info("获得已知异常!", cause); + ServerSession.closeSession(ctx); + ctx.close(); + } +} diff --git a/user-service/src/main/java/com/mh/user/netty/handle/FourthChannelInitializer.java b/user-service/src/main/java/com/mh/user/netty/handle/FourthChannelInitializer.java new file mode 100644 index 0000000..bca94d0 --- /dev/null +++ b/user-service/src/main/java/com/mh/user/netty/handle/FourthChannelInitializer.java @@ -0,0 +1,34 @@ +package com.mh.user.netty.handle; + +import com.mh.user.netty.decoder.CustomFrameEncoder; +import io.netty.channel.Channel; +import io.netty.channel.ChannelInitializer; + +/** + * @author LJF + * @version 1.0 + * @project TAD_Server + * @description 初始换一个服务端 + * @date 2023/7/3 15:58:05 + */ +public class FourthChannelInitializer extends ChannelInitializer { + @Override + protected void initChannel(Channel channel) throws Exception { + // 电表报文分隔符,一般都是16结尾 + //ByteBuf delimiter = Unpooled.copiedBuffer(new byte[]{22}); + //打印分隔符 + //channel.pipeline().addLast("frameDecoder", new DelimiterBasedFrameDecoder(1024,false, delimiter)); + //分割报文处理解码器,防止数据粘包 +// channel.pipeline().addLast(new CustomFrameEncoder()); + // 处理登录 + channel.pipeline().addLast(new LoginRequestHandler()); + // 处理心跳 + channel.pipeline().addLast(new HeartBeatServerHandler()); + // 处理数据上报数据 + channel.pipeline().addLast(new DataUploadServerHandler()); + // 异常处理 + channel.pipeline().addLast(new ExceptionServerHandler()); + } + + +} diff --git a/user-service/src/main/java/com/mh/user/netty/handle/HeartBeatServerHandler.java b/user-service/src/main/java/com/mh/user/netty/handle/HeartBeatServerHandler.java new file mode 100644 index 0000000..7f0a8d5 --- /dev/null +++ b/user-service/src/main/java/com/mh/user/netty/handle/HeartBeatServerHandler.java @@ -0,0 +1,102 @@ +package com.mh.user.netty.handle; + +import com.mh.user.model.SysLog; +import com.mh.user.netty.session.ServerSession; +import com.mh.user.service.GatewayManageService; +import com.mh.user.service.SysLogService; +import com.mh.user.utils.ExchangeStringUtil; +import com.mh.user.utils.SpringContextUtils; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.timeout.IdleStateEvent; +import io.netty.handler.timeout.IdleStateHandler; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.ApplicationContext; + +import java.util.Date; +import java.util.concurrent.TimeUnit; + + +/** + * @author LJF + * @version 1.0 + * @project TAD_Server + * @description 心跳包设置 + * @date 2023/7/4 13:54:27 + */ +@Slf4j +public class HeartBeatServerHandler extends IdleStateHandler { + + private final ApplicationContext applicationContext = SpringContextUtils.getApplicationContext(); + private final SysLogService sysLogService = applicationContext.getBean(SysLogService.class); + private final GatewayManageService gatewayManageService = applicationContext.getBean(GatewayManageService.class); + + //目前网关默认心跳30秒,多长时间没有心跳,就关闭连接 + private static final int READ_IDLE_GAP = 30; + + /** + * @param readerIdleTimeSeconds 最长 没有 read到心跳的时间 + * @param writerIdleTimeSeconds 心跳包我们只是进行接收心跳,没有所谓的写操作,我们将其设置为0 + * @param allIdleTimeSeconds 如果我们定义 多长时间没有 读或写操作,我们使用这个参数。 + */ + public HeartBeatServerHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) { + super(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds); + } + + public HeartBeatServerHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) { + super(readerIdleTime, writerIdleTime, allIdleTime, unit); + } + + public HeartBeatServerHandler(boolean observeOutput, long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) { + super(observeOutput, readerIdleTime, writerIdleTime, allIdleTime, unit); + } + + public HeartBeatServerHandler() { + super(READ_IDLE_GAP, 0, 0, TimeUnit.SECONDS); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + + //将接收到的数据转为字符串,此字符串就是客户端发送的字符串 + String receiveStr = ExchangeStringUtil.bytesToHexString((byte[]) msg); + // 判断当前报文是否是登录报文 + if (receiveStr != null && receiveStr.startsWith("2400")) { + if (receiveStr.length() != 8) { + super.channelRead(ctx, msg); + return; + } + + // 通过对应的心跳包码进行判断,然后更新网关在线情况 + gatewayManageService.updateGatewayManageOnlineByHeartBeatCode(receiveStr, 0); + +// //进行心跳包的处理 +// FutureTaskScheduler.add(() -> { +// if (ctx.channel().isActive()) { +// // 开始发送第一条采集数据报文 +// } +// }); + } + super.channelRead(ctx, msg); + } + + @Override + protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception { + log.info("{} 已超过 " + READ_IDLE_GAP + " 秒没有接收到心跳包,已断开链接", ctx.channel().remoteAddress()); + String imeiIP = ServerSession.closeSession(ctx); + String[] split = imeiIP.split("/"); + String imei = split[0]; + if (imei != null) { + // 添加到日志表,下线了 + SysLog sysLog = new SysLog(); + sysLog.setOperation(imei + "下线了"); + sysLog.setCreateBy("开发者"); + sysLog.setIp(imeiIP); + sysLog.setCreateTime(new Date()); + sysLogService.insertLog(sysLog); + // 更新网关在线情况 + gatewayManageService.updateGatewayManageOnlineByImei(imei, 1); + } + ctx.close(); + } + +} diff --git a/user-service/src/main/java/com/mh/user/netty/handle/LoginRequestHandler.java b/user-service/src/main/java/com/mh/user/netty/handle/LoginRequestHandler.java new file mode 100644 index 0000000..8b72933 --- /dev/null +++ b/user-service/src/main/java/com/mh/user/netty/handle/LoginRequestHandler.java @@ -0,0 +1,101 @@ +package com.mh.user.netty.handle; + +import com.mh.common.utils.StringUtils; +import com.mh.user.netty.session.ServerSession; +import com.mh.user.netty.session.SessionMap; +import com.mh.user.netty.task.CallbackTask; +import com.mh.user.netty.task.CallbackTaskScheduler; +import com.mh.user.utils.ExchangeStringUtil; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import lombok.extern.slf4j.Slf4j; + +/** + * @author LJF + * @version 1.0 + * @project TAD_Server + * @description 登录报文处理 + * @date 2023/7/3 15:34:11 + */ +@Slf4j +public class LoginRequestHandler extends ChannelInboundHandlerAdapter { + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + //将接收到的数据转为字符串,此字符串就是客户端发送的字符串 + String receiveStr = ExchangeStringUtil.bytesToHexString((byte[]) msg); + if (StringUtils.isBlank(receiveStr)) { + super.channelRead(ctx, msg); + return; + } + // 判断当前报文是否是心跳包上线: 869530073040186 + log.info("接收到的心跳报文 <== {}", receiveStr); + // 获取IMEI号 + String deviceCode = receiveStr; + String meterNum = deviceCode; + deviceCode = deviceCode + ctx.channel().remoteAddress(); + //新的session的创建 + ServerSession session = new ServerSession(ctx.channel(), deviceCode); + + //进行登录逻辑处理,异步进行处理。并且需要知道 处理的结果。 callbacktask就要 + //派上用场了 + String finalDeviceCode = deviceCode; + CallbackTaskScheduler.add(new CallbackTask() { + @Override + public Boolean execute() throws Exception { + //进行 login 逻辑的处理 + return action(session, finalDeviceCode, ctx, meterNum); + } + + //没有异常的话,我们进行处理 + @Override + public void onBack(Boolean result) { + if (result) { + log.info("设备登录成功: 设备号 = " + session.getSessionId()); + //ctx.pipeline().remove(LoginRequestHandler.class); //压测需要放开 + } else { + log.info("设备刷新登录: 设备号 = " + session.getSessionId()); + SessionMap.inst().updateSession(finalDeviceCode, session, meterNum); + //log.info("设备登录失败: 设备号 = " + session.getSessionId()); + //ServerSession.closeSession(ctx); + // 假如说已经在会话中了,直接断开连接 + //ctx.close(); + } + } + + //有异常的话,我们进行处理 + @Override + public void onException(Throwable t) { + log.info("设备登录异常: 设备号 = " + session.getSessionId()); + ServerSession.closeSession(ctx); + } + }); + } + + private boolean action(ServerSession session, String deviceCode, ChannelHandlerContext ctx, String meterNum) { + //user验证 + boolean isValidUser = checkUser(deviceCode, session); + if (!isValidUser) { + //我们发送登录成功的报文给 客户端 + session.bind(); + return false; + } + //我们发送登录成功的报文给 客户端 + session.bind(); + return true; + } + + private boolean checkUser(String deviceCode, ServerSession session) { + //当前用户已经登录 + if (SessionMap.inst().hasLogin(deviceCode)) { + log.info("设备已经登录: 设备号 = " + deviceCode); + return false; + } + //一般情况下,我们会将 user存储到 DB中,然后对user的用户名和密码进行校验 + //但是,我们这边没有进行db的集成,所以我们想一个别的办法进行user的校验。在我们的sessionMap进行以下校验 + //为什么选sessionmap,因为我们user的会话,都是存储到sessionmap中的,sessionmap中只要有这个user的会话,说明就是ok的 + return true; + } + + +} diff --git a/user-service/src/main/java/com/mh/user/netty/session/ServerSession.java b/user-service/src/main/java/com/mh/user/netty/session/ServerSession.java new file mode 100644 index 0000000..285be2b --- /dev/null +++ b/user-service/src/main/java/com/mh/user/netty/session/ServerSession.java @@ -0,0 +1,66 @@ +package com.mh.user.netty.session; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.util.AttributeKey; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; + +@Data +@Slf4j +public class ServerSession { + public static final AttributeKey SESSION_KEY = + AttributeKey.valueOf("SESSION_KEY"); + //通道 + private Channel channel; + private final String sessionId; + private boolean isLogin = false; + + public ServerSession(Channel channel, String deviceCode){ + this.channel = channel; + this.sessionId = deviceCode; + } + + //session需要和通道进行一定的关联,他是在构造函数中关联上的; + //session还需要通过sessionkey和channel进行再次的关联;channel.attr方法.set当前的 + // serverSession + //session需要被添加到我们的SessionMap中 + public void bind(){ + log.info("server Session 会话进行绑定 :" + channel.remoteAddress()); + channel.attr(SESSION_KEY).set(this); + SessionMap.inst().addSession(sessionId, this); + this.isLogin = true; + } + + //通过channel获取session + public static ServerSession getSession(ChannelHandlerContext ctx){ + Channel channel = ctx.channel(); + return channel.attr(SESSION_KEY).get(); + } + + //关闭session,新增返回一个meterNum用于纪录设备下线时间2024-05-08 + public static String closeSession(ChannelHandlerContext ctx){ + String meterNum = null; + ServerSession serverSession = ctx.channel().attr(SESSION_KEY).get(); + if(serverSession != null && serverSession.getSessionId() != null) { + ChannelFuture future = serverSession.channel.close(); + future.addListener((ChannelFutureListener) future1 -> { + if(!future1.isSuccess()) { + log.info("Channel close error!"); + } + }); + ctx.close(); + meterNum = serverSession.sessionId; + SessionMap.inst().removeSession(serverSession.sessionId); + log.info(ctx.channel().remoteAddress()+" "+serverSession.sessionId + "==>移除会话"); + } + return meterNum; + } + + //写消息 + public void writeAndFlush(Object msg) { + channel.writeAndFlush(msg); + } +} diff --git a/user-service/src/main/java/com/mh/user/netty/session/SessionMap.java b/user-service/src/main/java/com/mh/user/netty/session/SessionMap.java new file mode 100644 index 0000000..4ad8543 --- /dev/null +++ b/user-service/src/main/java/com/mh/user/netty/session/SessionMap.java @@ -0,0 +1,96 @@ +package com.mh.user.netty.session; + +import lombok.Data; +import lombok.extern.slf4j.Slf4j; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +@Data +@Slf4j +public class SessionMap { + + private ThreadLocal sceneThreadLocal = new ThreadLocal<>(); + + //用单例模式进行sessionMap的创建 + private SessionMap(){} + + private static SessionMap singleInstance = new SessionMap(); + + public static SessionMap inst() { + return singleInstance; + } + + //进行会话的保存 + //key 我们使用 sessionId;value 需要是 serverSession + private ConcurrentHashMap map = new ConcurrentHashMap<>(256); + //添加session + public void addSession(String sessionId, ServerSession s) { + map.put(sessionId, s); + log.info("IP地址:"+s.getChannel().remoteAddress()+" "+ sessionId + " 表具上线,总共表具:" + map.size()); + } + + //删除session + public void removeSession(String sessionId) { + if(map.containsKey(sessionId)) { + ServerSession s = map.get(sessionId); + map.remove(sessionId); + log.info("设备id下线:{},在线设备:{}", s.getSessionId(), map.size() ); + } + return; + } + + public boolean hasLogin(String sessionId) { + Iterator> iterator = map.entrySet().iterator(); + while(iterator.hasNext()) { + Map.Entry next = iterator.next(); + if(sessionId != null && sessionId.equalsIgnoreCase(next.getValue().getSessionId())) { + return true ; + } + } + return false; + } + + //如果在线,肯定有sessionMap里保存的 serverSession + //如果不在线,serverSession也没有。用这个来判断是否在线 + public List getSessionBy(String sessionId) { + return map.values().stream(). + filter(s -> s.getSessionId().equals(sessionId)). + collect(Collectors.toList()); + } + + public boolean getScene() { + return sceneThreadLocal.get(); + } + + public void initScene(Boolean status) { + if (sceneThreadLocal == null) { + log.info("======创建ThreadLocal======"); + sceneThreadLocal = new ThreadLocal<>(); + } + log.info("设置状态==>" + status); + sceneThreadLocal.set(status); + } + + public void clearScene() { + initScene(null); + sceneThreadLocal.remove(); + } + + public void updateSession(String sessionId, ServerSession session, String meterNum) { + Iterator> iterator = map.entrySet().iterator(); + while(iterator.hasNext()) { + Map.Entry next = iterator.next(); + if (next.getKey().contains(meterNum)){ + iterator.remove(); + } + if(sessionId != null && sessionId.equalsIgnoreCase(next.getValue().getSessionId())) { + next.setValue(session); + } + } + } + +} diff --git a/user-service/src/main/java/com/mh/user/netty/task/CallbackTask.java b/user-service/src/main/java/com/mh/user/netty/task/CallbackTask.java new file mode 100644 index 0000000..1512fb1 --- /dev/null +++ b/user-service/src/main/java/com/mh/user/netty/task/CallbackTask.java @@ -0,0 +1,20 @@ +package com.mh.user.netty.task; + +/** + * @author LJF + * @version 1.0 + * @project TAD_Server + * @description 回调任务 + * @date 2023/7/3 15:34:11 + */ +public interface CallbackTask { + T execute() throws Exception; + + /** + * // 执行没有 异常的情况下的 返回值 + * @param t + */ + void onBack(T t); + + void onException(Throwable t); +} diff --git a/user-service/src/main/java/com/mh/user/netty/task/CallbackTaskScheduler.java b/user-service/src/main/java/com/mh/user/netty/task/CallbackTaskScheduler.java new file mode 100644 index 0000000..c242f04 --- /dev/null +++ b/user-service/src/main/java/com/mh/user/netty/task/CallbackTaskScheduler.java @@ -0,0 +1,80 @@ +package com.mh.user.netty.task; + +import com.google.common.util.concurrent.*; + +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * @author LJF + * @version 1.0 + * @project TAD_Server + * @description 回调任务 + * @date 2023/7/3 15:34:11 + */ +public class CallbackTaskScheduler extends Thread { + private ConcurrentLinkedQueue executeTaskQueue = + new ConcurrentLinkedQueue<>(); + private long sleepTime = 1000 * 10; + private final ExecutorService pool = Executors.newCachedThreadPool(); + ListeningExecutorService lpool = MoreExecutors.listeningDecorator(pool); + private static CallbackTaskScheduler inst = new CallbackTaskScheduler(); + + private CallbackTaskScheduler() { + this.start(); + } + + //add task + public static void add(CallbackTask executeTask) { + inst.executeTaskQueue.add(executeTask); + } + + @Override + public void run() { + while (true) { + handleTask(); + //为了避免频繁连接服务器,但是当前连接服务器过长导致失败 + //threadSleep(sleepTime); + } + } + + private void threadSleep(long sleepTime) { + try { + Thread.sleep(sleepTime); + } catch (Exception e) { + e.printStackTrace(); + } + } + + //任务执行 + private void handleTask() { + CallbackTask executeTask = null; + while (executeTaskQueue.peek() != null) { + executeTask = executeTaskQueue.poll(); + handleTask(executeTask); + } + } + + private void handleTask(CallbackTask executeTask) { + ListenableFuture future = lpool.submit(new Callable() { + public T call() throws Exception { + return executeTask.execute(); + } + }); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(T t) { + executeTask.onBack(t); + } + + @Override + public void onFailure(Throwable throwable) { + executeTask.onException(throwable); + } + }, lpool); + } + +} + diff --git a/user-service/src/main/java/com/mh/user/netty/task/ExecuteTask.java b/user-service/src/main/java/com/mh/user/netty/task/ExecuteTask.java new file mode 100644 index 0000000..7cc5ceb --- /dev/null +++ b/user-service/src/main/java/com/mh/user/netty/task/ExecuteTask.java @@ -0,0 +1,6 @@ +package com.mh.user.netty.task; + +//不需要知道异步线程的 返回值 +public interface ExecuteTask { + void execute(); +} diff --git a/user-service/src/main/java/com/mh/user/netty/task/FutureTaskScheduler.java b/user-service/src/main/java/com/mh/user/netty/task/FutureTaskScheduler.java new file mode 100644 index 0000000..3819b2c --- /dev/null +++ b/user-service/src/main/java/com/mh/user/netty/task/FutureTaskScheduler.java @@ -0,0 +1,67 @@ +package com.mh.user.netty.task; + +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * @author LJF + * @version 1.0 + * @project TAD_Server + * @description 任务定时 + * @date 2023/7/3 15:34:11 + */ +public class FutureTaskScheduler extends Thread{ + private ConcurrentLinkedQueue executeTaskQueue = + new ConcurrentLinkedQueue<>(); + private long sleepTime = 200; + private ExecutorService pool = Executors.newFixedThreadPool(10); + private static FutureTaskScheduler inst = new FutureTaskScheduler(); + public FutureTaskScheduler() { + this.start(); + } + //任务添加 + public static void add(ExecuteTask executeTask) { + inst.executeTaskQueue.add(executeTask); + } + + @Override + public void run() { + while (true) { + handleTask(); + //threadSleep(sleepTime); + } + } + + private void threadSleep(long sleepTime) { + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + //执行任务 + private void handleTask() { + ExecuteTask executeTask; + while (executeTaskQueue.peek() != null) { + executeTask = executeTaskQueue.poll(); + handleTask(executeTask); + } + //刷新心跳时间 + } + private void handleTask(ExecuteTask executeTask) { + pool.execute(new ExecuteRunnable(executeTask)); + } + + class ExecuteRunnable implements Runnable { + ExecuteTask executeTask; + public ExecuteRunnable(ExecuteTask executeTask) { + this.executeTask = executeTask; + } + @Override + public void run() { + executeTask.execute(); + } + } +} diff --git a/user-service/src/main/java/com/mh/user/serialport/SendAndReceiveByCom.java b/user-service/src/main/java/com/mh/user/serialport/SendAndReceiveByCom.java index d5a9b6d..128dd5c 100644 --- a/user-service/src/main/java/com/mh/user/serialport/SendAndReceiveByCom.java +++ b/user-service/src/main/java/com/mh/user/serialport/SendAndReceiveByCom.java @@ -11,6 +11,7 @@ import com.mh.user.service.NowDataService; import com.mh.user.service.SysParamService; import com.mh.user.strategy.DeviceStrategy; import com.mh.user.strategy.DeviceStrategyFactory; +import com.mh.user.tcp.SendAndReceiveByTcp; import com.mh.user.utils.*; import lombok.extern.slf4j.Slf4j; import org.springframework.context.ApplicationContext; @@ -119,20 +120,8 @@ public class SendAndReceiveByCom { String dateStr = DateUtil.dateToString(date1, "yyyy-MM-dd HH:mm:ss"); if (bytes == null) { SerialTool.closePort(serialPort); - log.info("串口" + serialPort.getName() + "没有数据返回!" + i); - log.info("----------------" + deviceType + "离线,设备号:" + deviceAddr + ",所属楼栋:" + buildingName + "----------------"); - String time1 = deviceInstallService.selectLastDate(deviceType, deviceAddr, buildingId); - if (time1 == null) { - time1 = dateStr; - } - int d = ExchangeStringUtil.compareCopyTime(time1, dateStr); - if (d == 1) { - deviceInstallService.updateNotOnline(deviceAddr, deviceType, buildingId, "离线"); //所有设备离线 - if (deviceType.equals("热泵")) { - nowDataService.updateRunState(buildingId, deviceAddr, "离线", buildingName); //监控界面状态表热泵在线状态 - } - } - continue; + log.info("串口{}没有数据返回!{}", serialPort.getName(), i); + SendAndReceiveByTcp.printLog(deviceAddr, deviceType, buildingId, buildingName, dateStr, log, deviceInstallService, nowDataService); } // 处理返回来的数据报文 dealReceiveData(dateStr, serialPort, i, deviceAddr, deviceType, registerAddr, brand, buildingId, buildingName, bytes, device); @@ -145,7 +134,7 @@ public class SendAndReceiveByCom { } finally { if (null != serialPort) { SerialTool.closePort(serialPort); - log.info("关闭串口==" + serialPort.getName()); + log.info("关闭串口=={}", serialPort.getName()); } } } diff --git a/user-service/src/main/java/com/mh/user/serialport/SerialPortSendReceive2.java b/user-service/src/main/java/com/mh/user/serialport/SerialPortSendReceive2.java index 02b6f7d..3e28d28 100644 --- a/user-service/src/main/java/com/mh/user/serialport/SerialPortSendReceive2.java +++ b/user-service/src/main/java/com/mh/user/serialport/SerialPortSendReceive2.java @@ -5,6 +5,7 @@ import com.mh.user.constants.Constant; import com.mh.user.entity.DeviceCodeParamEntity; import com.mh.user.entity.SysParamEntity; import com.mh.user.service.*; +import com.mh.user.tcp.SendAndReceiveByTcp; import com.mh.user.utils.*; import lombok.extern.slf4j.Slf4j; import org.springframework.context.ApplicationContext; @@ -114,19 +115,7 @@ public class SerialPortSendReceive2 { if (bytes == null) { SerialTool.closePort(serialPort); log.info("串口" + serialPort.getName() + "没有数据返回!" + i); - log.info("----------------" + deviceType + "离线,设备号:" + deviceAddr + ",所属楼栋:" + buildingName + "----------------"); - String time1 = deviceInstallService.selectLastDate(deviceType, deviceAddr, buildingId); - if (time1 == null) { - time1 = dateStr; - } - int d = ExchangeStringUtil.compareCopyTime(time1, dateStr); - if (d == 1) { - deviceInstallService.updateNotOnline(deviceAddr, deviceType, buildingId, "离线"); //所有设备离线 - if (deviceType.equals("热泵")) { - nowDataService.updateRunState(buildingId, deviceAddr, "离线", buildingName); //监控界面状态表热泵在线状态 - } - } - continue; + SendAndReceiveByTcp.printLog(deviceAddr, deviceType, buildingId, buildingName, dateStr, log, deviceInstallService, nowDataService); } // 处理返回来的数据报文 dealReceiveData(dateStr, serialPort, i, deviceAddr, deviceType, registerAddr, brand, buildingId, buildingName, bytes); diff --git a/user-service/src/main/java/com/mh/user/serialport/SerialPortSingle2.java b/user-service/src/main/java/com/mh/user/serialport/SerialPortSingle2.java index f851a5f..9ebbb78 100644 --- a/user-service/src/main/java/com/mh/user/serialport/SerialPortSingle2.java +++ b/user-service/src/main/java/com/mh/user/serialport/SerialPortSingle2.java @@ -3,6 +3,7 @@ package com.mh.user.serialport; import com.mh.common.utils.StringUtils; import com.mh.user.constants.Constant; import com.mh.user.entity.DeviceCodeParamEntity; +import com.mh.user.entity.GatewayManageEntity; import com.mh.user.factory.Device; import com.mh.user.factory.DeviceFactory; import com.mh.user.service.BuildingService; @@ -10,12 +11,15 @@ import com.mh.user.service.DeviceInstallService; import com.mh.user.service.NowDataService; import com.mh.user.strategy.DeviceStrategy; import com.mh.user.strategy.DeviceStrategyFactory; +import com.mh.user.tcp.SendAndReceiveByTcp; +import com.mh.user.tcp.TcpSingle; import com.mh.user.utils.*; import lombok.extern.slf4j.Slf4j; import org.springframework.context.ApplicationContext; import java.io.IOException; import java.util.Date; +import java.util.List; import purejavacomm.SerialPort; @@ -36,6 +40,24 @@ public class SerialPortSingle2 { BuildingService buildingService = context.getBean(BuildingService.class); public String serialPortSend(DeviceCodeParamEntity deviceCodeParamEntity) { + // 增加一个判断,判断是否tcp通信 + // 增加判断是什么通讯类型:realCom、tcp + CacheUtil instance = CacheUtil.getInstance(); + List gwList = instance.getGatewayInfo(); + if (gwList != null && !gwList.isEmpty()) { + for (GatewayManageEntity gw : gwList) { + if (gw.getDataCom().toUpperCase().equals(deviceCodeParamEntity.getDataCom().toUpperCase())) { + String communityType = gw.getCommunityType(); + if (Constant.COMMUNITY_TYPE_TCP.equals(communityType)) { + TcpSingle tcpSingle = new TcpSingle(); + return tcpSingle.serialPortSend(deviceCodeParamEntity, gw); + } else if (Constant.COMMUNITY_TYPE_REAL_COM.equals(communityType)) { + // 直接跳出判断,进行下一步 + break; + } + } + } + } SerialPort serialPort = null; String rtData = "fail"; String comName = deviceCodeParamEntity.getDataCom().toUpperCase(); diff --git a/user-service/src/main/java/com/mh/user/serialport/SerialPortThread.java b/user-service/src/main/java/com/mh/user/serialport/SerialPortThread.java index d6ba39f..31b83f1 100644 --- a/user-service/src/main/java/com/mh/user/serialport/SerialPortThread.java +++ b/user-service/src/main/java/com/mh/user/serialport/SerialPortThread.java @@ -1,8 +1,13 @@ package com.mh.user.serialport; +import com.mh.user.constants.Constant; +import com.mh.user.entity.GatewayManageEntity; +import com.mh.user.tcp.SendAndReceiveByTcp; +import com.mh.user.utils.CacheUtil; import gnu.io.SerialPort; import lombok.extern.slf4j.Slf4j; +import java.util.List; import java.util.concurrent.CountDownLatch; /** @@ -35,13 +40,36 @@ public class SerialPortThread implements Runnable{ public void run(){ log.info("创建发送接收数据线程>>>>>>>>>>>>>>" + thread); - SendAndReceiveByCom sendAndReceiveByCom = new SendAndReceiveByCom(); - try { - sendAndReceiveByCom.sendAndReceive(name, thread); - } catch (Exception e) { - log.error("串口通信发生异常: ", e); - } finally { - this.countDownLatch.countDown(); + // 增加判断是什么通讯类型:realCom、tcp + CacheUtil instance = CacheUtil.getInstance(); + List gwList = instance.getGatewayInfo(); + if (gwList != null && !gwList.isEmpty()) { + for (GatewayManageEntity gw : gwList) { + if (gw.getDataCom().toUpperCase().equals("COM"+thread)) { + String communityType = gw.getCommunityType(); + if (Constant.COMMUNITY_TYPE_REAL_COM.equals(communityType)) { + SendAndReceiveByCom sendAndReceiveByCom = new SendAndReceiveByCom(); + try { + sendAndReceiveByCom.sendAndReceive(name, thread); + } catch (Exception e) { + log.error("串口通信发生异常: ", e); + } finally { + this.countDownLatch.countDown(); + } + } else if (Constant.COMMUNITY_TYPE_TCP.equals(communityType)) { + // 开始TCP通信 + SendAndReceiveByTcp sendAndReceiveByTcp = new SendAndReceiveByTcp(); + try { + sendAndReceiveByTcp.sendAndReceive(name, thread, gw.getHeartBeat()); + } catch (Exception e) { + log.error("TCP通信发生异常: ", e); + } finally { + this.countDownLatch.countDown(); + } + } + } + } } + } } diff --git a/user-service/src/main/java/com/mh/user/service/GatewayManageService.java b/user-service/src/main/java/com/mh/user/service/GatewayManageService.java index 77b892a..12bbf3a 100644 --- a/user-service/src/main/java/com/mh/user/service/GatewayManageService.java +++ b/user-service/src/main/java/com/mh/user/service/GatewayManageService.java @@ -50,4 +50,7 @@ public interface GatewayManageService { // 根据grade查询对应的网关路由信息 GatewayManageEntity queryGatewayByGrade(Long grade); + void updateGatewayManageOnlineByHeartBeatCode(String heartBeat, int status); + + void updateGatewayManageOnlineByImei(String imei, int status); } diff --git a/user-service/src/main/java/com/mh/user/service/SysLogService.java b/user-service/src/main/java/com/mh/user/service/SysLogService.java index 4ebbfad..627bfae 100644 --- a/user-service/src/main/java/com/mh/user/service/SysLogService.java +++ b/user-service/src/main/java/com/mh/user/service/SysLogService.java @@ -22,4 +22,6 @@ public interface SysLogService extends CurdService { SysLog logInfo(int id); SysParamEntity selectSysParam(); + + void insertLog(SysLog sysLog); } diff --git a/user-service/src/main/java/com/mh/user/service/impl/DeviceDisplayServiceImpl.java b/user-service/src/main/java/com/mh/user/service/impl/DeviceDisplayServiceImpl.java index baef948..1c8b5c7 100644 --- a/user-service/src/main/java/com/mh/user/service/impl/DeviceDisplayServiceImpl.java +++ b/user-service/src/main/java/com/mh/user/service/impl/DeviceDisplayServiceImpl.java @@ -36,50 +36,50 @@ public class DeviceDisplayServiceImpl implements DeviceDisplayService { return deviceDisplayMapper.queryDeviceStatus(deviceType); } - /** - * 网关管理服务类 - * author:ljf - * create—date:2020-05-21 - */ - public static interface GatewayManageService { - - /** - * 根据条件查询网关信息 - * @param grade - * @param operator - * @return - */ - List queryByOther(Integer grade, Integer operator); - - /** - * 新增或更新网关信息 - * @param gatewayManageEntity - */ - void addOrUpdateGateWayInfo(GatewayManageEntity gatewayManageEntity); - - // 查询全部 - List queryAll(String gatewayID, String operator, String grade, int page, int limit); - - // 查询记录数 - int queryCount(String gatewayID, String operator, String grade); - - // 添加网关设备 - String insertGatewayManage(GatewayManageEntity gatewayManageEntity); - - // 根据网关ID删除网关设备 - void deleteGatewayManageByID(int gatewayID); - - // 根据网关ID查询设备信息 - GatewayManageEntity queryGatewayByID(Long gatewayID); - - // 设备总数 - int queryByOtherCount(int page, int size, int gatewayID); - - // 更新网关管理器的最新连接数据 - void updateGatewayManage(String IP, String port); - - // 根据grade查询对应的网关路由信息 - GatewayManageEntity queryGatewayByGrade(Long grade); - - } +// /** +// * 网关管理服务类 +// * author:ljf +// * create—date:2020-05-21 +// */ +// public static interface GatewayManageService { +// +// /** +// * 根据条件查询网关信息 +// * @param grade +// * @param operator +// * @return +// */ +// List queryByOther(Integer grade, Integer operator); +// +// /** +// * 新增或更新网关信息 +// * @param gatewayManageEntity +// */ +// void addOrUpdateGateWayInfo(GatewayManageEntity gatewayManageEntity); +// +// // 查询全部 +// List queryAll(String gatewayID, String operator, String grade, int page, int limit); +// +// // 查询记录数 +// int queryCount(String gatewayID, String operator, String grade); +// +// // 添加网关设备 +// String insertGatewayManage(GatewayManageEntity gatewayManageEntity); +// +// // 根据网关ID删除网关设备 +// void deleteGatewayManageByID(int gatewayID); +// +// // 根据网关ID查询设备信息 +// GatewayManageEntity queryGatewayByID(Long gatewayID); +// +// // 设备总数 +// int queryByOtherCount(int page, int size, int gatewayID); +// +// // 更新网关管理器的最新连接数据 +// void updateGatewayManage(String IP, String port); +// +// // 根据grade查询对应的网关路由信息 +// GatewayManageEntity queryGatewayByGrade(Long grade); +// +// } } diff --git a/user-service/src/main/java/com/mh/user/service/impl/GatewayManageServiceImpl.java b/user-service/src/main/java/com/mh/user/service/impl/GatewayManageServiceImpl.java index 0973c14..e8ef4ea 100644 --- a/user-service/src/main/java/com/mh/user/service/impl/GatewayManageServiceImpl.java +++ b/user-service/src/main/java/com/mh/user/service/impl/GatewayManageServiceImpl.java @@ -2,6 +2,7 @@ package com.mh.user.service.impl; import com.mh.user.entity.GatewayManageEntity; import com.mh.user.mapper.GatewayManageMapper; +import com.mh.user.service.GatewayManageService; import org.springframework.stereotype.Service; import java.util.List; @@ -14,7 +15,7 @@ import java.util.List; * @throws : */ @Service -public class GatewayManageServiceImpl implements DeviceDisplayServiceImpl.GatewayManageService { +public class GatewayManageServiceImpl implements GatewayManageService { // 通过构造函数注入,引用Mapper private final GatewayManageMapper gatewayManageMapper; @@ -97,4 +98,14 @@ public class GatewayManageServiceImpl implements DeviceDisplayServiceImpl.Gatewa public GatewayManageEntity queryGatewayByGrade(Long grade) { return gatewayManageMapper.queryGatewayByGrade(grade); } + + @Override + public void updateGatewayManageOnlineByHeartBeatCode(String heartBeat, int status) { + gatewayManageMapper.updateGatewayManageOnlineByHeartBeatCode(heartBeat, status); + } + + @Override + public void updateGatewayManageOnlineByImei(String imei, int status) { + gatewayManageMapper.updateGatewayManageOnlineByImei(imei, status); + } } diff --git a/user-service/src/main/java/com/mh/user/service/impl/SysLogServiceImpl.java b/user-service/src/main/java/com/mh/user/service/impl/SysLogServiceImpl.java index c17a92a..7ca4c57 100644 --- a/user-service/src/main/java/com/mh/user/service/impl/SysLogServiceImpl.java +++ b/user-service/src/main/java/com/mh/user/service/impl/SysLogServiceImpl.java @@ -73,4 +73,9 @@ public class SysLogServiceImpl implements SysLogService { public SysParamEntity selectSysParam() { return sysLogMapper.selectSysParam(); } + + @Override + public void insertLog(SysLog sysLog) { + sysLogMapper.insert(sysLog); + } } diff --git a/user-service/src/main/java/com/mh/user/tcp/SendAndReceiveByTcp.java b/user-service/src/main/java/com/mh/user/tcp/SendAndReceiveByTcp.java new file mode 100644 index 0000000..d91474f --- /dev/null +++ b/user-service/src/main/java/com/mh/user/tcp/SendAndReceiveByTcp.java @@ -0,0 +1,204 @@ +package com.mh.user.tcp; + +import com.mh.common.utils.StringUtils; +import com.mh.user.constants.Constant; +import com.mh.user.entity.DeviceCodeParamEntity; +import com.mh.user.factory.Device; +import com.mh.user.factory.DeviceFactory; +import com.mh.user.netty.session.ServerSession; +import com.mh.user.netty.session.SessionMap; +import com.mh.user.serialport.SerialTool; +import com.mh.user.service.BuildingService; +import com.mh.user.service.DeviceInstallService; +import com.mh.user.service.NowDataService; +import com.mh.user.strategy.DeviceStrategy; +import com.mh.user.strategy.DeviceStrategyFactory; +import com.mh.user.utils.*; +import lombok.extern.slf4j.Slf4j; +import org.slf4j.Logger; +import org.springframework.context.ApplicationContext; +import purejavacomm.SerialPort; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +/** + * @author LJF + * @version 1.0 + * @project CHWS + * @description 通过tcp发送和接收数据 + * @date 2024-03-18 14:56:32 + */ +@Slf4j +public class SendAndReceiveByTcp { + + List deviceManageEntityList; + + ApplicationContext context = SpringBeanUtil.getApplicationContext(); + DeviceInstallService deviceInstallService = context.getBean(DeviceInstallService.class); + NowDataService nowDataService = context.getBean(NowDataService.class); + BuildingService buildingService = context.getBean(BuildingService.class); + + public void sendAndReceive(String sort, String thread, String heartBeat) { + CacheUtil cacheUtil = CacheUtil.getInstance(); + try { + //生成对应的采集指令 + List deviceParamsByType = cacheUtil.getDeviceParamsByType(sort); + deviceManageEntityList = deviceParamsByType + .parallelStream() + .filter(value -> value.getThread().equals(thread)) + .sorted(Comparator.comparing(DeviceCodeParamEntity::getDataCom)) + .collect(Collectors.toList()); + int size = deviceManageEntityList.size(); + for (int i = 0; i < size; i++) { + //判断网页端是否有操作设备的 + if (Constant.WEB_FLAG) { + log.info("有指令下发退出定时采集"); + break; + } + String deviceAddr = deviceManageEntityList.get(i).getDeviceAddr(); + String deviceType = deviceManageEntityList.get(i).getDeviceType(); + String registerAddr = deviceManageEntityList.get(i).getRegisterAddr(); + String brand = deviceManageEntityList.get(i).getBrand();//品牌 + String buildingId = deviceManageEntityList.get(i).getBuildingId(); + String buildingName = buildingService.queryBuildingName(buildingId); //查询楼栋名称 + // 创建设备报文 + Device device = DeviceFactory.createDevice(deviceType); + DeviceStrategy strategy = DeviceStrategyFactory.createStrategy(deviceType); + if (null == strategy) { + continue; + } + device.setStrategy(strategy); + String sendStr = device.createOrders(deviceManageEntityList.get(i)); + + try { + //向串口发送指令 + if (StringUtils.isBlank(sendStr)) { + continue; + } + if (deviceType.equals("热泵")) { + for (int j = 0; j < 4; j++) { + Thread.sleep(1000); + // 判断网页端是否有操作设备的 + if (Constant.WEB_FLAG) { + log.info("有指令下发退出定时采集"); + break; + } + } + } else { + Thread.sleep(2000); + } + //从获取响应数据 + ConcurrentHashMap map = SessionMap.inst().getMap(); + Set> entries = map.entrySet(); + boolean flag = false; + String keyVal = null; + for (Map.Entry entry : entries) { + String key = entry.getKey(); + //log.info("当前session:{},当前sessionId:{},当前在线设备数:{}",entry,entry.getValue().getSessionId(),map.size()); + if (key.contains(heartBeat)) { + flag = true; + keyVal = key; + break; + } + } + if (flag) { + ServerSession serverSession = map.get(keyVal); + serverSession.getChannel().writeAndFlush(NettyTools.createByteBuf(sendStr)); + NettyTools.initReceiveMsg(heartBeat); + String receiveMsg = NettyTools.waitReceiveMsg(heartBeat); + Date date1 = new Date(); + String dateStr = DateUtil.dateToString(date1, "yyyy-MM-dd HH:mm:ss"); + if (StringUtils.isBlank(receiveMsg)) { + log.info("TCP客户端:{},没有数据返回!{}", keyVal, i); + printLog(deviceAddr, deviceType, buildingId, buildingName, dateStr, log, deviceInstallService, nowDataService); + continue; + } + // 处理返回来的数据报文 + dealReceiveData(dateStr, keyVal, i, deviceAddr, deviceType, registerAddr, brand, buildingId, buildingName, receiveMsg, device); + } else { + log.error("没有找到心跳包数据:{}", heartBeat); + } + } catch (Exception e) { + log.error("发送窗口数据异常==>", e); + } + } + } catch (Exception e) { + log.error("-------------串口采集异常!----------->>", e); + } + } + + public static void printLog(String deviceAddr, String deviceType, String buildingId, String buildingName, String dateStr, Logger log, DeviceInstallService deviceInstallService, NowDataService nowDataService) { + log.info("----------------{}离线,设备号:{},所属楼栋:{}----------------", deviceType, deviceAddr, buildingName); + String time1 = deviceInstallService.selectLastDate(deviceType, deviceAddr, buildingId); + if (time1 == null) { + time1 = dateStr; + } + int d = ExchangeStringUtil.compareCopyTime(time1, dateStr); + if (d == 1) { + deviceInstallService.updateNotOnline(deviceAddr, deviceType, buildingId, "离线"); //所有设备离线 + if (deviceType.equals("热泵")) { + nowDataService.updateRunState(buildingId, deviceAddr, "离线", buildingName); //监控界面状态表热泵在线状态 + } + } + return; + } + + /** + * 处理返回来的数据 + * + * @param dateStr + * @param serialPort + * @param i + * @param deviceAddr + * @param deviceType + * @param registerAddr + * @param brand + * @param buildingId + * @param buildingName + * @param receiveStr + * @throws InterruptedException + */ + private void dealReceiveData(String dateStr, + String serialPort, + int i, + String deviceAddr, + String deviceType, + String registerAddr, + String brand, + String buildingId, + String buildingName, String receiveStr, Device device) { + try { + //去掉空格和null + receiveStr = receiveStr.replace("null", ""); + receiveStr = receiveStr.replace(" ", ""); + log.info("TCP客户端:{},接受第{}数据:{},大小: {}", serialPort, i, receiveStr, receiveStr.length()); + //返回值全部变成大写 + String receiveData = receiveStr.toUpperCase(); + //截取去掉FE + String dataStr; + if (receiveData.length() > 8) { + String str1 = receiveData.substring(0, 8); + String str2 = receiveData.substring(8); + dataStr = str1.replace("FE", "") + str2; + } else { + dataStr = receiveData.replace("FE", ""); + } + deviceInstallService.updateOnline(deviceAddr, deviceType, buildingId, "在线"); //设备在线 + log.info("----------------{}在线,设备号:{},所属楼栋:{}----------------", deviceType, deviceAddr, buildingName); + if (deviceType.equals("热泵")) { + String strState = nowDataService.selectState(buildingId, deviceAddr); + if (strState != null && strState.equals("离线")) { //采集到数据 + nowDataService.updateRunState(buildingId, deviceAddr, "不运行", buildingName); //监控界面状态表热泵在线状态 + } + } + // 解析返回来的数据 + device.analysisReceiveData(dateStr, deviceType, registerAddr, brand, buildingId, buildingName, dataStr); + + } catch (Exception e) { + log.error("楼栋:{}设备类型:{}保存数据库失败!{}", buildingName, deviceType, i, e); + } + } + +} diff --git a/user-service/src/main/java/com/mh/user/tcp/TcpSingle.java b/user-service/src/main/java/com/mh/user/tcp/TcpSingle.java new file mode 100644 index 0000000..47daf0b --- /dev/null +++ b/user-service/src/main/java/com/mh/user/tcp/TcpSingle.java @@ -0,0 +1,126 @@ +package com.mh.user.tcp; + +import com.mh.common.utils.StringUtils; +import com.mh.user.constants.Constant; +import com.mh.user.entity.DeviceCodeParamEntity; +import com.mh.user.entity.GatewayManageEntity; +import com.mh.user.factory.Device; +import com.mh.user.factory.DeviceFactory; +import com.mh.user.netty.session.ServerSession; +import com.mh.user.netty.session.SessionMap; +import com.mh.user.serialport.SerialTool; +import com.mh.user.service.BuildingService; +import com.mh.user.service.DeviceInstallService; +import com.mh.user.service.NowDataService; +import com.mh.user.strategy.DeviceStrategy; +import com.mh.user.strategy.DeviceStrategyFactory; +import com.mh.user.utils.*; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.ApplicationContext; +import purejavacomm.SerialPort; + +import java.io.IOException; +import java.util.Date; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import static com.mh.user.tcp.SendAndReceiveByTcp.printLog; + +/** + * @author nxr + * @title : + * @description : 串口发送和接收处理,操作类 + * @updateTime 2022-08-10 + * @throws : + */ +@Slf4j +public class TcpSingle { + + // 调用service + ApplicationContext context = SpringBeanUtil.getApplicationContext(); + DeviceInstallService deviceInstallService = context.getBean(DeviceInstallService.class); + NowDataService nowDataService = context.getBean(NowDataService.class); + BuildingService buildingService = context.getBean(BuildingService.class); + + public String serialPortSend(DeviceCodeParamEntity deviceCodeParamEntity, GatewayManageEntity gatewayManageEntity) { + String rtData = "fail"; + String comName = deviceCodeParamEntity.getDataCom().toUpperCase(); + if (StringUtils.isBlank(comName)) { + return rtData; + } + try { + // 创建设备报文 + Device device = DeviceFactory.createDevice(deviceCodeParamEntity.getDeviceType()); + DeviceStrategy strategy = DeviceStrategyFactory.createStrategy(deviceCodeParamEntity.getDeviceType()); + if (null == strategy) { + return rtData; + } + device.setStrategy(strategy); + String sendStr = device.createOrders(deviceCodeParamEntity); + //从获取响应数据 + ConcurrentHashMap map = SessionMap.inst().getMap(); + Set> entries = map.entrySet(); + boolean flag = false; + String keyVal = null; + String heartBeat = gatewayManageEntity.getHeartBeat().toUpperCase(); + for (Map.Entry entry : entries) { + String key = entry.getKey(); + //log.info("当前session:{},当前sessionId:{},当前在线设备数:{}",entry,entry.getValue().getSessionId(),map.size()); + if (key.contains(heartBeat)) { + flag = true; + keyVal = key; + break; + } + } + if (flag) { + ServerSession serverSession = map.get(keyVal); + serverSession.getChannel().writeAndFlush(NettyTools.createByteBuf(sendStr)); + NettyTools.initReceiveMsg(heartBeat); + String receiveStr = ""; + if (Constant.WEB_FLAG) { + receiveStr = NettyTools.waitReceiveMsg(heartBeat); + } + if (StringUtils.isBlank(receiveStr)) { + return Constant.FAIL; + } + receiveStr = receiveStr.replace("null", "").replace(" ", ""); + log.info("TCP客户端" + heartBeat + "接收数据:" + receiveStr + ",大小: " + receiveStr.length()); + //返回值全部变成大写 + String receiveData = receiveStr.toUpperCase(); + //截取去掉FE + String deviceType = deviceCodeParamEntity.getDeviceType(); + String deviceAddr = deviceCodeParamEntity.getDeviceAddr(); + String dataStr = ""; + if (receiveData.length() > 8 && ("水表".equals(deviceType) || "电表".equals(deviceType))) { + String str1 = receiveData.substring(0, 8); + String str2 = receiveData.substring(8); + dataStr = str1.replace("FE", "") + str2; + } else { + dataStr = receiveData; + } + String registerAddr = deviceCodeParamEntity.getRegisterAddr(); + String brand = deviceCodeParamEntity.getBrand(); + String buildingId = deviceCodeParamEntity.getBuildingId(); + String buildingName = buildingService.queryBuildingName(buildingId); //查询楼栋名称 + + deviceInstallService.updateOnline(deviceAddr, deviceType, buildingId, "在线"); //设备在线 + log.info("{}在线,设备号:{},所属楼栋:{}", deviceType, deviceAddr, buildingName); + if (deviceType.equals("热泵")) { + String strState = nowDataService.selectState(buildingId, deviceAddr); + if (strState != null && strState.equals("离线")) { //采集到数据 + nowDataService.updateRunState(buildingId, deviceAddr, "不运行", buildingName); //监控界面状态表热泵在线状态 + } + } + rtData = device.analysisReceiveData(DateUtil.dateToString(new Date(), "yyyy-MM-dd HH:mm:ss"), + deviceType, registerAddr, brand, buildingId, buildingName, dataStr); + return rtData; + } else { + log.error("没有找到心跳包数据:{}", heartBeat); + } + } catch (Exception e) { + log.info("单抄TCP客户端:{}异常,没有数据返回!关闭串口", gatewayManageEntity.getHeartBeat(), e); + } + return Constant.FAIL; + } +} diff --git a/user-service/src/main/java/com/mh/user/utils/CacheUtil.java b/user-service/src/main/java/com/mh/user/utils/CacheUtil.java index f65967e..158b75a 100644 --- a/user-service/src/main/java/com/mh/user/utils/CacheUtil.java +++ b/user-service/src/main/java/com/mh/user/utils/CacheUtil.java @@ -4,7 +4,9 @@ import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; import com.github.benmanes.caffeine.cache.Cache; import com.mh.user.entity.DeviceCodeParamEntity; +import com.mh.user.entity.GatewayManageEntity; import com.mh.user.service.DeviceCodeParamService; +import com.mh.user.service.GatewayManageService; import org.springframework.context.ApplicationContext; import java.util.List; @@ -22,17 +24,29 @@ public class CacheUtil { Cache caffeineCache = (Cache) context.getBean("caffeineCache"); DeviceCodeParamService deviceCodeParamService = context.getBean(DeviceCodeParamService.class); + GatewayManageService gatewayManageService = context.getBean(GatewayManageService.class); private static class SingletonHolder{ private static final CacheUtil instance=new CacheUtil(); } private CacheUtil(){ + // 缓存采集信息 createDeviceParams(); + // 缓存网关信息数据 + saveGatewayInfo(); } + public static CacheUtil getInstance(){ return SingletonHolder.instance; } + private void saveGatewayInfo() { + if (caffeineCache.getIfPresent("gw") == null) { + List gatewayManageEntities = gatewayManageService.queryByOther(null, null); + caffeineCache.put("gw", gatewayManageEntities); + } + } + /** * 把采集ddc设置的报文存入到缓存中 */ @@ -70,6 +84,17 @@ public class CacheUtil { return JSONArray.parseArray(JSONObject.toJSONString(cacheObject), DeviceCodeParamEntity.class); } + public List getGatewayInfo() { + Object cacheObject = caffeineCache.getIfPresent("gw"); + // 如果为空,重新添加 + if (cacheObject == null) { + saveGatewayInfo(); + // 在重新获取数据 + cacheObject = caffeineCache.getIfPresent("gw"); + } + return JSONArray.parseArray(JSONObject.toJSONString(cacheObject), GatewayManageEntity.class); + } + /** * 删除缓存 */ diff --git a/user-service/src/main/java/com/mh/user/utils/NettyTools.java b/user-service/src/main/java/com/mh/user/utils/NettyTools.java new file mode 100644 index 0000000..cb35d18 --- /dev/null +++ b/user-service/src/main/java/com/mh/user/utils/NettyTools.java @@ -0,0 +1,101 @@ +package com.mh.user.utils; + + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.mh.common.utils.StringUtils; +import com.mh.user.constants.Constant; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import lombok.extern.slf4j.Slf4j; + +import java.util.Objects; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * @author LJF + * @version 1.0 + * @project TAD_Server + * @description 缓存等待数据 + * @date 2023/7/4 08:45:16 + */ +@Slf4j +public class NettyTools { + + public static ByteBuf getByteBuf(ChannelHandlerContext ctx, String sendStr) { + // byte类型的数据 +// String sendStr = "5803004900021914"; // 冷量计 + // 申请一个数据结构存储信息 + ByteBuf buffer = ctx.alloc().buffer(); + // 将信息放入数据结构中 + buffer.writeBytes(ExchangeStringUtil.hexStrToBinaryStr(sendStr));//对接需要16进制 + return buffer; + } + + public static ByteBuf createByteBuf(String sendStr) { + // byte类型的数据 +// String sendStr = "5803004900021914"; // 冷量计 + // 申请一个数据结构存储信息 + ByteBuf buffer = Unpooled.buffer(); + // 将信息放入数据结构中 + buffer.writeBytes(ExchangeStringUtil.hexStrToBinaryStr(sendStr));//对接需要16进制 + return buffer; + } + + /** + * 响应消息缓存 + */ + private static final Cache> responseMsgCache = CacheBuilder.newBuilder() + .maximumSize(50000) + .expireAfterWrite(1000, TimeUnit.SECONDS) + .build(); + + + /** + * 等待响应消息 + * @param key 消息唯一标识 + * @return ReceiveDdcMsgVo + */ + public static String waitReceiveMsg(String key) { + + try { + //设置超时时间 + String vo = Objects.requireNonNull(responseMsgCache.getIfPresent(key)) + .poll(1000 * 10, TimeUnit.MILLISECONDS); + + //删除key + responseMsgCache.invalidate(key); + return vo; + } catch (Exception e) { + log.error("获取数据异常,sn={},msg=null",key); + return Constant.FAIL; + } + + } + + /** + * 初始化响应消息的队列 + * @param key 消息唯一标识 + */ + public static void initReceiveMsg(String key) { + responseMsgCache.put(key,new LinkedBlockingQueue(1)); + } + + /** + * 设置响应消息 + * @param key 消息唯一标识 + */ + public static void setReceiveMsg(String key, String msg) { + + if(responseMsgCache.getIfPresent(key) != null){ + responseMsgCache.getIfPresent(key).add(msg); + return; + } + + log.warn("sn {}不存在",key); + } + +}