diff --git a/user-service/pom.xml b/user-service/pom.xml
index 1903c1f..c706553 100644
--- a/user-service/pom.xml
+++ b/user-service/pom.xml
@@ -173,6 +173,13 @@
1.0.0
+
+
+ io.netty
+ netty-all
+ 4.1.86.Final
+
+
diff --git a/user-service/src/main/java/com/mh/user/constants/Constant.java b/user-service/src/main/java/com/mh/user/constants/Constant.java
index b878034..b35f342 100644
--- a/user-service/src/main/java/com/mh/user/constants/Constant.java
+++ b/user-service/src/main/java/com/mh/user/constants/Constant.java
@@ -13,6 +13,8 @@ public class Constant {
public static final CharSequence CUSTOM_NAME_GUANGSHANG = "广州商学院";
public static final CharSequence CUSTOM_NAME_HUARUAN = "广州软件学院";
public static final String WEATHER_DATA = "weather_data";
+ public static final String COMMUNITY_TYPE_REAL_COM = "realCom";
+ public static final String COMMUNITY_TYPE_TCP = "tcp";
public static boolean CONTROL_WEB_FLAG = false;
public static boolean SEND_STATUS = false; // 指令发送状态
public static volatile boolean FLAG = false;
diff --git a/user-service/src/main/java/com/mh/user/constants/FourthGEnum.java b/user-service/src/main/java/com/mh/user/constants/FourthGEnum.java
new file mode 100644
index 0000000..add3d7f
--- /dev/null
+++ b/user-service/src/main/java/com/mh/user/constants/FourthGEnum.java
@@ -0,0 +1,111 @@
+package com.mh.user.constants;
+
+/**
+ * @author LJF
+ * @version 1.0
+ * @project TAD_Server
+ * @description 4G设备常量值
+ * @date 2023/7/3 18:14:12
+ */
+public enum FourthGEnum {
+
+ LOGIN_HEART_RESPONSE("0D", "登录和心跳确认控制码"),
+
+ LOGIN_HEART_REQUEST("8D", "登录和心跳请求控制码"),
+
+ LOGIN_HEART_EXCEPTION("CD", "登录和心跳异常应答控制码"),
+
+ LOGIN_TYPE("02", "登录类型"),
+
+ HEART_TYPE("01", "心跳类型"),
+
+ CLOSE_CONTROL_TYPE("1C", "合闸指令"),
+
+ OPEN_CONTROL_TYPE("1A", "跳闸指令"),
+
+ DATA_ACTIVE_UPLOAD("8E", "数据主动上报控制码"),
+
+ PULL_AND_CLOSE_DEVICE_REQUEST("1C", "跳合闸请求控制码"),
+
+ PULL_AND_CLOSE_DEVICE_RESPONSE("9C", "跳合闸应答控制码"),
+
+ PULL_AND_CLOSE_DEVICE_EXCEPTION("DC", "跳合闸应答异常控制码"),
+
+ CLEAR_DEVICE_REQUEST("1A", "电表清零请求控制码"),
+
+ CLEAR_DEVICE_RESPONSE("9A", "电表清零应答控制码"),
+
+ CLEAR_DEVICE_EXCEPTION("DA", "电表清零应答异常控制码"),
+
+ READ_DEVICE_REQUEST("11", "读电表请求控制码"),
+
+ READ_DEVICE_RESPONSE("91", "读电表应答控制码"),
+
+ READ_DEVICE_EXCEPTION("D1", "读电表应答异常控制码"),
+
+ WRITE_DEVICE_REQUEST("14", "写电表请求控制码"),
+
+ WRITE_DEVICE_RESPONSE("94", "读电表应答控制码"),
+ WRITE_DEVICE_IP("94", "写电表IP应答控制码"),
+
+ WRITE_DEVICE_EXCEPTION("D4", "读电表应答异常控制码"),
+
+ CHARGE_DEVICE_REQUEST("0F", "充值指令"),
+
+ CHARGE_DEVICE_RESPONSE("8F", "充值正确应答控制码"),
+
+ CHARGE_DEVICE_EXCEPTION("CF", "充值异常应答控制码"),
+
+ READ_TOTAL_CHARGE_IDENTIFY_CODE("00000000", "当前组合有功总电能"),
+
+ READ_STATUS_IDENTIFY_CODE("03050004", "读电表运行状态"),
+ READ_UPLOAD_CODE("02008104", "读取上报报文"),
+
+ TY_UPLOAD_CODE("55C2", "腾越水表上报标识"),
+
+ WRITE_CHARGE_TIME_IDENTIFY_CODE("07008104", "写入充值次数"),
+
+ WRITE_LIMIT_POWER("10008104","写入限容功率"),
+ READ_LIMIT_POWER("10008104","读取限容功率"),
+ READ_SOFT_VERSION("01008004","读软件版本"),
+ READ_HARD_VERSION("02008004","读硬件版本"),
+ READ_ICCID("05008104","读取ICCID"),
+ READ_IMEI("04008104","读取IMEI"),
+ WRITE_IP("0B008104","设置IP端口号"),
+ READ_IP("0B008104","读取IP端口号"),
+ UPGRADES("13008104","触发升级,写1触发升级")
+ ;
+
+ private String code;
+
+ private String desc;
+
+ FourthGEnum(String code, String desc) {
+ this.code = code;
+ this.desc = desc;
+ }
+
+ public String getCode() {
+ return code;
+ }
+
+ public void setCode(String code) {
+ this.code = code;
+ }
+
+ public String getDesc() {
+ return desc;
+ }
+
+ public void setDesc(String desc) {
+ this.desc = desc;
+ }
+
+ @Override
+ public String toString() {
+ return "FourthGEnum{" +
+ "code='" + code + '\'' +
+ ", desc='" + desc + '\'' +
+ '}';
+ }
+}
diff --git a/user-service/src/main/java/com/mh/user/controller/GatewayManageController.java b/user-service/src/main/java/com/mh/user/controller/GatewayManageController.java
index 99176aa..de416a6 100644
--- a/user-service/src/main/java/com/mh/user/controller/GatewayManageController.java
+++ b/user-service/src/main/java/com/mh/user/controller/GatewayManageController.java
@@ -2,6 +2,7 @@ package com.mh.user.controller;
import com.mh.common.http.HttpResult;
import com.mh.user.entity.GatewayManageEntity;
+import com.mh.user.service.GatewayManageService;
import com.mh.user.service.impl.DeviceDisplayServiceImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
@@ -13,7 +14,7 @@ import java.util.List;
public class GatewayManageController {
@Autowired
- private DeviceDisplayServiceImpl.GatewayManageService gatewayManageService;
+ private GatewayManageService gatewayManageService;
//保存
@PostMapping(value="/save")
diff --git a/user-service/src/main/java/com/mh/user/entity/GatewayManageEntity.java b/user-service/src/main/java/com/mh/user/entity/GatewayManageEntity.java
index 7b1788e..764826c 100644
--- a/user-service/src/main/java/com/mh/user/entity/GatewayManageEntity.java
+++ b/user-service/src/main/java/com/mh/user/entity/GatewayManageEntity.java
@@ -29,4 +29,10 @@ public class GatewayManageEntity {
private String connectDate; //最新上线连接时间
private String remarks; //备注
private String type; //操作类型
+ private String heartBeat; // 心跳包
+ private String imei; // 设备IMEI
+ private String sn; // 设备SN
+ private String dataCom; // 通讯com口
+ private String thread; // 线程名称
+ private String communityType; // 通讯 类型:realCom,tcp
}
diff --git a/user-service/src/main/java/com/mh/user/job/JobCloudAndMeter.java b/user-service/src/main/java/com/mh/user/job/JobCloudAndMeter.java
index 63d78eb..6ff9ace 100644
--- a/user-service/src/main/java/com/mh/user/job/JobCloudAndMeter.java
+++ b/user-service/src/main/java/com/mh/user/job/JobCloudAndMeter.java
@@ -1,159 +1,159 @@
-package com.mh.user.job;
-
-import com.mh.user.constants.SocketMessage;
-import com.mh.user.entity.GatewayManageEntity;
-import com.mh.user.serialport.SerialPortThread;
-import com.mh.user.service.DeviceCodeParamService;
-import com.mh.user.service.impl.DeviceDisplayServiceImpl;
-import com.mh.user.utils.GetReadOrder485;
-import lombok.SneakyThrows;
-import lombok.extern.slf4j.Slf4j;
-import org.quartz.DisallowConcurrentExecution;
-import org.quartz.Job;
-import org.quartz.JobExecutionContext;
-import org.quartz.JobExecutionException;
-import org.springframework.beans.factory.annotation.Autowired;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * @author ljf
- * @title :
- * @description : 定时采集冷量计任务
- * @updateTime 2020-05-18
- * @throws :
- */
-/**
- * :@DisallowConcurrentExecution : 此标记用在实现Job的类上面,意思是不允许并发执行.
- * :注意org.quartz.threadPool.threadCount线程池中线程的数量至少要多个,否则@DisallowConcurrentExecution不生效
- * :假如Job的设置时间间隔为3秒,但Job执行时间是5秒,设置@DisallowConcurrentExecution以后程序会等任务执行完毕以后再去执行,
- * 否则会在3秒时再启用新的线程执行
- */
-@DisallowConcurrentExecution
-@Slf4j
-public class JobCloudAndMeter implements Job {
-
- @Autowired
- private SocketMessage socketMessage;
-
- @Autowired
- DeviceDisplayServiceImpl.GatewayManageService gatewayManageService;
-
- @Autowired
- DeviceCodeParamService deviceCodeParamService;
-
- private static int taskTimes = 1;
- List gatewayDtoList=new ArrayList<>();
- GetReadOrder485 getReadOrder485=new GetReadOrder485();
-
- @SneakyThrows
- @Override
- public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
- log.info("定时采集开始>>>>>>");
- if(taskTimes==2){//2
- int r=deviceCodeParamService.queryCount2(); //查询记录数
- if (r==0){
- getReadOrder485.createOrderParam2(); //生成采集参数
- }
-
- SerialPortThread myThread = new SerialPortThread();
- Thread thread = new Thread(myThread);
- myThread.setName("2","1");
- thread.start();
- System.out.println("当前活动线程数:"+Thread.activeCount());
- Thread.currentThread().sleep(3000);//毫秒
-
-// SerialPortThread myThread2 = new SerialPortThread();
-// Thread thread2 = new Thread(myThread2);
-// myThread2.setName("2","2");
-// thread2.start();
-// System.out.println("当前活动线程数:"+Thread.activeCount());
-// Thread.currentThread().sleep(3000);//毫秒
+//package com.mh.user.job;
//
-// SerialPortThread myThread3 = new SerialPortThread();
-// Thread thread3 = new Thread(myThread3);
-// myThread3.setName("2","3");
-// thread3.start();
-// System.out.println("当前活动线程数:"+Thread.activeCount());
-// Thread.currentThread().sleep(3000);//毫秒
+//import com.mh.user.constants.SocketMessage;
+//import com.mh.user.entity.GatewayManageEntity;
+//import com.mh.user.serialport.SerialPortThread;
+//import com.mh.user.service.DeviceCodeParamService;
+//import com.mh.user.service.GatewayManageService;
+//import com.mh.user.utils.GetReadOrder485;
+//import lombok.SneakyThrows;
+//import lombok.extern.slf4j.Slf4j;
+//import org.quartz.DisallowConcurrentExecution;
+//import org.quartz.Job;
+//import org.quartz.JobExecutionContext;
+//import org.quartz.JobExecutionException;
+//import org.springframework.beans.factory.annotation.Autowired;
+//import java.util.ArrayList;
+//import java.util.List;
//
-// SerialPortThread myThread4= new SerialPortThread();
-// Thread thread4 = new Thread(myThread4);
-// myThread4.setName("2","4");
-// thread4.start();
-// System.out.println("当前活动线程数:"+Thread.activeCount());
-// Thread.currentThread().sleep(3000);//毫秒
- System.out.println("采集水、电、运行状态!"+taskTimes);
- taskTimes++;
- }else if(taskTimes==3){//5
- int r=deviceCodeParamService.queryCount3();//查询记录数
- if (r==0){
- getReadOrder485.createOrderParam3(); //生成采集参数
- }
-
- SerialPortThread myThread = new SerialPortThread();
- Thread thread = new Thread(myThread);
- myThread.setName("3","1");
- thread.start();
- System.out.println("当前活动线程数:"+Thread.activeCount());
- Thread.currentThread().sleep(3000);//毫秒
-
-// SerialPortThread myThread2 = new SerialPortThread();
-// Thread thread2 = new Thread(myThread2);
-// myThread2.setName("3","2");
-// thread2.start();
-// System.out.println("当前活动线程数:"+Thread.activeCount());
-// Thread.currentThread().sleep(3000);//毫秒
+///**
+// * @author ljf
+// * @title :
+// * @description : 定时采集冷量计任务
+// * @updateTime 2020-05-18
+// * @throws :
+// */
+///**
+// * :@DisallowConcurrentExecution : 此标记用在实现Job的类上面,意思是不允许并发执行.
+// * :注意org.quartz.threadPool.threadCount线程池中线程的数量至少要多个,否则@DisallowConcurrentExecution不生效
+// * :假如Job的设置时间间隔为3秒,但Job执行时间是5秒,设置@DisallowConcurrentExecution以后程序会等任务执行完毕以后再去执行,
+// * 否则会在3秒时再启用新的线程执行
+// */
+//@DisallowConcurrentExecution
+//@Slf4j
+//public class JobCloudAndMeter implements Job {
//
-// SerialPortThread myThread3 = new SerialPortThread();
-// Thread thread3 = new Thread(myThread3);
-// myThread3.setName("3","3");
-// thread3.start();
-// System.out.println("当前活动线程数:"+Thread.activeCount());
-// Thread.currentThread().sleep(3000);//毫秒
+// @Autowired
+// private SocketMessage socketMessage;
//
-// SerialPortThread myThread4= new SerialPortThread();
-// Thread thread4 = new Thread(myThread4);
-// myThread4.setName("3","4");
-// thread4.start();
-// System.out.println("当前活动线程数:"+Thread.activeCount());
-// Thread.currentThread().sleep(3000);//毫秒
- System.out.println("采集设定温度、设定水位、故障状态!"+taskTimes);
- taskTimes=1;
- }else if(taskTimes==1){
- int r=deviceCodeParamService.queryCount(); //查询记录数
- if (r==0){
- getReadOrder485.createOrderParam(); //生成采集参数
- }
- SerialPortThread myThread = new SerialPortThread();
- Thread thread = new Thread(myThread);
- myThread.setName("1","1");
- thread.start();
- System.out.println("当前活动线程数:"+Thread.activeCount());
- Thread.currentThread().sleep(3000);//毫秒
-
-// SerialPortThread myThread2 = new SerialPortThread();
-// Thread thread2 = new Thread(myThread2);
-// myThread2.setName("1","2");
-// thread2.start();
-// System.out.println("当前活动线程数:"+Thread.activeCount());
-// Thread.currentThread().sleep(3000);//毫秒
+// @Autowired
+// GatewayManageService gatewayManageService;
//
-// SerialPortThread myThread3 = new SerialPortThread();
-// Thread thread3 = new Thread(myThread3);
-// myThread3.setName("1","3");
-// thread3.start();
-// System.out.println("当前活动线程数:"+Thread.activeCount());
-// Thread.currentThread().sleep(3000);//毫秒
+// @Autowired
+// DeviceCodeParamService deviceCodeParamService;
//
-// SerialPortThread myThread4= new SerialPortThread();
-// Thread thread4 = new Thread(myThread4);
-// myThread4.setName("1","4");
-// thread4.start();
-// System.out.println("当前活动线程数:"+Thread.activeCount());
-// Thread.currentThread().sleep(3000);//毫秒
- System.out.println("采集水位、水温!"+taskTimes);
- taskTimes++;
- }
- }
-}
+// private static int taskTimes = 1;
+// List gatewayDtoList=new ArrayList<>();
+// GetReadOrder485 getReadOrder485=new GetReadOrder485();
+//
+// @SneakyThrows
+// @Override
+// public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
+// log.info("定时采集开始>>>>>>");
+// if(taskTimes==2){//2
+// int r=deviceCodeParamService.queryCount2(); //查询记录数
+// if (r==0){
+// getReadOrder485.createOrderParam2(); //生成采集参数
+// }
+//
+// SerialPortThread myThread = new SerialPortThread();
+// Thread thread = new Thread(myThread);
+// myThread.setName("2","1");
+// thread.start();
+// System.out.println("当前活动线程数:"+Thread.activeCount());
+// Thread.currentThread().sleep(3000);//毫秒
+//
+//// SerialPortThread myThread2 = new SerialPortThread();
+//// Thread thread2 = new Thread(myThread2);
+//// myThread2.setName("2","2");
+//// thread2.start();
+//// System.out.println("当前活动线程数:"+Thread.activeCount());
+//// Thread.currentThread().sleep(3000);//毫秒
+////
+//// SerialPortThread myThread3 = new SerialPortThread();
+//// Thread thread3 = new Thread(myThread3);
+//// myThread3.setName("2","3");
+//// thread3.start();
+//// System.out.println("当前活动线程数:"+Thread.activeCount());
+//// Thread.currentThread().sleep(3000);//毫秒
+////
+//// SerialPortThread myThread4= new SerialPortThread();
+//// Thread thread4 = new Thread(myThread4);
+//// myThread4.setName("2","4");
+//// thread4.start();
+//// System.out.println("当前活动线程数:"+Thread.activeCount());
+//// Thread.currentThread().sleep(3000);//毫秒
+// System.out.println("采集水、电、运行状态!"+taskTimes);
+// taskTimes++;
+// }else if(taskTimes==3){//5
+// int r=deviceCodeParamService.queryCount3();//查询记录数
+// if (r==0){
+// getReadOrder485.createOrderParam3(); //生成采集参数
+// }
+//
+// SerialPortThread myThread = new SerialPortThread();
+// Thread thread = new Thread(myThread);
+// myThread.setName("3","1");
+// thread.start();
+// System.out.println("当前活动线程数:"+Thread.activeCount());
+// Thread.currentThread().sleep(3000);//毫秒
+//
+//// SerialPortThread myThread2 = new SerialPortThread();
+//// Thread thread2 = new Thread(myThread2);
+//// myThread2.setName("3","2");
+//// thread2.start();
+//// System.out.println("当前活动线程数:"+Thread.activeCount());
+//// Thread.currentThread().sleep(3000);//毫秒
+////
+//// SerialPortThread myThread3 = new SerialPortThread();
+//// Thread thread3 = new Thread(myThread3);
+//// myThread3.setName("3","3");
+//// thread3.start();
+//// System.out.println("当前活动线程数:"+Thread.activeCount());
+//// Thread.currentThread().sleep(3000);//毫秒
+////
+//// SerialPortThread myThread4= new SerialPortThread();
+//// Thread thread4 = new Thread(myThread4);
+//// myThread4.setName("3","4");
+//// thread4.start();
+//// System.out.println("当前活动线程数:"+Thread.activeCount());
+//// Thread.currentThread().sleep(3000);//毫秒
+// System.out.println("采集设定温度、设定水位、故障状态!"+taskTimes);
+// taskTimes=1;
+// }else if(taskTimes==1){
+// int r=deviceCodeParamService.queryCount(); //查询记录数
+// if (r==0){
+// getReadOrder485.createOrderParam(); //生成采集参数
+// }
+// SerialPortThread myThread = new SerialPortThread();
+// Thread thread = new Thread(myThread);
+// myThread.setName("1","1");
+// thread.start();
+// System.out.println("当前活动线程数:"+Thread.activeCount());
+// Thread.currentThread().sleep(3000);//毫秒
+//
+//// SerialPortThread myThread2 = new SerialPortThread();
+//// Thread thread2 = new Thread(myThread2);
+//// myThread2.setName("1","2");
+//// thread2.start();
+//// System.out.println("当前活动线程数:"+Thread.activeCount());
+//// Thread.currentThread().sleep(3000);//毫秒
+////
+//// SerialPortThread myThread3 = new SerialPortThread();
+//// Thread thread3 = new Thread(myThread3);
+//// myThread3.setName("1","3");
+//// thread3.start();
+//// System.out.println("当前活动线程数:"+Thread.activeCount());
+//// Thread.currentThread().sleep(3000);//毫秒
+////
+//// SerialPortThread myThread4= new SerialPortThread();
+//// Thread thread4 = new Thread(myThread4);
+//// myThread4.setName("1","4");
+//// thread4.start();
+//// System.out.println("当前活动线程数:"+Thread.activeCount());
+//// Thread.currentThread().sleep(3000);//毫秒
+// System.out.println("采集水位、水温!"+taskTimes);
+// taskTimes++;
+// }
+// }
+//}
diff --git a/user-service/src/main/java/com/mh/user/mapper/GatewayManageMapper.java b/user-service/src/main/java/com/mh/user/mapper/GatewayManageMapper.java
index d263efc..db66660 100644
--- a/user-service/src/main/java/com/mh/user/mapper/GatewayManageMapper.java
+++ b/user-service/src/main/java/com/mh/user/mapper/GatewayManageMapper.java
@@ -34,6 +34,12 @@ public interface GatewayManageMapper {
@Result(column = "create_date", property = "createDate"),
@Result(column = "connect_date", property = "connectDate"),
@Result(column = "remarks", property = "remarks"),
+ @Result(column = "heart_beat", property = "heartBeat"),
+ @Result(column = "imei", property = "imei"),
+ @Result(column = "sn", property = "sn"),
+ @Result(column = "data_com", property = "dataCom"),
+ @Result(column = "thread", property = "thread"),
+ @Result(column = "community_type", property = "communityType")
})
List queryByOther(@Param("grade") Integer grade, @Param("operator") Integer operator);
@@ -97,4 +103,12 @@ public interface GatewayManageMapper {
// 根据设备类型查询数据库,找出对应的详细信息
@Select("select id, gateway_name, gateway_ip, gateway_address, data_com, connect_date, internet_card, operator, gateway_port, grade from gateway_manage where grade = #{grade}")
GatewayManageEntity queryGatewayByGrade(@Param("grade") Long grade);
+
+ // 根据设备心跳包更新设备在线状态
+ @Update("update gateway_manage set grade = #{status} where heart_beat = #{heartBeat}")
+ void updateGatewayManageOnlineByHeartBeatCode(String heartBeat, int status);
+
+ // 根据设备IMEI更新设备在线状态
+ @Update("update gateway_manage set grade = #{status} where imei = #{imei}")
+ void updateGatewayManageOnlineByImei(String imei, int status);
}
diff --git a/user-service/src/main/java/com/mh/user/netty/NettyEchoServer.java b/user-service/src/main/java/com/mh/user/netty/NettyEchoServer.java
new file mode 100644
index 0000000..5829428
--- /dev/null
+++ b/user-service/src/main/java/com/mh/user/netty/NettyEchoServer.java
@@ -0,0 +1,53 @@
+package com.mh.user.netty;
+
+import com.mh.user.netty.handle.FourthChannelInitializer;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.*;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * @author LJF
+ * @title :Netty
+ * @description :netty 使用
+ * @updateTime 2020-04-21
+ * @throws :
+ */
+@Slf4j
+public class NettyEchoServer {
+
+ public void bind(int port) throws Exception {
+ // accept线程组,用来接收连接
+ EventLoopGroup bossGroup = new NioEventLoopGroup(2);
+ // IO 线程组,用来处理业务逻辑
+ EventLoopGroup workerGroup = new NioEventLoopGroup(4);
+
+ try {
+ // 服务端启动引导
+ ServerBootstrap serverBootstrap = new ServerBootstrap();
+ serverBootstrap.group(bossGroup,workerGroup) // 绑定两个线程
+ .channel(NioServerSocketChannel.class) // 指定通道类型
+ .option(ChannelOption.SO_BACKLOG, 1024) // 设置TCP连接的缓冲区
+ .option(ChannelOption.SO_KEEPALIVE, true)
+ .handler(new LoggingHandler(LogLevel.INFO)) // 设置日志级别
+ .childHandler(new FourthChannelInitializer()); // 初始化handle
+ // 通过bind启动服务
+ ChannelFuture f = serverBootstrap.bind(port).sync();
+ // 阻塞主线程,知道网络服务被关闭
+ //f.channel().closeFuture().sync();
+ f.channel().closeFuture().addListener(future -> {
+ log.info("Netty服务器已关闭");
+ workerGroup.shutdownGracefully();
+ bossGroup.shutdownGracefully();
+ });
+ } catch (Exception e){
+ log.error("netty服务器异常", e);
+ workerGroup.shutdownGracefully();
+ bossGroup.shutdownGracefully();
+ }
+ }
+}
diff --git a/user-service/src/main/java/com/mh/user/netty/decoder/CustomFrameEncoder.java b/user-service/src/main/java/com/mh/user/netty/decoder/CustomFrameEncoder.java
new file mode 100644
index 0000000..4ef4b62
--- /dev/null
+++ b/user-service/src/main/java/com/mh/user/netty/decoder/CustomFrameEncoder.java
@@ -0,0 +1,63 @@
+package com.mh.user.netty.decoder;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class CustomFrameEncoder extends MessageToByteEncoder {
+ private static final byte FRAME_START = 0x68;
+ private static final byte FRAME_END = 0x16;
+
+ @Override
+ protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception {
+ log.info("数据到达:"+msg);
+ // Write first frame start
+ out.writeByte(FRAME_START);
+
+ // Write meter number
+ byte[] meterNumberBytes = hexStringToByteArray(msg.substring(2, 14)); // Extract meter number from message
+ out.writeBytes(meterNumberBytes);
+
+ // Write second frame start
+ out.writeByte(FRAME_START);
+
+ // Write control code
+ byte controlCode = hexStringToByteArray(msg.substring(14, 16))[0]; // Extract control code from message
+ out.writeByte(controlCode);
+
+ // Write data length
+ int dataLength = Integer.parseInt(msg.substring(16, 18), 16); // Extract data length from message
+ out.writeByte(dataLength);
+
+ // Write data
+ byte[] dataBytes = hexStringToByteArray(msg.substring(18, 18 + (dataLength * 2))); // Extract data from message
+ out.writeBytes(dataBytes);
+
+ // Calculate and write checksum
+ byte checksum = calculateChecksum(msg.substring(14, 18 + (dataLength * 2))); // Calculate checksum
+ out.writeByte(checksum);
+
+ // Write frame end
+ out.writeByte(FRAME_END);
+ }
+
+ private byte[] hexStringToByteArray(String hexString) {
+ int len = hexString.length();
+ byte[] data = new byte[len / 2];
+ for (int i = 0; i < len; i += 2) {
+ data[i / 2] = (byte) ((Character.digit(hexString.charAt(i), 16) << 4)
+ + Character.digit(hexString.charAt(i + 1), 16));
+ }
+ return data;
+ }
+
+ private byte calculateChecksum(String message) {
+ byte checksum = 0;
+ for (int i = 0; i < message.length(); i += 2) {
+ checksum += Integer.parseInt(message.substring(i, i + 2), 16);
+ }
+ return (byte) (checksum & 0xFF);
+ }
+}
\ No newline at end of file
diff --git a/user-service/src/main/java/com/mh/user/netty/handle/DataUploadServerHandler.java b/user-service/src/main/java/com/mh/user/netty/handle/DataUploadServerHandler.java
new file mode 100644
index 0000000..c6311e2
--- /dev/null
+++ b/user-service/src/main/java/com/mh/user/netty/handle/DataUploadServerHandler.java
@@ -0,0 +1,50 @@
+package com.mh.user.netty.handle;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.mh.user.constants.FourthGEnum;
+import com.mh.user.entity.GatewayManageEntity;
+import com.mh.user.netty.session.ServerSession;
+import com.mh.user.netty.session.SessionMap;
+import com.mh.user.utils.CacheUtil;
+import com.mh.user.utils.ExchangeStringUtil;
+import com.mh.user.utils.NettyTools;
+import com.mh.user.utils.SpringBeanUtil;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.context.ApplicationContext;
+
+import java.util.List;
+
+@Slf4j
+public class DataUploadServerHandler extends ChannelInboundHandlerAdapter {
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ //将接收到的数据转为字符串,此字符串就是客户端发送的字符串
+ String receiveStr = ExchangeStringUtil.bytesToHexString((byte[]) msg);
+ // 判断当前报文是否是上报数据报文
+ if (receiveStr != null) {
+ // 判断属于哪一个DTU网关上报的数据
+ CacheUtil instance = CacheUtil.getInstance();
+ List gwList = instance.getGatewayInfo();
+ if (gwList != null && !gwList.isEmpty()) {
+ for (GatewayManageEntity gw : gwList) {
+ if (receiveStr.startsWith(ExchangeStringUtil.str2HexStr(gw.getSn()))) {
+ // 直接设置对应值
+ NettyTools.setReceiveMsg(gw.getHeartBeat(), receiveStr.substring(gw.getSn().length()));
+ } else {
+ // 判断是否登录,没有登录立马断开
+ String deviceCode = gw.getHeartBeat();
+ if (!SessionMap.inst().hasLogin(deviceCode+ctx.channel().remoteAddress())) {
+ ServerSession.closeSession(ctx);
+ return;
+ }
+ }
+ }
+ }
+ }
+ super.channelRead(ctx, msg);
+ }
+
+}
diff --git a/user-service/src/main/java/com/mh/user/netty/handle/ExceptionServerHandler.java b/user-service/src/main/java/com/mh/user/netty/handle/ExceptionServerHandler.java
new file mode 100644
index 0000000..c794f71
--- /dev/null
+++ b/user-service/src/main/java/com/mh/user/netty/handle/ExceptionServerHandler.java
@@ -0,0 +1,50 @@
+package com.mh.user.netty.handle;
+
+import com.mh.user.netty.session.ServerSession;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * @author LJF
+ * @version 1.0
+ * @project TAD_Server
+ * @description 异常处理
+ * @date 2023/7/5 09:02:03
+ */
+@Slf4j
+public class ExceptionServerHandler extends ChannelInboundHandlerAdapter {
+ //private final ApplicationContext applicationContext = ApplicationContextProvider.getApplicationContext();
+ //private final MeterInfoService meterInfoService = applicationContext.getBean(MeterInfoService.class);
+
+ @Override
+ public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
+ ctx.flush();
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ log.info("{}客户端断开连接!", ctx.channel().remoteAddress());
+ //String meterNumIp = ServerSession.closeSession(ctx);
+ //String[] split = meterNumIp.split("/");
+ //String meterNum = split[0];
+ //if (meterNum != null) {
+ // meterInfoService.insertTestDetail(meterNum,"offline");
+ //}
+ ServerSession.closeSession(ctx);
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ if (cause instanceof UnsupportedOperationException) {
+ ServerSession.closeSession(ctx);
+ } else {
+ log.info("获得已知异常!",cause);
+ ctx.close();
+ }
+ log.info("获得已知异常!{}", cause.getMessage());
+ log.info("获得已知异常!", cause);
+ ServerSession.closeSession(ctx);
+ ctx.close();
+ }
+}
diff --git a/user-service/src/main/java/com/mh/user/netty/handle/FourthChannelInitializer.java b/user-service/src/main/java/com/mh/user/netty/handle/FourthChannelInitializer.java
new file mode 100644
index 0000000..bca94d0
--- /dev/null
+++ b/user-service/src/main/java/com/mh/user/netty/handle/FourthChannelInitializer.java
@@ -0,0 +1,34 @@
+package com.mh.user.netty.handle;
+
+import com.mh.user.netty.decoder.CustomFrameEncoder;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelInitializer;
+
+/**
+ * @author LJF
+ * @version 1.0
+ * @project TAD_Server
+ * @description 初始换一个服务端
+ * @date 2023/7/3 15:58:05
+ */
+public class FourthChannelInitializer extends ChannelInitializer {
+ @Override
+ protected void initChannel(Channel channel) throws Exception {
+ // 电表报文分隔符,一般都是16结尾
+ //ByteBuf delimiter = Unpooled.copiedBuffer(new byte[]{22});
+ //打印分隔符
+ //channel.pipeline().addLast("frameDecoder", new DelimiterBasedFrameDecoder(1024,false, delimiter));
+ //分割报文处理解码器,防止数据粘包
+// channel.pipeline().addLast(new CustomFrameEncoder());
+ // 处理登录
+ channel.pipeline().addLast(new LoginRequestHandler());
+ // 处理心跳
+ channel.pipeline().addLast(new HeartBeatServerHandler());
+ // 处理数据上报数据
+ channel.pipeline().addLast(new DataUploadServerHandler());
+ // 异常处理
+ channel.pipeline().addLast(new ExceptionServerHandler());
+ }
+
+
+}
diff --git a/user-service/src/main/java/com/mh/user/netty/handle/HeartBeatServerHandler.java b/user-service/src/main/java/com/mh/user/netty/handle/HeartBeatServerHandler.java
new file mode 100644
index 0000000..7f0a8d5
--- /dev/null
+++ b/user-service/src/main/java/com/mh/user/netty/handle/HeartBeatServerHandler.java
@@ -0,0 +1,102 @@
+package com.mh.user.netty.handle;
+
+import com.mh.user.model.SysLog;
+import com.mh.user.netty.session.ServerSession;
+import com.mh.user.service.GatewayManageService;
+import com.mh.user.service.SysLogService;
+import com.mh.user.utils.ExchangeStringUtil;
+import com.mh.user.utils.SpringContextUtils;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.handler.timeout.IdleStateHandler;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.context.ApplicationContext;
+
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * @author LJF
+ * @version 1.0
+ * @project TAD_Server
+ * @description 心跳包设置
+ * @date 2023/7/4 13:54:27
+ */
+@Slf4j
+public class HeartBeatServerHandler extends IdleStateHandler {
+
+ private final ApplicationContext applicationContext = SpringContextUtils.getApplicationContext();
+ private final SysLogService sysLogService = applicationContext.getBean(SysLogService.class);
+ private final GatewayManageService gatewayManageService = applicationContext.getBean(GatewayManageService.class);
+
+ //目前网关默认心跳30秒,多长时间没有心跳,就关闭连接
+ private static final int READ_IDLE_GAP = 30;
+
+ /**
+ * @param readerIdleTimeSeconds 最长 没有 read到心跳的时间
+ * @param writerIdleTimeSeconds 心跳包我们只是进行接收心跳,没有所谓的写操作,我们将其设置为0
+ * @param allIdleTimeSeconds 如果我们定义 多长时间没有 读或写操作,我们使用这个参数。
+ */
+ public HeartBeatServerHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
+ super(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds);
+ }
+
+ public HeartBeatServerHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {
+ super(readerIdleTime, writerIdleTime, allIdleTime, unit);
+ }
+
+ public HeartBeatServerHandler(boolean observeOutput, long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {
+ super(observeOutput, readerIdleTime, writerIdleTime, allIdleTime, unit);
+ }
+
+ public HeartBeatServerHandler() {
+ super(READ_IDLE_GAP, 0, 0, TimeUnit.SECONDS);
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+
+ //将接收到的数据转为字符串,此字符串就是客户端发送的字符串
+ String receiveStr = ExchangeStringUtil.bytesToHexString((byte[]) msg);
+ // 判断当前报文是否是登录报文
+ if (receiveStr != null && receiveStr.startsWith("2400")) {
+ if (receiveStr.length() != 8) {
+ super.channelRead(ctx, msg);
+ return;
+ }
+
+ // 通过对应的心跳包码进行判断,然后更新网关在线情况
+ gatewayManageService.updateGatewayManageOnlineByHeartBeatCode(receiveStr, 0);
+
+// //进行心跳包的处理
+// FutureTaskScheduler.add(() -> {
+// if (ctx.channel().isActive()) {
+// // 开始发送第一条采集数据报文
+// }
+// });
+ }
+ super.channelRead(ctx, msg);
+ }
+
+ @Override
+ protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
+ log.info("{} 已超过 " + READ_IDLE_GAP + " 秒没有接收到心跳包,已断开链接", ctx.channel().remoteAddress());
+ String imeiIP = ServerSession.closeSession(ctx);
+ String[] split = imeiIP.split("/");
+ String imei = split[0];
+ if (imei != null) {
+ // 添加到日志表,下线了
+ SysLog sysLog = new SysLog();
+ sysLog.setOperation(imei + "下线了");
+ sysLog.setCreateBy("开发者");
+ sysLog.setIp(imeiIP);
+ sysLog.setCreateTime(new Date());
+ sysLogService.insertLog(sysLog);
+ // 更新网关在线情况
+ gatewayManageService.updateGatewayManageOnlineByImei(imei, 1);
+ }
+ ctx.close();
+ }
+
+}
diff --git a/user-service/src/main/java/com/mh/user/netty/handle/LoginRequestHandler.java b/user-service/src/main/java/com/mh/user/netty/handle/LoginRequestHandler.java
new file mode 100644
index 0000000..8b72933
--- /dev/null
+++ b/user-service/src/main/java/com/mh/user/netty/handle/LoginRequestHandler.java
@@ -0,0 +1,101 @@
+package com.mh.user.netty.handle;
+
+import com.mh.common.utils.StringUtils;
+import com.mh.user.netty.session.ServerSession;
+import com.mh.user.netty.session.SessionMap;
+import com.mh.user.netty.task.CallbackTask;
+import com.mh.user.netty.task.CallbackTaskScheduler;
+import com.mh.user.utils.ExchangeStringUtil;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * @author LJF
+ * @version 1.0
+ * @project TAD_Server
+ * @description 登录报文处理
+ * @date 2023/7/3 15:34:11
+ */
+@Slf4j
+public class LoginRequestHandler extends ChannelInboundHandlerAdapter {
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ //将接收到的数据转为字符串,此字符串就是客户端发送的字符串
+ String receiveStr = ExchangeStringUtil.bytesToHexString((byte[]) msg);
+ if (StringUtils.isBlank(receiveStr)) {
+ super.channelRead(ctx, msg);
+ return;
+ }
+ // 判断当前报文是否是心跳包上线: 869530073040186
+ log.info("接收到的心跳报文 <== {}", receiveStr);
+ // 获取IMEI号
+ String deviceCode = receiveStr;
+ String meterNum = deviceCode;
+ deviceCode = deviceCode + ctx.channel().remoteAddress();
+ //新的session的创建
+ ServerSession session = new ServerSession(ctx.channel(), deviceCode);
+
+ //进行登录逻辑处理,异步进行处理。并且需要知道 处理的结果。 callbacktask就要
+ //派上用场了
+ String finalDeviceCode = deviceCode;
+ CallbackTaskScheduler.add(new CallbackTask() {
+ @Override
+ public Boolean execute() throws Exception {
+ //进行 login 逻辑的处理
+ return action(session, finalDeviceCode, ctx, meterNum);
+ }
+
+ //没有异常的话,我们进行处理
+ @Override
+ public void onBack(Boolean result) {
+ if (result) {
+ log.info("设备登录成功: 设备号 = " + session.getSessionId());
+ //ctx.pipeline().remove(LoginRequestHandler.class); //压测需要放开
+ } else {
+ log.info("设备刷新登录: 设备号 = " + session.getSessionId());
+ SessionMap.inst().updateSession(finalDeviceCode, session, meterNum);
+ //log.info("设备登录失败: 设备号 = " + session.getSessionId());
+ //ServerSession.closeSession(ctx);
+ // 假如说已经在会话中了,直接断开连接
+ //ctx.close();
+ }
+ }
+
+ //有异常的话,我们进行处理
+ @Override
+ public void onException(Throwable t) {
+ log.info("设备登录异常: 设备号 = " + session.getSessionId());
+ ServerSession.closeSession(ctx);
+ }
+ });
+ }
+
+ private boolean action(ServerSession session, String deviceCode, ChannelHandlerContext ctx, String meterNum) {
+ //user验证
+ boolean isValidUser = checkUser(deviceCode, session);
+ if (!isValidUser) {
+ //我们发送登录成功的报文给 客户端
+ session.bind();
+ return false;
+ }
+ //我们发送登录成功的报文给 客户端
+ session.bind();
+ return true;
+ }
+
+ private boolean checkUser(String deviceCode, ServerSession session) {
+ //当前用户已经登录
+ if (SessionMap.inst().hasLogin(deviceCode)) {
+ log.info("设备已经登录: 设备号 = " + deviceCode);
+ return false;
+ }
+ //一般情况下,我们会将 user存储到 DB中,然后对user的用户名和密码进行校验
+ //但是,我们这边没有进行db的集成,所以我们想一个别的办法进行user的校验。在我们的sessionMap进行以下校验
+ //为什么选sessionmap,因为我们user的会话,都是存储到sessionmap中的,sessionmap中只要有这个user的会话,说明就是ok的
+ return true;
+ }
+
+
+}
diff --git a/user-service/src/main/java/com/mh/user/netty/session/ServerSession.java b/user-service/src/main/java/com/mh/user/netty/session/ServerSession.java
new file mode 100644
index 0000000..285be2b
--- /dev/null
+++ b/user-service/src/main/java/com/mh/user/netty/session/ServerSession.java
@@ -0,0 +1,66 @@
+package com.mh.user.netty.session;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.util.AttributeKey;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+
+@Data
+@Slf4j
+public class ServerSession {
+ public static final AttributeKey SESSION_KEY =
+ AttributeKey.valueOf("SESSION_KEY");
+ //通道
+ private Channel channel;
+ private final String sessionId;
+ private boolean isLogin = false;
+
+ public ServerSession(Channel channel, String deviceCode){
+ this.channel = channel;
+ this.sessionId = deviceCode;
+ }
+
+ //session需要和通道进行一定的关联,他是在构造函数中关联上的;
+ //session还需要通过sessionkey和channel进行再次的关联;channel.attr方法.set当前的
+ // serverSession
+ //session需要被添加到我们的SessionMap中
+ public void bind(){
+ log.info("server Session 会话进行绑定 :" + channel.remoteAddress());
+ channel.attr(SESSION_KEY).set(this);
+ SessionMap.inst().addSession(sessionId, this);
+ this.isLogin = true;
+ }
+
+ //通过channel获取session
+ public static ServerSession getSession(ChannelHandlerContext ctx){
+ Channel channel = ctx.channel();
+ return channel.attr(SESSION_KEY).get();
+ }
+
+ //关闭session,新增返回一个meterNum用于纪录设备下线时间2024-05-08
+ public static String closeSession(ChannelHandlerContext ctx){
+ String meterNum = null;
+ ServerSession serverSession = ctx.channel().attr(SESSION_KEY).get();
+ if(serverSession != null && serverSession.getSessionId() != null) {
+ ChannelFuture future = serverSession.channel.close();
+ future.addListener((ChannelFutureListener) future1 -> {
+ if(!future1.isSuccess()) {
+ log.info("Channel close error!");
+ }
+ });
+ ctx.close();
+ meterNum = serverSession.sessionId;
+ SessionMap.inst().removeSession(serverSession.sessionId);
+ log.info(ctx.channel().remoteAddress()+" "+serverSession.sessionId + "==>移除会话");
+ }
+ return meterNum;
+ }
+
+ //写消息
+ public void writeAndFlush(Object msg) {
+ channel.writeAndFlush(msg);
+ }
+}
diff --git a/user-service/src/main/java/com/mh/user/netty/session/SessionMap.java b/user-service/src/main/java/com/mh/user/netty/session/SessionMap.java
new file mode 100644
index 0000000..4ad8543
--- /dev/null
+++ b/user-service/src/main/java/com/mh/user/netty/session/SessionMap.java
@@ -0,0 +1,96 @@
+package com.mh.user.netty.session;
+
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+@Data
+@Slf4j
+public class SessionMap {
+
+ private ThreadLocal sceneThreadLocal = new ThreadLocal<>();
+
+ //用单例模式进行sessionMap的创建
+ private SessionMap(){}
+
+ private static SessionMap singleInstance = new SessionMap();
+
+ public static SessionMap inst() {
+ return singleInstance;
+ }
+
+ //进行会话的保存
+ //key 我们使用 sessionId;value 需要是 serverSession
+ private ConcurrentHashMap map = new ConcurrentHashMap<>(256);
+ //添加session
+ public void addSession(String sessionId, ServerSession s) {
+ map.put(sessionId, s);
+ log.info("IP地址:"+s.getChannel().remoteAddress()+" "+ sessionId + " 表具上线,总共表具:" + map.size());
+ }
+
+ //删除session
+ public void removeSession(String sessionId) {
+ if(map.containsKey(sessionId)) {
+ ServerSession s = map.get(sessionId);
+ map.remove(sessionId);
+ log.info("设备id下线:{},在线设备:{}", s.getSessionId(), map.size() );
+ }
+ return;
+ }
+
+ public boolean hasLogin(String sessionId) {
+ Iterator> iterator = map.entrySet().iterator();
+ while(iterator.hasNext()) {
+ Map.Entry next = iterator.next();
+ if(sessionId != null && sessionId.equalsIgnoreCase(next.getValue().getSessionId())) {
+ return true ;
+ }
+ }
+ return false;
+ }
+
+ //如果在线,肯定有sessionMap里保存的 serverSession
+ //如果不在线,serverSession也没有。用这个来判断是否在线
+ public List getSessionBy(String sessionId) {
+ return map.values().stream().
+ filter(s -> s.getSessionId().equals(sessionId)).
+ collect(Collectors.toList());
+ }
+
+ public boolean getScene() {
+ return sceneThreadLocal.get();
+ }
+
+ public void initScene(Boolean status) {
+ if (sceneThreadLocal == null) {
+ log.info("======创建ThreadLocal======");
+ sceneThreadLocal = new ThreadLocal<>();
+ }
+ log.info("设置状态==>" + status);
+ sceneThreadLocal.set(status);
+ }
+
+ public void clearScene() {
+ initScene(null);
+ sceneThreadLocal.remove();
+ }
+
+ public void updateSession(String sessionId, ServerSession session, String meterNum) {
+ Iterator> iterator = map.entrySet().iterator();
+ while(iterator.hasNext()) {
+ Map.Entry next = iterator.next();
+ if (next.getKey().contains(meterNum)){
+ iterator.remove();
+ }
+ if(sessionId != null && sessionId.equalsIgnoreCase(next.getValue().getSessionId())) {
+ next.setValue(session);
+ }
+ }
+ }
+
+}
diff --git a/user-service/src/main/java/com/mh/user/netty/task/CallbackTask.java b/user-service/src/main/java/com/mh/user/netty/task/CallbackTask.java
new file mode 100644
index 0000000..1512fb1
--- /dev/null
+++ b/user-service/src/main/java/com/mh/user/netty/task/CallbackTask.java
@@ -0,0 +1,20 @@
+package com.mh.user.netty.task;
+
+/**
+ * @author LJF
+ * @version 1.0
+ * @project TAD_Server
+ * @description 回调任务
+ * @date 2023/7/3 15:34:11
+ */
+public interface CallbackTask {
+ T execute() throws Exception;
+
+ /**
+ * // 执行没有 异常的情况下的 返回值
+ * @param t
+ */
+ void onBack(T t);
+
+ void onException(Throwable t);
+}
diff --git a/user-service/src/main/java/com/mh/user/netty/task/CallbackTaskScheduler.java b/user-service/src/main/java/com/mh/user/netty/task/CallbackTaskScheduler.java
new file mode 100644
index 0000000..c242f04
--- /dev/null
+++ b/user-service/src/main/java/com/mh/user/netty/task/CallbackTaskScheduler.java
@@ -0,0 +1,80 @@
+package com.mh.user.netty.task;
+
+import com.google.common.util.concurrent.*;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * @author LJF
+ * @version 1.0
+ * @project TAD_Server
+ * @description 回调任务
+ * @date 2023/7/3 15:34:11
+ */
+public class CallbackTaskScheduler extends Thread {
+ private ConcurrentLinkedQueue executeTaskQueue =
+ new ConcurrentLinkedQueue<>();
+ private long sleepTime = 1000 * 10;
+ private final ExecutorService pool = Executors.newCachedThreadPool();
+ ListeningExecutorService lpool = MoreExecutors.listeningDecorator(pool);
+ private static CallbackTaskScheduler inst = new CallbackTaskScheduler();
+
+ private CallbackTaskScheduler() {
+ this.start();
+ }
+
+ //add task
+ public static void add(CallbackTask executeTask) {
+ inst.executeTaskQueue.add(executeTask);
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+ handleTask();
+ //为了避免频繁连接服务器,但是当前连接服务器过长导致失败
+ //threadSleep(sleepTime);
+ }
+ }
+
+ private void threadSleep(long sleepTime) {
+ try {
+ Thread.sleep(sleepTime);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ //任务执行
+ private void handleTask() {
+ CallbackTask executeTask = null;
+ while (executeTaskQueue.peek() != null) {
+ executeTask = executeTaskQueue.poll();
+ handleTask(executeTask);
+ }
+ }
+
+ private void handleTask(CallbackTask executeTask) {
+ ListenableFuture future = lpool.submit(new Callable() {
+ public T call() throws Exception {
+ return executeTask.execute();
+ }
+ });
+ Futures.addCallback(future, new FutureCallback() {
+ @Override
+ public void onSuccess(T t) {
+ executeTask.onBack(t);
+ }
+
+ @Override
+ public void onFailure(Throwable throwable) {
+ executeTask.onException(throwable);
+ }
+ }, lpool);
+ }
+
+}
+
diff --git a/user-service/src/main/java/com/mh/user/netty/task/ExecuteTask.java b/user-service/src/main/java/com/mh/user/netty/task/ExecuteTask.java
new file mode 100644
index 0000000..7cc5ceb
--- /dev/null
+++ b/user-service/src/main/java/com/mh/user/netty/task/ExecuteTask.java
@@ -0,0 +1,6 @@
+package com.mh.user.netty.task;
+
+//不需要知道异步线程的 返回值
+public interface ExecuteTask {
+ void execute();
+}
diff --git a/user-service/src/main/java/com/mh/user/netty/task/FutureTaskScheduler.java b/user-service/src/main/java/com/mh/user/netty/task/FutureTaskScheduler.java
new file mode 100644
index 0000000..3819b2c
--- /dev/null
+++ b/user-service/src/main/java/com/mh/user/netty/task/FutureTaskScheduler.java
@@ -0,0 +1,67 @@
+package com.mh.user.netty.task;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * @author LJF
+ * @version 1.0
+ * @project TAD_Server
+ * @description 任务定时
+ * @date 2023/7/3 15:34:11
+ */
+public class FutureTaskScheduler extends Thread{
+ private ConcurrentLinkedQueue executeTaskQueue =
+ new ConcurrentLinkedQueue<>();
+ private long sleepTime = 200;
+ private ExecutorService pool = Executors.newFixedThreadPool(10);
+ private static FutureTaskScheduler inst = new FutureTaskScheduler();
+ public FutureTaskScheduler() {
+ this.start();
+ }
+ //任务添加
+ public static void add(ExecuteTask executeTask) {
+ inst.executeTaskQueue.add(executeTask);
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+ handleTask();
+ //threadSleep(sleepTime);
+ }
+ }
+
+ private void threadSleep(long sleepTime) {
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ //执行任务
+ private void handleTask() {
+ ExecuteTask executeTask;
+ while (executeTaskQueue.peek() != null) {
+ executeTask = executeTaskQueue.poll();
+ handleTask(executeTask);
+ }
+ //刷新心跳时间
+ }
+ private void handleTask(ExecuteTask executeTask) {
+ pool.execute(new ExecuteRunnable(executeTask));
+ }
+
+ class ExecuteRunnable implements Runnable {
+ ExecuteTask executeTask;
+ public ExecuteRunnable(ExecuteTask executeTask) {
+ this.executeTask = executeTask;
+ }
+ @Override
+ public void run() {
+ executeTask.execute();
+ }
+ }
+}
diff --git a/user-service/src/main/java/com/mh/user/serialport/SendAndReceiveByCom.java b/user-service/src/main/java/com/mh/user/serialport/SendAndReceiveByCom.java
index d5a9b6d..128dd5c 100644
--- a/user-service/src/main/java/com/mh/user/serialport/SendAndReceiveByCom.java
+++ b/user-service/src/main/java/com/mh/user/serialport/SendAndReceiveByCom.java
@@ -11,6 +11,7 @@ import com.mh.user.service.NowDataService;
import com.mh.user.service.SysParamService;
import com.mh.user.strategy.DeviceStrategy;
import com.mh.user.strategy.DeviceStrategyFactory;
+import com.mh.user.tcp.SendAndReceiveByTcp;
import com.mh.user.utils.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
@@ -119,20 +120,8 @@ public class SendAndReceiveByCom {
String dateStr = DateUtil.dateToString(date1, "yyyy-MM-dd HH:mm:ss");
if (bytes == null) {
SerialTool.closePort(serialPort);
- log.info("串口" + serialPort.getName() + "没有数据返回!" + i);
- log.info("----------------" + deviceType + "离线,设备号:" + deviceAddr + ",所属楼栋:" + buildingName + "----------------");
- String time1 = deviceInstallService.selectLastDate(deviceType, deviceAddr, buildingId);
- if (time1 == null) {
- time1 = dateStr;
- }
- int d = ExchangeStringUtil.compareCopyTime(time1, dateStr);
- if (d == 1) {
- deviceInstallService.updateNotOnline(deviceAddr, deviceType, buildingId, "离线"); //所有设备离线
- if (deviceType.equals("热泵")) {
- nowDataService.updateRunState(buildingId, deviceAddr, "离线", buildingName); //监控界面状态表热泵在线状态
- }
- }
- continue;
+ log.info("串口{}没有数据返回!{}", serialPort.getName(), i);
+ SendAndReceiveByTcp.printLog(deviceAddr, deviceType, buildingId, buildingName, dateStr, log, deviceInstallService, nowDataService);
}
// 处理返回来的数据报文
dealReceiveData(dateStr, serialPort, i, deviceAddr, deviceType, registerAddr, brand, buildingId, buildingName, bytes, device);
@@ -145,7 +134,7 @@ public class SendAndReceiveByCom {
} finally {
if (null != serialPort) {
SerialTool.closePort(serialPort);
- log.info("关闭串口==" + serialPort.getName());
+ log.info("关闭串口=={}", serialPort.getName());
}
}
}
diff --git a/user-service/src/main/java/com/mh/user/serialport/SerialPortSendReceive2.java b/user-service/src/main/java/com/mh/user/serialport/SerialPortSendReceive2.java
index 02b6f7d..3e28d28 100644
--- a/user-service/src/main/java/com/mh/user/serialport/SerialPortSendReceive2.java
+++ b/user-service/src/main/java/com/mh/user/serialport/SerialPortSendReceive2.java
@@ -5,6 +5,7 @@ import com.mh.user.constants.Constant;
import com.mh.user.entity.DeviceCodeParamEntity;
import com.mh.user.entity.SysParamEntity;
import com.mh.user.service.*;
+import com.mh.user.tcp.SendAndReceiveByTcp;
import com.mh.user.utils.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
@@ -114,19 +115,7 @@ public class SerialPortSendReceive2 {
if (bytes == null) {
SerialTool.closePort(serialPort);
log.info("串口" + serialPort.getName() + "没有数据返回!" + i);
- log.info("----------------" + deviceType + "离线,设备号:" + deviceAddr + ",所属楼栋:" + buildingName + "----------------");
- String time1 = deviceInstallService.selectLastDate(deviceType, deviceAddr, buildingId);
- if (time1 == null) {
- time1 = dateStr;
- }
- int d = ExchangeStringUtil.compareCopyTime(time1, dateStr);
- if (d == 1) {
- deviceInstallService.updateNotOnline(deviceAddr, deviceType, buildingId, "离线"); //所有设备离线
- if (deviceType.equals("热泵")) {
- nowDataService.updateRunState(buildingId, deviceAddr, "离线", buildingName); //监控界面状态表热泵在线状态
- }
- }
- continue;
+ SendAndReceiveByTcp.printLog(deviceAddr, deviceType, buildingId, buildingName, dateStr, log, deviceInstallService, nowDataService);
}
// 处理返回来的数据报文
dealReceiveData(dateStr, serialPort, i, deviceAddr, deviceType, registerAddr, brand, buildingId, buildingName, bytes);
diff --git a/user-service/src/main/java/com/mh/user/serialport/SerialPortSingle2.java b/user-service/src/main/java/com/mh/user/serialport/SerialPortSingle2.java
index f851a5f..9ebbb78 100644
--- a/user-service/src/main/java/com/mh/user/serialport/SerialPortSingle2.java
+++ b/user-service/src/main/java/com/mh/user/serialport/SerialPortSingle2.java
@@ -3,6 +3,7 @@ package com.mh.user.serialport;
import com.mh.common.utils.StringUtils;
import com.mh.user.constants.Constant;
import com.mh.user.entity.DeviceCodeParamEntity;
+import com.mh.user.entity.GatewayManageEntity;
import com.mh.user.factory.Device;
import com.mh.user.factory.DeviceFactory;
import com.mh.user.service.BuildingService;
@@ -10,12 +11,15 @@ import com.mh.user.service.DeviceInstallService;
import com.mh.user.service.NowDataService;
import com.mh.user.strategy.DeviceStrategy;
import com.mh.user.strategy.DeviceStrategyFactory;
+import com.mh.user.tcp.SendAndReceiveByTcp;
+import com.mh.user.tcp.TcpSingle;
import com.mh.user.utils.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import java.io.IOException;
import java.util.Date;
+import java.util.List;
import purejavacomm.SerialPort;
@@ -36,6 +40,24 @@ public class SerialPortSingle2 {
BuildingService buildingService = context.getBean(BuildingService.class);
public String serialPortSend(DeviceCodeParamEntity deviceCodeParamEntity) {
+ // 增加一个判断,判断是否tcp通信
+ // 增加判断是什么通讯类型:realCom、tcp
+ CacheUtil instance = CacheUtil.getInstance();
+ List gwList = instance.getGatewayInfo();
+ if (gwList != null && !gwList.isEmpty()) {
+ for (GatewayManageEntity gw : gwList) {
+ if (gw.getDataCom().toUpperCase().equals(deviceCodeParamEntity.getDataCom().toUpperCase())) {
+ String communityType = gw.getCommunityType();
+ if (Constant.COMMUNITY_TYPE_TCP.equals(communityType)) {
+ TcpSingle tcpSingle = new TcpSingle();
+ return tcpSingle.serialPortSend(deviceCodeParamEntity, gw);
+ } else if (Constant.COMMUNITY_TYPE_REAL_COM.equals(communityType)) {
+ // 直接跳出判断,进行下一步
+ break;
+ }
+ }
+ }
+ }
SerialPort serialPort = null;
String rtData = "fail";
String comName = deviceCodeParamEntity.getDataCom().toUpperCase();
diff --git a/user-service/src/main/java/com/mh/user/serialport/SerialPortThread.java b/user-service/src/main/java/com/mh/user/serialport/SerialPortThread.java
index d6ba39f..31b83f1 100644
--- a/user-service/src/main/java/com/mh/user/serialport/SerialPortThread.java
+++ b/user-service/src/main/java/com/mh/user/serialport/SerialPortThread.java
@@ -1,8 +1,13 @@
package com.mh.user.serialport;
+import com.mh.user.constants.Constant;
+import com.mh.user.entity.GatewayManageEntity;
+import com.mh.user.tcp.SendAndReceiveByTcp;
+import com.mh.user.utils.CacheUtil;
import gnu.io.SerialPort;
import lombok.extern.slf4j.Slf4j;
+import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
@@ -35,13 +40,36 @@ public class SerialPortThread implements Runnable{
public void run(){
log.info("创建发送接收数据线程>>>>>>>>>>>>>>" + thread);
- SendAndReceiveByCom sendAndReceiveByCom = new SendAndReceiveByCom();
- try {
- sendAndReceiveByCom.sendAndReceive(name, thread);
- } catch (Exception e) {
- log.error("串口通信发生异常: ", e);
- } finally {
- this.countDownLatch.countDown();
+ // 增加判断是什么通讯类型:realCom、tcp
+ CacheUtil instance = CacheUtil.getInstance();
+ List gwList = instance.getGatewayInfo();
+ if (gwList != null && !gwList.isEmpty()) {
+ for (GatewayManageEntity gw : gwList) {
+ if (gw.getDataCom().toUpperCase().equals("COM"+thread)) {
+ String communityType = gw.getCommunityType();
+ if (Constant.COMMUNITY_TYPE_REAL_COM.equals(communityType)) {
+ SendAndReceiveByCom sendAndReceiveByCom = new SendAndReceiveByCom();
+ try {
+ sendAndReceiveByCom.sendAndReceive(name, thread);
+ } catch (Exception e) {
+ log.error("串口通信发生异常: ", e);
+ } finally {
+ this.countDownLatch.countDown();
+ }
+ } else if (Constant.COMMUNITY_TYPE_TCP.equals(communityType)) {
+ // 开始TCP通信
+ SendAndReceiveByTcp sendAndReceiveByTcp = new SendAndReceiveByTcp();
+ try {
+ sendAndReceiveByTcp.sendAndReceive(name, thread, gw.getHeartBeat());
+ } catch (Exception e) {
+ log.error("TCP通信发生异常: ", e);
+ } finally {
+ this.countDownLatch.countDown();
+ }
+ }
+ }
+ }
}
+
}
}
diff --git a/user-service/src/main/java/com/mh/user/service/GatewayManageService.java b/user-service/src/main/java/com/mh/user/service/GatewayManageService.java
index 77b892a..12bbf3a 100644
--- a/user-service/src/main/java/com/mh/user/service/GatewayManageService.java
+++ b/user-service/src/main/java/com/mh/user/service/GatewayManageService.java
@@ -50,4 +50,7 @@ public interface GatewayManageService {
// 根据grade查询对应的网关路由信息
GatewayManageEntity queryGatewayByGrade(Long grade);
+ void updateGatewayManageOnlineByHeartBeatCode(String heartBeat, int status);
+
+ void updateGatewayManageOnlineByImei(String imei, int status);
}
diff --git a/user-service/src/main/java/com/mh/user/service/SysLogService.java b/user-service/src/main/java/com/mh/user/service/SysLogService.java
index 4ebbfad..627bfae 100644
--- a/user-service/src/main/java/com/mh/user/service/SysLogService.java
+++ b/user-service/src/main/java/com/mh/user/service/SysLogService.java
@@ -22,4 +22,6 @@ public interface SysLogService extends CurdService {
SysLog logInfo(int id);
SysParamEntity selectSysParam();
+
+ void insertLog(SysLog sysLog);
}
diff --git a/user-service/src/main/java/com/mh/user/service/impl/DeviceDisplayServiceImpl.java b/user-service/src/main/java/com/mh/user/service/impl/DeviceDisplayServiceImpl.java
index baef948..1c8b5c7 100644
--- a/user-service/src/main/java/com/mh/user/service/impl/DeviceDisplayServiceImpl.java
+++ b/user-service/src/main/java/com/mh/user/service/impl/DeviceDisplayServiceImpl.java
@@ -36,50 +36,50 @@ public class DeviceDisplayServiceImpl implements DeviceDisplayService {
return deviceDisplayMapper.queryDeviceStatus(deviceType);
}
- /**
- * 网关管理服务类
- * author:ljf
- * create—date:2020-05-21
- */
- public static interface GatewayManageService {
-
- /**
- * 根据条件查询网关信息
- * @param grade
- * @param operator
- * @return
- */
- List queryByOther(Integer grade, Integer operator);
-
- /**
- * 新增或更新网关信息
- * @param gatewayManageEntity
- */
- void addOrUpdateGateWayInfo(GatewayManageEntity gatewayManageEntity);
-
- // 查询全部
- List queryAll(String gatewayID, String operator, String grade, int page, int limit);
-
- // 查询记录数
- int queryCount(String gatewayID, String operator, String grade);
-
- // 添加网关设备
- String insertGatewayManage(GatewayManageEntity gatewayManageEntity);
-
- // 根据网关ID删除网关设备
- void deleteGatewayManageByID(int gatewayID);
-
- // 根据网关ID查询设备信息
- GatewayManageEntity queryGatewayByID(Long gatewayID);
-
- // 设备总数
- int queryByOtherCount(int page, int size, int gatewayID);
-
- // 更新网关管理器的最新连接数据
- void updateGatewayManage(String IP, String port);
-
- // 根据grade查询对应的网关路由信息
- GatewayManageEntity queryGatewayByGrade(Long grade);
-
- }
+// /**
+// * 网关管理服务类
+// * author:ljf
+// * create—date:2020-05-21
+// */
+// public static interface GatewayManageService {
+//
+// /**
+// * 根据条件查询网关信息
+// * @param grade
+// * @param operator
+// * @return
+// */
+// List queryByOther(Integer grade, Integer operator);
+//
+// /**
+// * 新增或更新网关信息
+// * @param gatewayManageEntity
+// */
+// void addOrUpdateGateWayInfo(GatewayManageEntity gatewayManageEntity);
+//
+// // 查询全部
+// List queryAll(String gatewayID, String operator, String grade, int page, int limit);
+//
+// // 查询记录数
+// int queryCount(String gatewayID, String operator, String grade);
+//
+// // 添加网关设备
+// String insertGatewayManage(GatewayManageEntity gatewayManageEntity);
+//
+// // 根据网关ID删除网关设备
+// void deleteGatewayManageByID(int gatewayID);
+//
+// // 根据网关ID查询设备信息
+// GatewayManageEntity queryGatewayByID(Long gatewayID);
+//
+// // 设备总数
+// int queryByOtherCount(int page, int size, int gatewayID);
+//
+// // 更新网关管理器的最新连接数据
+// void updateGatewayManage(String IP, String port);
+//
+// // 根据grade查询对应的网关路由信息
+// GatewayManageEntity queryGatewayByGrade(Long grade);
+//
+// }
}
diff --git a/user-service/src/main/java/com/mh/user/service/impl/GatewayManageServiceImpl.java b/user-service/src/main/java/com/mh/user/service/impl/GatewayManageServiceImpl.java
index 0973c14..e8ef4ea 100644
--- a/user-service/src/main/java/com/mh/user/service/impl/GatewayManageServiceImpl.java
+++ b/user-service/src/main/java/com/mh/user/service/impl/GatewayManageServiceImpl.java
@@ -2,6 +2,7 @@ package com.mh.user.service.impl;
import com.mh.user.entity.GatewayManageEntity;
import com.mh.user.mapper.GatewayManageMapper;
+import com.mh.user.service.GatewayManageService;
import org.springframework.stereotype.Service;
import java.util.List;
@@ -14,7 +15,7 @@ import java.util.List;
* @throws :
*/
@Service
-public class GatewayManageServiceImpl implements DeviceDisplayServiceImpl.GatewayManageService {
+public class GatewayManageServiceImpl implements GatewayManageService {
// 通过构造函数注入,引用Mapper
private final GatewayManageMapper gatewayManageMapper;
@@ -97,4 +98,14 @@ public class GatewayManageServiceImpl implements DeviceDisplayServiceImpl.Gatewa
public GatewayManageEntity queryGatewayByGrade(Long grade) {
return gatewayManageMapper.queryGatewayByGrade(grade);
}
+
+ @Override
+ public void updateGatewayManageOnlineByHeartBeatCode(String heartBeat, int status) {
+ gatewayManageMapper.updateGatewayManageOnlineByHeartBeatCode(heartBeat, status);
+ }
+
+ @Override
+ public void updateGatewayManageOnlineByImei(String imei, int status) {
+ gatewayManageMapper.updateGatewayManageOnlineByImei(imei, status);
+ }
}
diff --git a/user-service/src/main/java/com/mh/user/service/impl/SysLogServiceImpl.java b/user-service/src/main/java/com/mh/user/service/impl/SysLogServiceImpl.java
index c17a92a..7ca4c57 100644
--- a/user-service/src/main/java/com/mh/user/service/impl/SysLogServiceImpl.java
+++ b/user-service/src/main/java/com/mh/user/service/impl/SysLogServiceImpl.java
@@ -73,4 +73,9 @@ public class SysLogServiceImpl implements SysLogService {
public SysParamEntity selectSysParam() {
return sysLogMapper.selectSysParam();
}
+
+ @Override
+ public void insertLog(SysLog sysLog) {
+ sysLogMapper.insert(sysLog);
+ }
}
diff --git a/user-service/src/main/java/com/mh/user/tcp/SendAndReceiveByTcp.java b/user-service/src/main/java/com/mh/user/tcp/SendAndReceiveByTcp.java
new file mode 100644
index 0000000..d91474f
--- /dev/null
+++ b/user-service/src/main/java/com/mh/user/tcp/SendAndReceiveByTcp.java
@@ -0,0 +1,204 @@
+package com.mh.user.tcp;
+
+import com.mh.common.utils.StringUtils;
+import com.mh.user.constants.Constant;
+import com.mh.user.entity.DeviceCodeParamEntity;
+import com.mh.user.factory.Device;
+import com.mh.user.factory.DeviceFactory;
+import com.mh.user.netty.session.ServerSession;
+import com.mh.user.netty.session.SessionMap;
+import com.mh.user.serialport.SerialTool;
+import com.mh.user.service.BuildingService;
+import com.mh.user.service.DeviceInstallService;
+import com.mh.user.service.NowDataService;
+import com.mh.user.strategy.DeviceStrategy;
+import com.mh.user.strategy.DeviceStrategyFactory;
+import com.mh.user.utils.*;
+import lombok.extern.slf4j.Slf4j;
+import org.slf4j.Logger;
+import org.springframework.context.ApplicationContext;
+import purejavacomm.SerialPort;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+/**
+ * @author LJF
+ * @version 1.0
+ * @project CHWS
+ * @description 通过tcp发送和接收数据
+ * @date 2024-03-18 14:56:32
+ */
+@Slf4j
+public class SendAndReceiveByTcp {
+
+ List deviceManageEntityList;
+
+ ApplicationContext context = SpringBeanUtil.getApplicationContext();
+ DeviceInstallService deviceInstallService = context.getBean(DeviceInstallService.class);
+ NowDataService nowDataService = context.getBean(NowDataService.class);
+ BuildingService buildingService = context.getBean(BuildingService.class);
+
+ public void sendAndReceive(String sort, String thread, String heartBeat) {
+ CacheUtil cacheUtil = CacheUtil.getInstance();
+ try {
+ //生成对应的采集指令
+ List deviceParamsByType = cacheUtil.getDeviceParamsByType(sort);
+ deviceManageEntityList = deviceParamsByType
+ .parallelStream()
+ .filter(value -> value.getThread().equals(thread))
+ .sorted(Comparator.comparing(DeviceCodeParamEntity::getDataCom))
+ .collect(Collectors.toList());
+ int size = deviceManageEntityList.size();
+ for (int i = 0; i < size; i++) {
+ //判断网页端是否有操作设备的
+ if (Constant.WEB_FLAG) {
+ log.info("有指令下发退出定时采集");
+ break;
+ }
+ String deviceAddr = deviceManageEntityList.get(i).getDeviceAddr();
+ String deviceType = deviceManageEntityList.get(i).getDeviceType();
+ String registerAddr = deviceManageEntityList.get(i).getRegisterAddr();
+ String brand = deviceManageEntityList.get(i).getBrand();//品牌
+ String buildingId = deviceManageEntityList.get(i).getBuildingId();
+ String buildingName = buildingService.queryBuildingName(buildingId); //查询楼栋名称
+ // 创建设备报文
+ Device device = DeviceFactory.createDevice(deviceType);
+ DeviceStrategy strategy = DeviceStrategyFactory.createStrategy(deviceType);
+ if (null == strategy) {
+ continue;
+ }
+ device.setStrategy(strategy);
+ String sendStr = device.createOrders(deviceManageEntityList.get(i));
+
+ try {
+ //向串口发送指令
+ if (StringUtils.isBlank(sendStr)) {
+ continue;
+ }
+ if (deviceType.equals("热泵")) {
+ for (int j = 0; j < 4; j++) {
+ Thread.sleep(1000);
+ // 判断网页端是否有操作设备的
+ if (Constant.WEB_FLAG) {
+ log.info("有指令下发退出定时采集");
+ break;
+ }
+ }
+ } else {
+ Thread.sleep(2000);
+ }
+ //从获取响应数据
+ ConcurrentHashMap map = SessionMap.inst().getMap();
+ Set> entries = map.entrySet();
+ boolean flag = false;
+ String keyVal = null;
+ for (Map.Entry entry : entries) {
+ String key = entry.getKey();
+ //log.info("当前session:{},当前sessionId:{},当前在线设备数:{}",entry,entry.getValue().getSessionId(),map.size());
+ if (key.contains(heartBeat)) {
+ flag = true;
+ keyVal = key;
+ break;
+ }
+ }
+ if (flag) {
+ ServerSession serverSession = map.get(keyVal);
+ serverSession.getChannel().writeAndFlush(NettyTools.createByteBuf(sendStr));
+ NettyTools.initReceiveMsg(heartBeat);
+ String receiveMsg = NettyTools.waitReceiveMsg(heartBeat);
+ Date date1 = new Date();
+ String dateStr = DateUtil.dateToString(date1, "yyyy-MM-dd HH:mm:ss");
+ if (StringUtils.isBlank(receiveMsg)) {
+ log.info("TCP客户端:{},没有数据返回!{}", keyVal, i);
+ printLog(deviceAddr, deviceType, buildingId, buildingName, dateStr, log, deviceInstallService, nowDataService);
+ continue;
+ }
+ // 处理返回来的数据报文
+ dealReceiveData(dateStr, keyVal, i, deviceAddr, deviceType, registerAddr, brand, buildingId, buildingName, receiveMsg, device);
+ } else {
+ log.error("没有找到心跳包数据:{}", heartBeat);
+ }
+ } catch (Exception e) {
+ log.error("发送窗口数据异常==>", e);
+ }
+ }
+ } catch (Exception e) {
+ log.error("-------------串口采集异常!----------->>", e);
+ }
+ }
+
+ public static void printLog(String deviceAddr, String deviceType, String buildingId, String buildingName, String dateStr, Logger log, DeviceInstallService deviceInstallService, NowDataService nowDataService) {
+ log.info("----------------{}离线,设备号:{},所属楼栋:{}----------------", deviceType, deviceAddr, buildingName);
+ String time1 = deviceInstallService.selectLastDate(deviceType, deviceAddr, buildingId);
+ if (time1 == null) {
+ time1 = dateStr;
+ }
+ int d = ExchangeStringUtil.compareCopyTime(time1, dateStr);
+ if (d == 1) {
+ deviceInstallService.updateNotOnline(deviceAddr, deviceType, buildingId, "离线"); //所有设备离线
+ if (deviceType.equals("热泵")) {
+ nowDataService.updateRunState(buildingId, deviceAddr, "离线", buildingName); //监控界面状态表热泵在线状态
+ }
+ }
+ return;
+ }
+
+ /**
+ * 处理返回来的数据
+ *
+ * @param dateStr
+ * @param serialPort
+ * @param i
+ * @param deviceAddr
+ * @param deviceType
+ * @param registerAddr
+ * @param brand
+ * @param buildingId
+ * @param buildingName
+ * @param receiveStr
+ * @throws InterruptedException
+ */
+ private void dealReceiveData(String dateStr,
+ String serialPort,
+ int i,
+ String deviceAddr,
+ String deviceType,
+ String registerAddr,
+ String brand,
+ String buildingId,
+ String buildingName, String receiveStr, Device device) {
+ try {
+ //去掉空格和null
+ receiveStr = receiveStr.replace("null", "");
+ receiveStr = receiveStr.replace(" ", "");
+ log.info("TCP客户端:{},接受第{}数据:{},大小: {}", serialPort, i, receiveStr, receiveStr.length());
+ //返回值全部变成大写
+ String receiveData = receiveStr.toUpperCase();
+ //截取去掉FE
+ String dataStr;
+ if (receiveData.length() > 8) {
+ String str1 = receiveData.substring(0, 8);
+ String str2 = receiveData.substring(8);
+ dataStr = str1.replace("FE", "") + str2;
+ } else {
+ dataStr = receiveData.replace("FE", "");
+ }
+ deviceInstallService.updateOnline(deviceAddr, deviceType, buildingId, "在线"); //设备在线
+ log.info("----------------{}在线,设备号:{},所属楼栋:{}----------------", deviceType, deviceAddr, buildingName);
+ if (deviceType.equals("热泵")) {
+ String strState = nowDataService.selectState(buildingId, deviceAddr);
+ if (strState != null && strState.equals("离线")) { //采集到数据
+ nowDataService.updateRunState(buildingId, deviceAddr, "不运行", buildingName); //监控界面状态表热泵在线状态
+ }
+ }
+ // 解析返回来的数据
+ device.analysisReceiveData(dateStr, deviceType, registerAddr, brand, buildingId, buildingName, dataStr);
+
+ } catch (Exception e) {
+ log.error("楼栋:{}设备类型:{}保存数据库失败!{}", buildingName, deviceType, i, e);
+ }
+ }
+
+}
diff --git a/user-service/src/main/java/com/mh/user/tcp/TcpSingle.java b/user-service/src/main/java/com/mh/user/tcp/TcpSingle.java
new file mode 100644
index 0000000..47daf0b
--- /dev/null
+++ b/user-service/src/main/java/com/mh/user/tcp/TcpSingle.java
@@ -0,0 +1,126 @@
+package com.mh.user.tcp;
+
+import com.mh.common.utils.StringUtils;
+import com.mh.user.constants.Constant;
+import com.mh.user.entity.DeviceCodeParamEntity;
+import com.mh.user.entity.GatewayManageEntity;
+import com.mh.user.factory.Device;
+import com.mh.user.factory.DeviceFactory;
+import com.mh.user.netty.session.ServerSession;
+import com.mh.user.netty.session.SessionMap;
+import com.mh.user.serialport.SerialTool;
+import com.mh.user.service.BuildingService;
+import com.mh.user.service.DeviceInstallService;
+import com.mh.user.service.NowDataService;
+import com.mh.user.strategy.DeviceStrategy;
+import com.mh.user.strategy.DeviceStrategyFactory;
+import com.mh.user.utils.*;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.context.ApplicationContext;
+import purejavacomm.SerialPort;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static com.mh.user.tcp.SendAndReceiveByTcp.printLog;
+
+/**
+ * @author nxr
+ * @title :
+ * @description : 串口发送和接收处理,操作类
+ * @updateTime 2022-08-10
+ * @throws :
+ */
+@Slf4j
+public class TcpSingle {
+
+ // 调用service
+ ApplicationContext context = SpringBeanUtil.getApplicationContext();
+ DeviceInstallService deviceInstallService = context.getBean(DeviceInstallService.class);
+ NowDataService nowDataService = context.getBean(NowDataService.class);
+ BuildingService buildingService = context.getBean(BuildingService.class);
+
+ public String serialPortSend(DeviceCodeParamEntity deviceCodeParamEntity, GatewayManageEntity gatewayManageEntity) {
+ String rtData = "fail";
+ String comName = deviceCodeParamEntity.getDataCom().toUpperCase();
+ if (StringUtils.isBlank(comName)) {
+ return rtData;
+ }
+ try {
+ // 创建设备报文
+ Device device = DeviceFactory.createDevice(deviceCodeParamEntity.getDeviceType());
+ DeviceStrategy strategy = DeviceStrategyFactory.createStrategy(deviceCodeParamEntity.getDeviceType());
+ if (null == strategy) {
+ return rtData;
+ }
+ device.setStrategy(strategy);
+ String sendStr = device.createOrders(deviceCodeParamEntity);
+ //从获取响应数据
+ ConcurrentHashMap map = SessionMap.inst().getMap();
+ Set> entries = map.entrySet();
+ boolean flag = false;
+ String keyVal = null;
+ String heartBeat = gatewayManageEntity.getHeartBeat().toUpperCase();
+ for (Map.Entry entry : entries) {
+ String key = entry.getKey();
+ //log.info("当前session:{},当前sessionId:{},当前在线设备数:{}",entry,entry.getValue().getSessionId(),map.size());
+ if (key.contains(heartBeat)) {
+ flag = true;
+ keyVal = key;
+ break;
+ }
+ }
+ if (flag) {
+ ServerSession serverSession = map.get(keyVal);
+ serverSession.getChannel().writeAndFlush(NettyTools.createByteBuf(sendStr));
+ NettyTools.initReceiveMsg(heartBeat);
+ String receiveStr = "";
+ if (Constant.WEB_FLAG) {
+ receiveStr = NettyTools.waitReceiveMsg(heartBeat);
+ }
+ if (StringUtils.isBlank(receiveStr)) {
+ return Constant.FAIL;
+ }
+ receiveStr = receiveStr.replace("null", "").replace(" ", "");
+ log.info("TCP客户端" + heartBeat + "接收数据:" + receiveStr + ",大小: " + receiveStr.length());
+ //返回值全部变成大写
+ String receiveData = receiveStr.toUpperCase();
+ //截取去掉FE
+ String deviceType = deviceCodeParamEntity.getDeviceType();
+ String deviceAddr = deviceCodeParamEntity.getDeviceAddr();
+ String dataStr = "";
+ if (receiveData.length() > 8 && ("水表".equals(deviceType) || "电表".equals(deviceType))) {
+ String str1 = receiveData.substring(0, 8);
+ String str2 = receiveData.substring(8);
+ dataStr = str1.replace("FE", "") + str2;
+ } else {
+ dataStr = receiveData;
+ }
+ String registerAddr = deviceCodeParamEntity.getRegisterAddr();
+ String brand = deviceCodeParamEntity.getBrand();
+ String buildingId = deviceCodeParamEntity.getBuildingId();
+ String buildingName = buildingService.queryBuildingName(buildingId); //查询楼栋名称
+
+ deviceInstallService.updateOnline(deviceAddr, deviceType, buildingId, "在线"); //设备在线
+ log.info("{}在线,设备号:{},所属楼栋:{}", deviceType, deviceAddr, buildingName);
+ if (deviceType.equals("热泵")) {
+ String strState = nowDataService.selectState(buildingId, deviceAddr);
+ if (strState != null && strState.equals("离线")) { //采集到数据
+ nowDataService.updateRunState(buildingId, deviceAddr, "不运行", buildingName); //监控界面状态表热泵在线状态
+ }
+ }
+ rtData = device.analysisReceiveData(DateUtil.dateToString(new Date(), "yyyy-MM-dd HH:mm:ss"),
+ deviceType, registerAddr, brand, buildingId, buildingName, dataStr);
+ return rtData;
+ } else {
+ log.error("没有找到心跳包数据:{}", heartBeat);
+ }
+ } catch (Exception e) {
+ log.info("单抄TCP客户端:{}异常,没有数据返回!关闭串口", gatewayManageEntity.getHeartBeat(), e);
+ }
+ return Constant.FAIL;
+ }
+}
diff --git a/user-service/src/main/java/com/mh/user/utils/CacheUtil.java b/user-service/src/main/java/com/mh/user/utils/CacheUtil.java
index f65967e..158b75a 100644
--- a/user-service/src/main/java/com/mh/user/utils/CacheUtil.java
+++ b/user-service/src/main/java/com/mh/user/utils/CacheUtil.java
@@ -4,7 +4,9 @@ import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.github.benmanes.caffeine.cache.Cache;
import com.mh.user.entity.DeviceCodeParamEntity;
+import com.mh.user.entity.GatewayManageEntity;
import com.mh.user.service.DeviceCodeParamService;
+import com.mh.user.service.GatewayManageService;
import org.springframework.context.ApplicationContext;
import java.util.List;
@@ -22,17 +24,29 @@ public class CacheUtil {
Cache caffeineCache = (Cache) context.getBean("caffeineCache");
DeviceCodeParamService deviceCodeParamService = context.getBean(DeviceCodeParamService.class);
+ GatewayManageService gatewayManageService = context.getBean(GatewayManageService.class);
private static class SingletonHolder{
private static final CacheUtil instance=new CacheUtil();
}
private CacheUtil(){
+ // 缓存采集信息
createDeviceParams();
+ // 缓存网关信息数据
+ saveGatewayInfo();
}
+
public static CacheUtil getInstance(){
return SingletonHolder.instance;
}
+ private void saveGatewayInfo() {
+ if (caffeineCache.getIfPresent("gw") == null) {
+ List gatewayManageEntities = gatewayManageService.queryByOther(null, null);
+ caffeineCache.put("gw", gatewayManageEntities);
+ }
+ }
+
/**
* 把采集ddc设置的报文存入到缓存中
*/
@@ -70,6 +84,17 @@ public class CacheUtil {
return JSONArray.parseArray(JSONObject.toJSONString(cacheObject), DeviceCodeParamEntity.class);
}
+ public List getGatewayInfo() {
+ Object cacheObject = caffeineCache.getIfPresent("gw");
+ // 如果为空,重新添加
+ if (cacheObject == null) {
+ saveGatewayInfo();
+ // 在重新获取数据
+ cacheObject = caffeineCache.getIfPresent("gw");
+ }
+ return JSONArray.parseArray(JSONObject.toJSONString(cacheObject), GatewayManageEntity.class);
+ }
+
/**
* 删除缓存
*/
diff --git a/user-service/src/main/java/com/mh/user/utils/NettyTools.java b/user-service/src/main/java/com/mh/user/utils/NettyTools.java
new file mode 100644
index 0000000..cb35d18
--- /dev/null
+++ b/user-service/src/main/java/com/mh/user/utils/NettyTools.java
@@ -0,0 +1,101 @@
+package com.mh.user.utils;
+
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.mh.common.utils.StringUtils;
+import com.mh.user.constants.Constant;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author LJF
+ * @version 1.0
+ * @project TAD_Server
+ * @description 缓存等待数据
+ * @date 2023/7/4 08:45:16
+ */
+@Slf4j
+public class NettyTools {
+
+ public static ByteBuf getByteBuf(ChannelHandlerContext ctx, String sendStr) {
+ // byte类型的数据
+// String sendStr = "5803004900021914"; // 冷量计
+ // 申请一个数据结构存储信息
+ ByteBuf buffer = ctx.alloc().buffer();
+ // 将信息放入数据结构中
+ buffer.writeBytes(ExchangeStringUtil.hexStrToBinaryStr(sendStr));//对接需要16进制
+ return buffer;
+ }
+
+ public static ByteBuf createByteBuf(String sendStr) {
+ // byte类型的数据
+// String sendStr = "5803004900021914"; // 冷量计
+ // 申请一个数据结构存储信息
+ ByteBuf buffer = Unpooled.buffer();
+ // 将信息放入数据结构中
+ buffer.writeBytes(ExchangeStringUtil.hexStrToBinaryStr(sendStr));//对接需要16进制
+ return buffer;
+ }
+
+ /**
+ * 响应消息缓存
+ */
+ private static final Cache> responseMsgCache = CacheBuilder.newBuilder()
+ .maximumSize(50000)
+ .expireAfterWrite(1000, TimeUnit.SECONDS)
+ .build();
+
+
+ /**
+ * 等待响应消息
+ * @param key 消息唯一标识
+ * @return ReceiveDdcMsgVo
+ */
+ public static String waitReceiveMsg(String key) {
+
+ try {
+ //设置超时时间
+ String vo = Objects.requireNonNull(responseMsgCache.getIfPresent(key))
+ .poll(1000 * 10, TimeUnit.MILLISECONDS);
+
+ //删除key
+ responseMsgCache.invalidate(key);
+ return vo;
+ } catch (Exception e) {
+ log.error("获取数据异常,sn={},msg=null",key);
+ return Constant.FAIL;
+ }
+
+ }
+
+ /**
+ * 初始化响应消息的队列
+ * @param key 消息唯一标识
+ */
+ public static void initReceiveMsg(String key) {
+ responseMsgCache.put(key,new LinkedBlockingQueue(1));
+ }
+
+ /**
+ * 设置响应消息
+ * @param key 消息唯一标识
+ */
+ public static void setReceiveMsg(String key, String msg) {
+
+ if(responseMsgCache.getIfPresent(key) != null){
+ responseMsgCache.getIfPresent(key).add(msg);
+ return;
+ }
+
+ log.warn("sn {}不存在",key);
+ }
+
+}