Browse Source

1、中央热水接入通过Netty-TCPServer通信

dev
25604 2 weeks ago
parent
commit
5ea52c5c82
  1. 7
      user-service/pom.xml
  2. 2
      user-service/src/main/java/com/mh/user/constants/Constant.java
  3. 111
      user-service/src/main/java/com/mh/user/constants/FourthGEnum.java
  4. 3
      user-service/src/main/java/com/mh/user/controller/GatewayManageController.java
  5. 6
      user-service/src/main/java/com/mh/user/entity/GatewayManageEntity.java
  6. 294
      user-service/src/main/java/com/mh/user/job/JobCloudAndMeter.java
  7. 14
      user-service/src/main/java/com/mh/user/mapper/GatewayManageMapper.java
  8. 53
      user-service/src/main/java/com/mh/user/netty/NettyEchoServer.java
  9. 63
      user-service/src/main/java/com/mh/user/netty/decoder/CustomFrameEncoder.java
  10. 50
      user-service/src/main/java/com/mh/user/netty/handle/DataUploadServerHandler.java
  11. 50
      user-service/src/main/java/com/mh/user/netty/handle/ExceptionServerHandler.java
  12. 34
      user-service/src/main/java/com/mh/user/netty/handle/FourthChannelInitializer.java
  13. 102
      user-service/src/main/java/com/mh/user/netty/handle/HeartBeatServerHandler.java
  14. 101
      user-service/src/main/java/com/mh/user/netty/handle/LoginRequestHandler.java
  15. 66
      user-service/src/main/java/com/mh/user/netty/session/ServerSession.java
  16. 96
      user-service/src/main/java/com/mh/user/netty/session/SessionMap.java
  17. 20
      user-service/src/main/java/com/mh/user/netty/task/CallbackTask.java
  18. 80
      user-service/src/main/java/com/mh/user/netty/task/CallbackTaskScheduler.java
  19. 6
      user-service/src/main/java/com/mh/user/netty/task/ExecuteTask.java
  20. 67
      user-service/src/main/java/com/mh/user/netty/task/FutureTaskScheduler.java
  21. 19
      user-service/src/main/java/com/mh/user/serialport/SendAndReceiveByCom.java
  22. 15
      user-service/src/main/java/com/mh/user/serialport/SerialPortSendReceive2.java
  23. 22
      user-service/src/main/java/com/mh/user/serialport/SerialPortSingle2.java
  24. 28
      user-service/src/main/java/com/mh/user/serialport/SerialPortThread.java
  25. 3
      user-service/src/main/java/com/mh/user/service/GatewayManageService.java
  26. 2
      user-service/src/main/java/com/mh/user/service/SysLogService.java
  27. 92
      user-service/src/main/java/com/mh/user/service/impl/DeviceDisplayServiceImpl.java
  28. 13
      user-service/src/main/java/com/mh/user/service/impl/GatewayManageServiceImpl.java
  29. 5
      user-service/src/main/java/com/mh/user/service/impl/SysLogServiceImpl.java
  30. 204
      user-service/src/main/java/com/mh/user/tcp/SendAndReceiveByTcp.java
  31. 126
      user-service/src/main/java/com/mh/user/tcp/TcpSingle.java
  32. 25
      user-service/src/main/java/com/mh/user/utils/CacheUtil.java
  33. 101
      user-service/src/main/java/com/mh/user/utils/NettyTools.java

7
user-service/pom.xml

@ -173,6 +173,13 @@
<version>1.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.86.Final</version>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>

2
user-service/src/main/java/com/mh/user/constants/Constant.java

@ -13,6 +13,8 @@ public class Constant {
public static final CharSequence CUSTOM_NAME_GUANGSHANG = "广州商学院";
public static final CharSequence CUSTOM_NAME_HUARUAN = "广州软件学院";
public static final String WEATHER_DATA = "weather_data";
public static final String COMMUNITY_TYPE_REAL_COM = "realCom";
public static final String COMMUNITY_TYPE_TCP = "tcp";
public static boolean CONTROL_WEB_FLAG = false;
public static boolean SEND_STATUS = false; // 指令发送状态
public static volatile boolean FLAG = false;

111
user-service/src/main/java/com/mh/user/constants/FourthGEnum.java

@ -0,0 +1,111 @@
package com.mh.user.constants;
/**
* @author LJF
* @version 1.0
* @project TAD_Server
* @description 4G设备常量值
* @date 2023/7/3 18:14:12
*/
public enum FourthGEnum {
LOGIN_HEART_RESPONSE("0D", "登录和心跳确认控制码"),
LOGIN_HEART_REQUEST("8D", "登录和心跳请求控制码"),
LOGIN_HEART_EXCEPTION("CD", "登录和心跳异常应答控制码"),
LOGIN_TYPE("02", "登录类型"),
HEART_TYPE("01", "心跳类型"),
CLOSE_CONTROL_TYPE("1C", "合闸指令"),
OPEN_CONTROL_TYPE("1A", "跳闸指令"),
DATA_ACTIVE_UPLOAD("8E", "数据主动上报控制码"),
PULL_AND_CLOSE_DEVICE_REQUEST("1C", "跳合闸请求控制码"),
PULL_AND_CLOSE_DEVICE_RESPONSE("9C", "跳合闸应答控制码"),
PULL_AND_CLOSE_DEVICE_EXCEPTION("DC", "跳合闸应答异常控制码"),
CLEAR_DEVICE_REQUEST("1A", "电表清零请求控制码"),
CLEAR_DEVICE_RESPONSE("9A", "电表清零应答控制码"),
CLEAR_DEVICE_EXCEPTION("DA", "电表清零应答异常控制码"),
READ_DEVICE_REQUEST("11", "读电表请求控制码"),
READ_DEVICE_RESPONSE("91", "读电表应答控制码"),
READ_DEVICE_EXCEPTION("D1", "读电表应答异常控制码"),
WRITE_DEVICE_REQUEST("14", "写电表请求控制码"),
WRITE_DEVICE_RESPONSE("94", "读电表应答控制码"),
WRITE_DEVICE_IP("94", "写电表IP应答控制码"),
WRITE_DEVICE_EXCEPTION("D4", "读电表应答异常控制码"),
CHARGE_DEVICE_REQUEST("0F", "充值指令"),
CHARGE_DEVICE_RESPONSE("8F", "充值正确应答控制码"),
CHARGE_DEVICE_EXCEPTION("CF", "充值异常应答控制码"),
READ_TOTAL_CHARGE_IDENTIFY_CODE("00000000", "当前组合有功总电能"),
READ_STATUS_IDENTIFY_CODE("03050004", "读电表运行状态"),
READ_UPLOAD_CODE("02008104", "读取上报报文"),
TY_UPLOAD_CODE("55C2", "腾越水表上报标识"),
WRITE_CHARGE_TIME_IDENTIFY_CODE("07008104", "写入充值次数"),
WRITE_LIMIT_POWER("10008104","写入限容功率"),
READ_LIMIT_POWER("10008104","读取限容功率"),
READ_SOFT_VERSION("01008004","读软件版本"),
READ_HARD_VERSION("02008004","读硬件版本"),
READ_ICCID("05008104","读取ICCID"),
READ_IMEI("04008104","读取IMEI"),
WRITE_IP("0B008104","设置IP端口号"),
READ_IP("0B008104","读取IP端口号"),
UPGRADES("13008104","触发升级,写1触发升级")
;
private String code;
private String desc;
FourthGEnum(String code, String desc) {
this.code = code;
this.desc = desc;
}
public String getCode() {
return code;
}
public void setCode(String code) {
this.code = code;
}
public String getDesc() {
return desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
@Override
public String toString() {
return "FourthGEnum{" +
"code='" + code + '\'' +
", desc='" + desc + '\'' +
'}';
}
}

3
user-service/src/main/java/com/mh/user/controller/GatewayManageController.java

@ -2,6 +2,7 @@ package com.mh.user.controller;
import com.mh.common.http.HttpResult;
import com.mh.user.entity.GatewayManageEntity;
import com.mh.user.service.GatewayManageService;
import com.mh.user.service.impl.DeviceDisplayServiceImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
@ -13,7 +14,7 @@ import java.util.List;
public class GatewayManageController {
@Autowired
private DeviceDisplayServiceImpl.GatewayManageService gatewayManageService;
private GatewayManageService gatewayManageService;
//保存
@PostMapping(value="/save")

6
user-service/src/main/java/com/mh/user/entity/GatewayManageEntity.java

@ -29,4 +29,10 @@ public class GatewayManageEntity {
private String connectDate; //最新上线连接时间
private String remarks; //备注
private String type; //操作类型
private String heartBeat; // 心跳包
private String imei; // 设备IMEI
private String sn; // 设备SN
private String dataCom; // 通讯com口
private String thread; // 线程名称
private String communityType; // 通讯 类型:realCom,tcp
}

294
user-service/src/main/java/com/mh/user/job/JobCloudAndMeter.java

@ -1,159 +1,159 @@
package com.mh.user.job;
import com.mh.user.constants.SocketMessage;
import com.mh.user.entity.GatewayManageEntity;
import com.mh.user.serialport.SerialPortThread;
import com.mh.user.service.DeviceCodeParamService;
import com.mh.user.service.impl.DeviceDisplayServiceImpl;
import com.mh.user.utils.GetReadOrder485;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.ArrayList;
import java.util.List;
/**
* @author ljf
* @title
* @description 定时采集冷量计任务
* @updateTime 2020-05-18
* @throws
*/
/**
* :@DisallowConcurrentExecution : 此标记用在实现Job的类上面,意思是不允许并发执行.
* :注意org.quartz.threadPool.threadCount线程池中线程的数量至少要多个,否则@DisallowConcurrentExecution不生效
* :假如Job的设置时间间隔为3秒,但Job执行时间是5秒,设置@DisallowConcurrentExecution以后程序会等任务执行完毕以后再去执行,
* 否则会在3秒时再启用新的线程执行
*/
@DisallowConcurrentExecution
@Slf4j
public class JobCloudAndMeter implements Job {
@Autowired
private SocketMessage socketMessage;
@Autowired
DeviceDisplayServiceImpl.GatewayManageService gatewayManageService;
@Autowired
DeviceCodeParamService deviceCodeParamService;
private static int taskTimes = 1;
List<GatewayManageEntity> gatewayDtoList=new ArrayList<>();
GetReadOrder485 getReadOrder485=new GetReadOrder485();
@SneakyThrows
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
log.info("定时采集开始>>>>>>");
if(taskTimes==2){//2
int r=deviceCodeParamService.queryCount2(); //查询记录数
if (r==0){
getReadOrder485.createOrderParam2(); //生成采集参数
}
SerialPortThread myThread = new SerialPortThread();
Thread thread = new Thread(myThread);
myThread.setName("2","1");
thread.start();
System.out.println("当前活动线程数:"+Thread.activeCount());
Thread.currentThread().sleep(3000);//毫秒
// SerialPortThread myThread2 = new SerialPortThread();
// Thread thread2 = new Thread(myThread2);
// myThread2.setName("2","2");
// thread2.start();
// System.out.println("当前活动线程数:"+Thread.activeCount());
// Thread.currentThread().sleep(3000);//毫秒
//package com.mh.user.job;
//
// SerialPortThread myThread3 = new SerialPortThread();
// Thread thread3 = new Thread(myThread3);
// myThread3.setName("2","3");
// thread3.start();
// System.out.println("当前活动线程数:"+Thread.activeCount());
// Thread.currentThread().sleep(3000);//毫秒
//import com.mh.user.constants.SocketMessage;
//import com.mh.user.entity.GatewayManageEntity;
//import com.mh.user.serialport.SerialPortThread;
//import com.mh.user.service.DeviceCodeParamService;
//import com.mh.user.service.GatewayManageService;
//import com.mh.user.utils.GetReadOrder485;
//import lombok.SneakyThrows;
//import lombok.extern.slf4j.Slf4j;
//import org.quartz.DisallowConcurrentExecution;
//import org.quartz.Job;
//import org.quartz.JobExecutionContext;
//import org.quartz.JobExecutionException;
//import org.springframework.beans.factory.annotation.Autowired;
//import java.util.ArrayList;
//import java.util.List;
//
// SerialPortThread myThread4= new SerialPortThread();
// Thread thread4 = new Thread(myThread4);
// myThread4.setName("2","4");
// thread4.start();
// System.out.println("当前活动线程数:"+Thread.activeCount());
// Thread.currentThread().sleep(3000);//毫秒
System.out.println("采集水、电、运行状态!"+taskTimes);
taskTimes++;
}else if(taskTimes==3){//5
int r=deviceCodeParamService.queryCount3();//查询记录数
if (r==0){
getReadOrder485.createOrderParam3(); //生成采集参数
}
SerialPortThread myThread = new SerialPortThread();
Thread thread = new Thread(myThread);
myThread.setName("3","1");
thread.start();
System.out.println("当前活动线程数:"+Thread.activeCount());
Thread.currentThread().sleep(3000);//毫秒
// SerialPortThread myThread2 = new SerialPortThread();
// Thread thread2 = new Thread(myThread2);
// myThread2.setName("3","2");
// thread2.start();
// System.out.println("当前活动线程数:"+Thread.activeCount());
// Thread.currentThread().sleep(3000);//毫秒
///**
// * @author ljf
// * @title :
// * @description : 定时采集冷量计任务
// * @updateTime 2020-05-18
// * @throws :
// */
///**
// * :@DisallowConcurrentExecution : 此标记用在实现Job的类上面,意思是不允许并发执行.
// * :注意org.quartz.threadPool.threadCount线程池中线程的数量至少要多个,否则@DisallowConcurrentExecution不生效
// * :假如Job的设置时间间隔为3秒,但Job执行时间是5秒,设置@DisallowConcurrentExecution以后程序会等任务执行完毕以后再去执行,
// * 否则会在3秒时再启用新的线程执行
// */
//@DisallowConcurrentExecution
//@Slf4j
//public class JobCloudAndMeter implements Job {
//
// SerialPortThread myThread3 = new SerialPortThread();
// Thread thread3 = new Thread(myThread3);
// myThread3.setName("3","3");
// thread3.start();
// System.out.println("当前活动线程数:"+Thread.activeCount());
// Thread.currentThread().sleep(3000);//毫秒
// @Autowired
// private SocketMessage socketMessage;
//
// SerialPortThread myThread4= new SerialPortThread();
// Thread thread4 = new Thread(myThread4);
// myThread4.setName("3","4");
// thread4.start();
// System.out.println("当前活动线程数:"+Thread.activeCount());
// Thread.currentThread().sleep(3000);//毫秒
System.out.println("采集设定温度、设定水位、故障状态!"+taskTimes);
taskTimes=1;
}else if(taskTimes==1){
int r=deviceCodeParamService.queryCount(); //查询记录数
if (r==0){
getReadOrder485.createOrderParam(); //生成采集参数
}
SerialPortThread myThread = new SerialPortThread();
Thread thread = new Thread(myThread);
myThread.setName("1","1");
thread.start();
System.out.println("当前活动线程数:"+Thread.activeCount());
Thread.currentThread().sleep(3000);//毫秒
// SerialPortThread myThread2 = new SerialPortThread();
// Thread thread2 = new Thread(myThread2);
// myThread2.setName("1","2");
// thread2.start();
// @Autowired
// GatewayManageService gatewayManageService;
//
// @Autowired
// DeviceCodeParamService deviceCodeParamService;
//
// private static int taskTimes = 1;
// List<GatewayManageEntity> gatewayDtoList=new ArrayList<>();
// GetReadOrder485 getReadOrder485=new GetReadOrder485();
//
// @SneakyThrows
// @Override
// public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
// log.info("定时采集开始>>>>>>");
// if(taskTimes==2){//2
// int r=deviceCodeParamService.queryCount2(); //查询记录数
// if (r==0){
// getReadOrder485.createOrderParam2(); //生成采集参数
// }
//
// SerialPortThread myThread = new SerialPortThread();
// Thread thread = new Thread(myThread);
// myThread.setName("2","1");
// thread.start();
// System.out.println("当前活动线程数:"+Thread.activeCount());
// Thread.currentThread().sleep(3000);//毫秒
//
// SerialPortThread myThread3 = new SerialPortThread();
// Thread thread3 = new Thread(myThread3);
// myThread3.setName("1","3");
// thread3.start();
//// SerialPortThread myThread2 = new SerialPortThread();
//// Thread thread2 = new Thread(myThread2);
//// myThread2.setName("2","2");
//// thread2.start();
//// System.out.println("当前活动线程数:"+Thread.activeCount());
//// Thread.currentThread().sleep(3000);//毫秒
////
//// SerialPortThread myThread3 = new SerialPortThread();
//// Thread thread3 = new Thread(myThread3);
//// myThread3.setName("2","3");
//// thread3.start();
//// System.out.println("当前活动线程数:"+Thread.activeCount());
//// Thread.currentThread().sleep(3000);//毫秒
////
//// SerialPortThread myThread4= new SerialPortThread();
//// Thread thread4 = new Thread(myThread4);
//// myThread4.setName("2","4");
//// thread4.start();
//// System.out.println("当前活动线程数:"+Thread.activeCount());
//// Thread.currentThread().sleep(3000);//毫秒
// System.out.println("采集水、电、运行状态!"+taskTimes);
// taskTimes++;
// }else if(taskTimes==3){//5
// int r=deviceCodeParamService.queryCount3();//查询记录数
// if (r==0){
// getReadOrder485.createOrderParam3(); //生成采集参数
// }
//
// SerialPortThread myThread = new SerialPortThread();
// Thread thread = new Thread(myThread);
// myThread.setName("3","1");
// thread.start();
// System.out.println("当前活动线程数:"+Thread.activeCount());
// Thread.currentThread().sleep(3000);//毫秒
//
// SerialPortThread myThread4= new SerialPortThread();
// Thread thread4 = new Thread(myThread4);
// myThread4.setName("1","4");
// thread4.start();
//// SerialPortThread myThread2 = new SerialPortThread();
//// Thread thread2 = new Thread(myThread2);
//// myThread2.setName("3","2");
//// thread2.start();
//// System.out.println("当前活动线程数:"+Thread.activeCount());
//// Thread.currentThread().sleep(3000);//毫秒
////
//// SerialPortThread myThread3 = new SerialPortThread();
//// Thread thread3 = new Thread(myThread3);
//// myThread3.setName("3","3");
//// thread3.start();
//// System.out.println("当前活动线程数:"+Thread.activeCount());
//// Thread.currentThread().sleep(3000);//毫秒
////
//// SerialPortThread myThread4= new SerialPortThread();
//// Thread thread4 = new Thread(myThread4);
//// myThread4.setName("3","4");
//// thread4.start();
//// System.out.println("当前活动线程数:"+Thread.activeCount());
//// Thread.currentThread().sleep(3000);//毫秒
// System.out.println("采集设定温度、设定水位、故障状态!"+taskTimes);
// taskTimes=1;
// }else if(taskTimes==1){
// int r=deviceCodeParamService.queryCount(); //查询记录数
// if (r==0){
// getReadOrder485.createOrderParam(); //生成采集参数
// }
// SerialPortThread myThread = new SerialPortThread();
// Thread thread = new Thread(myThread);
// myThread.setName("1","1");
// thread.start();
// System.out.println("当前活动线程数:"+Thread.activeCount());
// Thread.currentThread().sleep(3000);//毫秒
System.out.println("采集水位、水温!"+taskTimes);
taskTimes++;
}
}
}
//
//// SerialPortThread myThread2 = new SerialPortThread();
//// Thread thread2 = new Thread(myThread2);
//// myThread2.setName("1","2");
//// thread2.start();
//// System.out.println("当前活动线程数:"+Thread.activeCount());
//// Thread.currentThread().sleep(3000);//毫秒
////
//// SerialPortThread myThread3 = new SerialPortThread();
//// Thread thread3 = new Thread(myThread3);
//// myThread3.setName("1","3");
//// thread3.start();
//// System.out.println("当前活动线程数:"+Thread.activeCount());
//// Thread.currentThread().sleep(3000);//毫秒
////
//// SerialPortThread myThread4= new SerialPortThread();
//// Thread thread4 = new Thread(myThread4);
//// myThread4.setName("1","4");
//// thread4.start();
//// System.out.println("当前活动线程数:"+Thread.activeCount());
//// Thread.currentThread().sleep(3000);//毫秒
// System.out.println("采集水位、水温!"+taskTimes);
// taskTimes++;
// }
// }
//}

14
user-service/src/main/java/com/mh/user/mapper/GatewayManageMapper.java

@ -34,6 +34,12 @@ public interface GatewayManageMapper {
@Result(column = "create_date", property = "createDate"),
@Result(column = "connect_date", property = "connectDate"),
@Result(column = "remarks", property = "remarks"),
@Result(column = "heart_beat", property = "heartBeat"),
@Result(column = "imei", property = "imei"),
@Result(column = "sn", property = "sn"),
@Result(column = "data_com", property = "dataCom"),
@Result(column = "thread", property = "thread"),
@Result(column = "community_type", property = "communityType")
})
List<GatewayManageEntity> queryByOther(@Param("grade") Integer grade, @Param("operator") Integer operator);
@ -97,4 +103,12 @@ public interface GatewayManageMapper {
// 根据设备类型查询数据库,找出对应的详细信息
@Select("select id, gateway_name, gateway_ip, gateway_address, data_com, connect_date, internet_card, operator, gateway_port, grade from gateway_manage where grade = #{grade}")
GatewayManageEntity queryGatewayByGrade(@Param("grade") Long grade);
// 根据设备心跳包更新设备在线状态
@Update("update gateway_manage set grade = #{status} where heart_beat = #{heartBeat}")
void updateGatewayManageOnlineByHeartBeatCode(String heartBeat, int status);
// 根据设备IMEI更新设备在线状态
@Update("update gateway_manage set grade = #{status} where imei = #{imei}")
void updateGatewayManageOnlineByImei(String imei, int status);
}

53
user-service/src/main/java/com/mh/user/netty/NettyEchoServer.java

@ -0,0 +1,53 @@
package com.mh.user.netty;
import com.mh.user.netty.handle.FourthChannelInitializer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
/**
* @author LJF
* @title Netty
* @description netty 使用
* @updateTime 2020-04-21
* @throws
*/
@Slf4j
public class NettyEchoServer {
public void bind(int port) throws Exception {
// accept线程组,用来接收连接
EventLoopGroup bossGroup = new NioEventLoopGroup(2);
// IO 线程组,用来处理业务逻辑
EventLoopGroup workerGroup = new NioEventLoopGroup(4);
try {
// 服务端启动引导
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup) // 绑定两个线程
.channel(NioServerSocketChannel.class) // 指定通道类型
.option(ChannelOption.SO_BACKLOG, 1024) // 设置TCP连接的缓冲区
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new LoggingHandler(LogLevel.INFO)) // 设置日志级别
.childHandler(new FourthChannelInitializer()); // 初始化handle
// 通过bind启动服务
ChannelFuture f = serverBootstrap.bind(port).sync();
// 阻塞主线程,知道网络服务被关闭
//f.channel().closeFuture().sync();
f.channel().closeFuture().addListener(future -> {
log.info("Netty服务器已关闭");
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
});
} catch (Exception e){
log.error("netty服务器异常", e);
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}

63
user-service/src/main/java/com/mh/user/netty/decoder/CustomFrameEncoder.java

@ -0,0 +1,63 @@
package com.mh.user.netty.decoder;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class CustomFrameEncoder extends MessageToByteEncoder<String> {
private static final byte FRAME_START = 0x68;
private static final byte FRAME_END = 0x16;
@Override
protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception {
log.info("数据到达:"+msg);
// Write first frame start
out.writeByte(FRAME_START);
// Write meter number
byte[] meterNumberBytes = hexStringToByteArray(msg.substring(2, 14)); // Extract meter number from message
out.writeBytes(meterNumberBytes);
// Write second frame start
out.writeByte(FRAME_START);
// Write control code
byte controlCode = hexStringToByteArray(msg.substring(14, 16))[0]; // Extract control code from message
out.writeByte(controlCode);
// Write data length
int dataLength = Integer.parseInt(msg.substring(16, 18), 16); // Extract data length from message
out.writeByte(dataLength);
// Write data
byte[] dataBytes = hexStringToByteArray(msg.substring(18, 18 + (dataLength * 2))); // Extract data from message
out.writeBytes(dataBytes);
// Calculate and write checksum
byte checksum = calculateChecksum(msg.substring(14, 18 + (dataLength * 2))); // Calculate checksum
out.writeByte(checksum);
// Write frame end
out.writeByte(FRAME_END);
}
private byte[] hexStringToByteArray(String hexString) {
int len = hexString.length();
byte[] data = new byte[len / 2];
for (int i = 0; i < len; i += 2) {
data[i / 2] = (byte) ((Character.digit(hexString.charAt(i), 16) << 4)
+ Character.digit(hexString.charAt(i + 1), 16));
}
return data;
}
private byte calculateChecksum(String message) {
byte checksum = 0;
for (int i = 0; i < message.length(); i += 2) {
checksum += Integer.parseInt(message.substring(i, i + 2), 16);
}
return (byte) (checksum & 0xFF);
}
}

50
user-service/src/main/java/com/mh/user/netty/handle/DataUploadServerHandler.java

@ -0,0 +1,50 @@
package com.mh.user.netty.handle;
import com.github.benmanes.caffeine.cache.Cache;
import com.mh.user.constants.FourthGEnum;
import com.mh.user.entity.GatewayManageEntity;
import com.mh.user.netty.session.ServerSession;
import com.mh.user.netty.session.SessionMap;
import com.mh.user.utils.CacheUtil;
import com.mh.user.utils.ExchangeStringUtil;
import com.mh.user.utils.NettyTools;
import com.mh.user.utils.SpringBeanUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import java.util.List;
@Slf4j
public class DataUploadServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//将接收到的数据转为字符串,此字符串就是客户端发送的字符串
String receiveStr = ExchangeStringUtil.bytesToHexString((byte[]) msg);
// 判断当前报文是否是上报数据报文
if (receiveStr != null) {
// 判断属于哪一个DTU网关上报的数据
CacheUtil instance = CacheUtil.getInstance();
List<GatewayManageEntity> gwList = instance.getGatewayInfo();
if (gwList != null && !gwList.isEmpty()) {
for (GatewayManageEntity gw : gwList) {
if (receiveStr.startsWith(ExchangeStringUtil.str2HexStr(gw.getSn()))) {
// 直接设置对应值
NettyTools.setReceiveMsg(gw.getHeartBeat(), receiveStr.substring(gw.getSn().length()));
} else {
// 判断是否登录,没有登录立马断开
String deviceCode = gw.getHeartBeat();
if (!SessionMap.inst().hasLogin(deviceCode+ctx.channel().remoteAddress())) {
ServerSession.closeSession(ctx);
return;
}
}
}
}
}
super.channelRead(ctx, msg);
}
}

50
user-service/src/main/java/com/mh/user/netty/handle/ExceptionServerHandler.java

@ -0,0 +1,50 @@
package com.mh.user.netty.handle;
import com.mh.user.netty.session.ServerSession;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
/**
* @author LJF
* @version 1.0
* @project TAD_Server
* @description 异常处理
* @date 2023/7/5 09:02:03
*/
@Slf4j
public class ExceptionServerHandler extends ChannelInboundHandlerAdapter {
//private final ApplicationContext applicationContext = ApplicationContextProvider.getApplicationContext();
//private final MeterInfoService meterInfoService = applicationContext.getBean(MeterInfoService.class);
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.info("{}客户端断开连接!", ctx.channel().remoteAddress());
//String meterNumIp = ServerSession.closeSession(ctx);
//String[] split = meterNumIp.split("/");
//String meterNum = split[0];
//if (meterNum != null) {
// meterInfoService.insertTestDetail(meterNum,"offline");
//}
ServerSession.closeSession(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (cause instanceof UnsupportedOperationException) {
ServerSession.closeSession(ctx);
} else {
log.info("获得已知异常!",cause);
ctx.close();
}
log.info("获得已知异常!{}", cause.getMessage());
log.info("获得已知异常!", cause);
ServerSession.closeSession(ctx);
ctx.close();
}
}

34
user-service/src/main/java/com/mh/user/netty/handle/FourthChannelInitializer.java

@ -0,0 +1,34 @@
package com.mh.user.netty.handle;
import com.mh.user.netty.decoder.CustomFrameEncoder;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
/**
* @author LJF
* @version 1.0
* @project TAD_Server
* @description 初始换一个服务端
* @date 2023/7/3 15:58:05
*/
public class FourthChannelInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel channel) throws Exception {
// 电表报文分隔符,一般都是16结尾
//ByteBuf delimiter = Unpooled.copiedBuffer(new byte[]{22});
//打印分隔符
//channel.pipeline().addLast("frameDecoder", new DelimiterBasedFrameDecoder(1024,false, delimiter));
//分割报文处理解码器,防止数据粘包
// channel.pipeline().addLast(new CustomFrameEncoder());
// 处理登录
channel.pipeline().addLast(new LoginRequestHandler());
// 处理心跳
channel.pipeline().addLast(new HeartBeatServerHandler());
// 处理数据上报数据
channel.pipeline().addLast(new DataUploadServerHandler());
// 异常处理
channel.pipeline().addLast(new ExceptionServerHandler());
}
}

102
user-service/src/main/java/com/mh/user/netty/handle/HeartBeatServerHandler.java

@ -0,0 +1,102 @@
package com.mh.user.netty.handle;
import com.mh.user.model.SysLog;
import com.mh.user.netty.session.ServerSession;
import com.mh.user.service.GatewayManageService;
import com.mh.user.service.SysLogService;
import com.mh.user.utils.ExchangeStringUtil;
import com.mh.user.utils.SpringContextUtils;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import java.util.Date;
import java.util.concurrent.TimeUnit;
/**
* @author LJF
* @version 1.0
* @project TAD_Server
* @description 心跳包设置
* @date 2023/7/4 13:54:27
*/
@Slf4j
public class HeartBeatServerHandler extends IdleStateHandler {
private final ApplicationContext applicationContext = SpringContextUtils.getApplicationContext();
private final SysLogService sysLogService = applicationContext.getBean(SysLogService.class);
private final GatewayManageService gatewayManageService = applicationContext.getBean(GatewayManageService.class);
//目前网关默认心跳30秒,多长时间没有心跳,就关闭连接
private static final int READ_IDLE_GAP = 30;
/**
* @param readerIdleTimeSeconds 最长 没有 read到心跳的时间
* @param writerIdleTimeSeconds 心跳包我们只是进行接收心跳没有所谓的写操作我们将其设置为0
* @param allIdleTimeSeconds 如果我们定义 多长时间没有 读或写操作我们使用这个参数
*/
public HeartBeatServerHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
super(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds);
}
public HeartBeatServerHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {
super(readerIdleTime, writerIdleTime, allIdleTime, unit);
}
public HeartBeatServerHandler(boolean observeOutput, long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {
super(observeOutput, readerIdleTime, writerIdleTime, allIdleTime, unit);
}
public HeartBeatServerHandler() {
super(READ_IDLE_GAP, 0, 0, TimeUnit.SECONDS);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//将接收到的数据转为字符串,此字符串就是客户端发送的字符串
String receiveStr = ExchangeStringUtil.bytesToHexString((byte[]) msg);
// 判断当前报文是否是登录报文
if (receiveStr != null && receiveStr.startsWith("2400")) {
if (receiveStr.length() != 8) {
super.channelRead(ctx, msg);
return;
}
// 通过对应的心跳包码进行判断,然后更新网关在线情况
gatewayManageService.updateGatewayManageOnlineByHeartBeatCode(receiveStr, 0);
// //进行心跳包的处理
// FutureTaskScheduler.add(() -> {
// if (ctx.channel().isActive()) {
// // 开始发送第一条采集数据报文
// }
// });
}
super.channelRead(ctx, msg);
}
@Override
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
log.info("{} 已超过 " + READ_IDLE_GAP + " 秒没有接收到心跳包,已断开链接", ctx.channel().remoteAddress());
String imeiIP = ServerSession.closeSession(ctx);
String[] split = imeiIP.split("/");
String imei = split[0];
if (imei != null) {
// 添加到日志表,下线了
SysLog sysLog = new SysLog();
sysLog.setOperation(imei + "下线了");
sysLog.setCreateBy("开发者");
sysLog.setIp(imeiIP);
sysLog.setCreateTime(new Date());
sysLogService.insertLog(sysLog);
// 更新网关在线情况
gatewayManageService.updateGatewayManageOnlineByImei(imei, 1);
}
ctx.close();
}
}

101
user-service/src/main/java/com/mh/user/netty/handle/LoginRequestHandler.java

@ -0,0 +1,101 @@
package com.mh.user.netty.handle;
import com.mh.common.utils.StringUtils;
import com.mh.user.netty.session.ServerSession;
import com.mh.user.netty.session.SessionMap;
import com.mh.user.netty.task.CallbackTask;
import com.mh.user.netty.task.CallbackTaskScheduler;
import com.mh.user.utils.ExchangeStringUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
/**
* @author LJF
* @version 1.0
* @project TAD_Server
* @description 登录报文处理
* @date 2023/7/3 15:34:11
*/
@Slf4j
public class LoginRequestHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//将接收到的数据转为字符串,此字符串就是客户端发送的字符串
String receiveStr = ExchangeStringUtil.bytesToHexString((byte[]) msg);
if (StringUtils.isBlank(receiveStr)) {
super.channelRead(ctx, msg);
return;
}
// 判断当前报文是否是心跳包上线: 869530073040186
log.info("接收到的心跳报文 <== {}", receiveStr);
// 获取IMEI号
String deviceCode = receiveStr;
String meterNum = deviceCode;
deviceCode = deviceCode + ctx.channel().remoteAddress();
//新的session的创建
ServerSession session = new ServerSession(ctx.channel(), deviceCode);
//进行登录逻辑处理,异步进行处理。并且需要知道 处理的结果。 callbacktask就要
//派上用场了
String finalDeviceCode = deviceCode;
CallbackTaskScheduler.add(new CallbackTask<Boolean>() {
@Override
public Boolean execute() throws Exception {
//进行 login 逻辑的处理
return action(session, finalDeviceCode, ctx, meterNum);
}
//没有异常的话,我们进行处理
@Override
public void onBack(Boolean result) {
if (result) {
log.info("设备登录成功: 设备号 = " + session.getSessionId());
//ctx.pipeline().remove(LoginRequestHandler.class); //压测需要放开
} else {
log.info("设备刷新登录: 设备号 = " + session.getSessionId());
SessionMap.inst().updateSession(finalDeviceCode, session, meterNum);
//log.info("设备登录失败: 设备号 = " + session.getSessionId());
//ServerSession.closeSession(ctx);
// 假如说已经在会话中了,直接断开连接
//ctx.close();
}
}
//有异常的话,我们进行处理
@Override
public void onException(Throwable t) {
log.info("设备登录异常: 设备号 = " + session.getSessionId());
ServerSession.closeSession(ctx);
}
});
}
private boolean action(ServerSession session, String deviceCode, ChannelHandlerContext ctx, String meterNum) {
//user验证
boolean isValidUser = checkUser(deviceCode, session);
if (!isValidUser) {
//我们发送登录成功的报文给 客户端
session.bind();
return false;
}
//我们发送登录成功的报文给 客户端
session.bind();
return true;
}
private boolean checkUser(String deviceCode, ServerSession session) {
//当前用户已经登录
if (SessionMap.inst().hasLogin(deviceCode)) {
log.info("设备已经登录: 设备号 = " + deviceCode);
return false;
}
//一般情况下,我们会将 user存储到 DB中,然后对user的用户名和密码进行校验
//但是,我们这边没有进行db的集成,所以我们想一个别的办法进行user的校验。在我们的sessionMap进行以下校验
//为什么选sessionmap,因为我们user的会话,都是存储到sessionmap中的,sessionmap中只要有这个user的会话,说明就是ok的
return true;
}
}

66
user-service/src/main/java/com/mh/user/netty/session/ServerSession.java

@ -0,0 +1,66 @@
package com.mh.user.netty.session;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.AttributeKey;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
@Data
@Slf4j
public class ServerSession {
public static final AttributeKey<ServerSession> SESSION_KEY =
AttributeKey.valueOf("SESSION_KEY");
//通道
private Channel channel;
private final String sessionId;
private boolean isLogin = false;
public ServerSession(Channel channel, String deviceCode){
this.channel = channel;
this.sessionId = deviceCode;
}
//session需要和通道进行一定的关联,他是在构造函数中关联上的;
//session还需要通过sessionkey和channel进行再次的关联;channel.attr方法.set当前的
// serverSession
//session需要被添加到我们的SessionMap中
public void bind(){
log.info("server Session 会话进行绑定 :" + channel.remoteAddress());
channel.attr(SESSION_KEY).set(this);
SessionMap.inst().addSession(sessionId, this);
this.isLogin = true;
}
//通过channel获取session
public static ServerSession getSession(ChannelHandlerContext ctx){
Channel channel = ctx.channel();
return channel.attr(SESSION_KEY).get();
}
//关闭session,新增返回一个meterNum用于纪录设备下线时间2024-05-08
public static String closeSession(ChannelHandlerContext ctx){
String meterNum = null;
ServerSession serverSession = ctx.channel().attr(SESSION_KEY).get();
if(serverSession != null && serverSession.getSessionId() != null) {
ChannelFuture future = serverSession.channel.close();
future.addListener((ChannelFutureListener) future1 -> {
if(!future1.isSuccess()) {
log.info("Channel close error!");
}
});
ctx.close();
meterNum = serverSession.sessionId;
SessionMap.inst().removeSession(serverSession.sessionId);
log.info(ctx.channel().remoteAddress()+" "+serverSession.sessionId + "==>移除会话");
}
return meterNum;
}
//写消息
public void writeAndFlush(Object msg) {
channel.writeAndFlush(msg);
}
}

96
user-service/src/main/java/com/mh/user/netty/session/SessionMap.java

@ -0,0 +1,96 @@
package com.mh.user.netty.session;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@Data
@Slf4j
public class SessionMap {
private ThreadLocal<Boolean> sceneThreadLocal = new ThreadLocal<>();
//用单例模式进行sessionMap的创建
private SessionMap(){}
private static SessionMap singleInstance = new SessionMap();
public static SessionMap inst() {
return singleInstance;
}
//进行会话的保存
//key 我们使用 sessionId;value 需要是 serverSession
private ConcurrentHashMap<String, ServerSession> map = new ConcurrentHashMap<>(256);
//添加session
public void addSession(String sessionId, ServerSession s) {
map.put(sessionId, s);
log.info("IP地址:"+s.getChannel().remoteAddress()+" "+ sessionId + " 表具上线,总共表具:" + map.size());
}
//删除session
public void removeSession(String sessionId) {
if(map.containsKey(sessionId)) {
ServerSession s = map.get(sessionId);
map.remove(sessionId);
log.info("设备id下线:{},在线设备:{}", s.getSessionId(), map.size() );
}
return;
}
public boolean hasLogin(String sessionId) {
Iterator<Map.Entry<String, ServerSession>> iterator = map.entrySet().iterator();
while(iterator.hasNext()) {
Map.Entry<String, ServerSession> next = iterator.next();
if(sessionId != null && sessionId.equalsIgnoreCase(next.getValue().getSessionId())) {
return true ;
}
}
return false;
}
//如果在线,肯定有sessionMap里保存的 serverSession
//如果不在线,serverSession也没有。用这个来判断是否在线
public List<ServerSession> getSessionBy(String sessionId) {
return map.values().stream().
filter(s -> s.getSessionId().equals(sessionId)).
collect(Collectors.toList());
}
public boolean getScene() {
return sceneThreadLocal.get();
}
public void initScene(Boolean status) {
if (sceneThreadLocal == null) {
log.info("======创建ThreadLocal======");
sceneThreadLocal = new ThreadLocal<>();
}
log.info("设置状态==>" + status);
sceneThreadLocal.set(status);
}
public void clearScene() {
initScene(null);
sceneThreadLocal.remove();
}
public void updateSession(String sessionId, ServerSession session, String meterNum) {
Iterator<Map.Entry<String, ServerSession>> iterator = map.entrySet().iterator();
while(iterator.hasNext()) {
Map.Entry<String, ServerSession> next = iterator.next();
if (next.getKey().contains(meterNum)){
iterator.remove();
}
if(sessionId != null && sessionId.equalsIgnoreCase(next.getValue().getSessionId())) {
next.setValue(session);
}
}
}
}

20
user-service/src/main/java/com/mh/user/netty/task/CallbackTask.java

@ -0,0 +1,20 @@
package com.mh.user.netty.task;
/**
* @author LJF
* @version 1.0
* @project TAD_Server
* @description 回调任务
* @date 2023/7/3 15:34:11
*/
public interface CallbackTask<T> {
T execute() throws Exception;
/**
* // 执行没有 异常的情况下的 返回值
* @param t
*/
void onBack(T t);
void onException(Throwable t);
}

80
user-service/src/main/java/com/mh/user/netty/task/CallbackTaskScheduler.java

@ -0,0 +1,80 @@
package com.mh.user.netty.task;
import com.google.common.util.concurrent.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author LJF
* @version 1.0
* @project TAD_Server
* @description 回调任务
* @date 2023/7/3 15:34:11
*/
public class CallbackTaskScheduler extends Thread {
private ConcurrentLinkedQueue<CallbackTask> executeTaskQueue =
new ConcurrentLinkedQueue<>();
private long sleepTime = 1000 * 10;
private final ExecutorService pool = Executors.newCachedThreadPool();
ListeningExecutorService lpool = MoreExecutors.listeningDecorator(pool);
private static CallbackTaskScheduler inst = new CallbackTaskScheduler();
private CallbackTaskScheduler() {
this.start();
}
//add task
public static <T> void add(CallbackTask<T> executeTask) {
inst.executeTaskQueue.add(executeTask);
}
@Override
public void run() {
while (true) {
handleTask();
//为了避免频繁连接服务器,但是当前连接服务器过长导致失败
//threadSleep(sleepTime);
}
}
private void threadSleep(long sleepTime) {
try {
Thread.sleep(sleepTime);
} catch (Exception e) {
e.printStackTrace();
}
}
//任务执行
private void handleTask() {
CallbackTask executeTask = null;
while (executeTaskQueue.peek() != null) {
executeTask = executeTaskQueue.poll();
handleTask(executeTask);
}
}
private <T> void handleTask(CallbackTask<T> executeTask) {
ListenableFuture<T> future = lpool.submit(new Callable<T>() {
public T call() throws Exception {
return executeTask.execute();
}
});
Futures.addCallback(future, new FutureCallback<T>() {
@Override
public void onSuccess(T t) {
executeTask.onBack(t);
}
@Override
public void onFailure(Throwable throwable) {
executeTask.onException(throwable);
}
}, lpool);
}
}

6
user-service/src/main/java/com/mh/user/netty/task/ExecuteTask.java

@ -0,0 +1,6 @@
package com.mh.user.netty.task;
//不需要知道异步线程的 返回值
public interface ExecuteTask {
void execute();
}

67
user-service/src/main/java/com/mh/user/netty/task/FutureTaskScheduler.java

@ -0,0 +1,67 @@
package com.mh.user.netty.task;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author LJF
* @version 1.0
* @project TAD_Server
* @description 任务定时
* @date 2023/7/3 15:34:11
*/
public class FutureTaskScheduler extends Thread{
private ConcurrentLinkedQueue<ExecuteTask> executeTaskQueue =
new ConcurrentLinkedQueue<>();
private long sleepTime = 200;
private ExecutorService pool = Executors.newFixedThreadPool(10);
private static FutureTaskScheduler inst = new FutureTaskScheduler();
public FutureTaskScheduler() {
this.start();
}
//任务添加
public static void add(ExecuteTask executeTask) {
inst.executeTaskQueue.add(executeTask);
}
@Override
public void run() {
while (true) {
handleTask();
//threadSleep(sleepTime);
}
}
private void threadSleep(long sleepTime) {
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//执行任务
private void handleTask() {
ExecuteTask executeTask;
while (executeTaskQueue.peek() != null) {
executeTask = executeTaskQueue.poll();
handleTask(executeTask);
}
//刷新心跳时间
}
private void handleTask(ExecuteTask executeTask) {
pool.execute(new ExecuteRunnable(executeTask));
}
class ExecuteRunnable implements Runnable {
ExecuteTask executeTask;
public ExecuteRunnable(ExecuteTask executeTask) {
this.executeTask = executeTask;
}
@Override
public void run() {
executeTask.execute();
}
}
}

19
user-service/src/main/java/com/mh/user/serialport/SendAndReceiveByCom.java

@ -11,6 +11,7 @@ import com.mh.user.service.NowDataService;
import com.mh.user.service.SysParamService;
import com.mh.user.strategy.DeviceStrategy;
import com.mh.user.strategy.DeviceStrategyFactory;
import com.mh.user.tcp.SendAndReceiveByTcp;
import com.mh.user.utils.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
@ -119,20 +120,8 @@ public class SendAndReceiveByCom {
String dateStr = DateUtil.dateToString(date1, "yyyy-MM-dd HH:mm:ss");
if (bytes == null) {
SerialTool.closePort(serialPort);
log.info("串口" + serialPort.getName() + "没有数据返回!" + i);
log.info("----------------" + deviceType + "离线,设备号:" + deviceAddr + ",所属楼栋:" + buildingName + "----------------");
String time1 = deviceInstallService.selectLastDate(deviceType, deviceAddr, buildingId);
if (time1 == null) {
time1 = dateStr;
}
int d = ExchangeStringUtil.compareCopyTime(time1, dateStr);
if (d == 1) {
deviceInstallService.updateNotOnline(deviceAddr, deviceType, buildingId, "离线"); //所有设备离线
if (deviceType.equals("热泵")) {
nowDataService.updateRunState(buildingId, deviceAddr, "离线", buildingName); //监控界面状态表热泵在线状态
}
}
continue;
log.info("串口{}没有数据返回!{}", serialPort.getName(), i);
SendAndReceiveByTcp.printLog(deviceAddr, deviceType, buildingId, buildingName, dateStr, log, deviceInstallService, nowDataService);
}
// 处理返回来的数据报文
dealReceiveData(dateStr, serialPort, i, deviceAddr, deviceType, registerAddr, brand, buildingId, buildingName, bytes, device);
@ -145,7 +134,7 @@ public class SendAndReceiveByCom {
} finally {
if (null != serialPort) {
SerialTool.closePort(serialPort);
log.info("关闭串口==" + serialPort.getName());
log.info("关闭串口=={}", serialPort.getName());
}
}
}

15
user-service/src/main/java/com/mh/user/serialport/SerialPortSendReceive2.java

@ -5,6 +5,7 @@ import com.mh.user.constants.Constant;
import com.mh.user.entity.DeviceCodeParamEntity;
import com.mh.user.entity.SysParamEntity;
import com.mh.user.service.*;
import com.mh.user.tcp.SendAndReceiveByTcp;
import com.mh.user.utils.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
@ -114,19 +115,7 @@ public class SerialPortSendReceive2 {
if (bytes == null) {
SerialTool.closePort(serialPort);
log.info("串口" + serialPort.getName() + "没有数据返回!" + i);
log.info("----------------" + deviceType + "离线,设备号:" + deviceAddr + ",所属楼栋:" + buildingName + "----------------");
String time1 = deviceInstallService.selectLastDate(deviceType, deviceAddr, buildingId);
if (time1 == null) {
time1 = dateStr;
}
int d = ExchangeStringUtil.compareCopyTime(time1, dateStr);
if (d == 1) {
deviceInstallService.updateNotOnline(deviceAddr, deviceType, buildingId, "离线"); //所有设备离线
if (deviceType.equals("热泵")) {
nowDataService.updateRunState(buildingId, deviceAddr, "离线", buildingName); //监控界面状态表热泵在线状态
}
}
continue;
SendAndReceiveByTcp.printLog(deviceAddr, deviceType, buildingId, buildingName, dateStr, log, deviceInstallService, nowDataService);
}
// 处理返回来的数据报文
dealReceiveData(dateStr, serialPort, i, deviceAddr, deviceType, registerAddr, brand, buildingId, buildingName, bytes);

22
user-service/src/main/java/com/mh/user/serialport/SerialPortSingle2.java

@ -3,6 +3,7 @@ package com.mh.user.serialport;
import com.mh.common.utils.StringUtils;
import com.mh.user.constants.Constant;
import com.mh.user.entity.DeviceCodeParamEntity;
import com.mh.user.entity.GatewayManageEntity;
import com.mh.user.factory.Device;
import com.mh.user.factory.DeviceFactory;
import com.mh.user.service.BuildingService;
@ -10,12 +11,15 @@ import com.mh.user.service.DeviceInstallService;
import com.mh.user.service.NowDataService;
import com.mh.user.strategy.DeviceStrategy;
import com.mh.user.strategy.DeviceStrategyFactory;
import com.mh.user.tcp.SendAndReceiveByTcp;
import com.mh.user.tcp.TcpSingle;
import com.mh.user.utils.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import java.io.IOException;
import java.util.Date;
import java.util.List;
import purejavacomm.SerialPort;
@ -36,6 +40,24 @@ public class SerialPortSingle2 {
BuildingService buildingService = context.getBean(BuildingService.class);
public String serialPortSend(DeviceCodeParamEntity deviceCodeParamEntity) {
// 增加一个判断,判断是否tcp通信
// 增加判断是什么通讯类型:realCom、tcp
CacheUtil instance = CacheUtil.getInstance();
List<GatewayManageEntity> gwList = instance.getGatewayInfo();
if (gwList != null && !gwList.isEmpty()) {
for (GatewayManageEntity gw : gwList) {
if (gw.getDataCom().toUpperCase().equals(deviceCodeParamEntity.getDataCom().toUpperCase())) {
String communityType = gw.getCommunityType();
if (Constant.COMMUNITY_TYPE_TCP.equals(communityType)) {
TcpSingle tcpSingle = new TcpSingle();
return tcpSingle.serialPortSend(deviceCodeParamEntity, gw);
} else if (Constant.COMMUNITY_TYPE_REAL_COM.equals(communityType)) {
// 直接跳出判断,进行下一步
break;
}
}
}
}
SerialPort serialPort = null;
String rtData = "fail";
String comName = deviceCodeParamEntity.getDataCom().toUpperCase();

28
user-service/src/main/java/com/mh/user/serialport/SerialPortThread.java

@ -1,8 +1,13 @@
package com.mh.user.serialport;
import com.mh.user.constants.Constant;
import com.mh.user.entity.GatewayManageEntity;
import com.mh.user.tcp.SendAndReceiveByTcp;
import com.mh.user.utils.CacheUtil;
import gnu.io.SerialPort;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
@ -35,6 +40,14 @@ public class SerialPortThread implements Runnable{
public void run(){
log.info("创建发送接收数据线程>>>>>>>>>>>>>>" + thread);
// 增加判断是什么通讯类型:realCom、tcp
CacheUtil instance = CacheUtil.getInstance();
List<GatewayManageEntity> gwList = instance.getGatewayInfo();
if (gwList != null && !gwList.isEmpty()) {
for (GatewayManageEntity gw : gwList) {
if (gw.getDataCom().toUpperCase().equals("COM"+thread)) {
String communityType = gw.getCommunityType();
if (Constant.COMMUNITY_TYPE_REAL_COM.equals(communityType)) {
SendAndReceiveByCom sendAndReceiveByCom = new SendAndReceiveByCom();
try {
sendAndReceiveByCom.sendAndReceive(name, thread);
@ -43,5 +56,20 @@ public class SerialPortThread implements Runnable{
} finally {
this.countDownLatch.countDown();
}
} else if (Constant.COMMUNITY_TYPE_TCP.equals(communityType)) {
// 开始TCP通信
SendAndReceiveByTcp sendAndReceiveByTcp = new SendAndReceiveByTcp();
try {
sendAndReceiveByTcp.sendAndReceive(name, thread, gw.getHeartBeat());
} catch (Exception e) {
log.error("TCP通信发生异常: ", e);
} finally {
this.countDownLatch.countDown();
}
}
}
}
}
}
}

3
user-service/src/main/java/com/mh/user/service/GatewayManageService.java

@ -50,4 +50,7 @@ public interface GatewayManageService {
// 根据grade查询对应的网关路由信息
GatewayManageEntity queryGatewayByGrade(Long grade);
void updateGatewayManageOnlineByHeartBeatCode(String heartBeat, int status);
void updateGatewayManageOnlineByImei(String imei, int status);
}

2
user-service/src/main/java/com/mh/user/service/SysLogService.java

@ -22,4 +22,6 @@ public interface SysLogService extends CurdService<SysLog> {
SysLog logInfo(int id);
SysParamEntity selectSysParam();
void insertLog(SysLog sysLog);
}

92
user-service/src/main/java/com/mh/user/service/impl/DeviceDisplayServiceImpl.java

@ -36,50 +36,50 @@ public class DeviceDisplayServiceImpl implements DeviceDisplayService {
return deviceDisplayMapper.queryDeviceStatus(deviceType);
}
/**
* 网关管理服务类
* authorljf
* createdate2020-05-21
*/
public static interface GatewayManageService {
/**
* 根据条件查询网关信息
* @param grade
* @param operator
* @return
*/
List<GatewayManageEntity> queryByOther(Integer grade, Integer operator);
/**
* 新增或更新网关信息
* @param gatewayManageEntity
*/
void addOrUpdateGateWayInfo(GatewayManageEntity gatewayManageEntity);
// 查询全部
List<GatewayManageEntity> queryAll(String gatewayID, String operator, String grade, int page, int limit);
// 查询记录数
int queryCount(String gatewayID, String operator, String grade);
// 添加网关设备
String insertGatewayManage(GatewayManageEntity gatewayManageEntity);
// 根据网关ID删除网关设备
void deleteGatewayManageByID(int gatewayID);
// 根据网关ID查询设备信息
GatewayManageEntity queryGatewayByID(Long gatewayID);
// 设备总数
int queryByOtherCount(int page, int size, int gatewayID);
// 更新网关管理器的最新连接数据
void updateGatewayManage(String IP, String port);
// 根据grade查询对应的网关路由信息
GatewayManageEntity queryGatewayByGrade(Long grade);
}
// /**
// * 网关管理服务类
// * author:ljf
// * create—date:2020-05-21
// */
// public static interface GatewayManageService {
//
// /**
// * 根据条件查询网关信息
// * @param grade
// * @param operator
// * @return
// */
// List<GatewayManageEntity> queryByOther(Integer grade, Integer operator);
//
// /**
// * 新增或更新网关信息
// * @param gatewayManageEntity
// */
// void addOrUpdateGateWayInfo(GatewayManageEntity gatewayManageEntity);
//
// // 查询全部
// List<GatewayManageEntity> queryAll(String gatewayID, String operator, String grade, int page, int limit);
//
// // 查询记录数
// int queryCount(String gatewayID, String operator, String grade);
//
// // 添加网关设备
// String insertGatewayManage(GatewayManageEntity gatewayManageEntity);
//
// // 根据网关ID删除网关设备
// void deleteGatewayManageByID(int gatewayID);
//
// // 根据网关ID查询设备信息
// GatewayManageEntity queryGatewayByID(Long gatewayID);
//
// // 设备总数
// int queryByOtherCount(int page, int size, int gatewayID);
//
// // 更新网关管理器的最新连接数据
// void updateGatewayManage(String IP, String port);
//
// // 根据grade查询对应的网关路由信息
// GatewayManageEntity queryGatewayByGrade(Long grade);
//
// }
}

13
user-service/src/main/java/com/mh/user/service/impl/GatewayManageServiceImpl.java

@ -2,6 +2,7 @@ package com.mh.user.service.impl;
import com.mh.user.entity.GatewayManageEntity;
import com.mh.user.mapper.GatewayManageMapper;
import com.mh.user.service.GatewayManageService;
import org.springframework.stereotype.Service;
import java.util.List;
@ -14,7 +15,7 @@ import java.util.List;
* @throws
*/
@Service
public class GatewayManageServiceImpl implements DeviceDisplayServiceImpl.GatewayManageService {
public class GatewayManageServiceImpl implements GatewayManageService {
// 通过构造函数注入,引用Mapper
private final GatewayManageMapper gatewayManageMapper;
@ -97,4 +98,14 @@ public class GatewayManageServiceImpl implements DeviceDisplayServiceImpl.Gatewa
public GatewayManageEntity queryGatewayByGrade(Long grade) {
return gatewayManageMapper.queryGatewayByGrade(grade);
}
@Override
public void updateGatewayManageOnlineByHeartBeatCode(String heartBeat, int status) {
gatewayManageMapper.updateGatewayManageOnlineByHeartBeatCode(heartBeat, status);
}
@Override
public void updateGatewayManageOnlineByImei(String imei, int status) {
gatewayManageMapper.updateGatewayManageOnlineByImei(imei, status);
}
}

5
user-service/src/main/java/com/mh/user/service/impl/SysLogServiceImpl.java

@ -73,4 +73,9 @@ public class SysLogServiceImpl implements SysLogService {
public SysParamEntity selectSysParam() {
return sysLogMapper.selectSysParam();
}
@Override
public void insertLog(SysLog sysLog) {
sysLogMapper.insert(sysLog);
}
}

204
user-service/src/main/java/com/mh/user/tcp/SendAndReceiveByTcp.java

@ -0,0 +1,204 @@
package com.mh.user.tcp;
import com.mh.common.utils.StringUtils;
import com.mh.user.constants.Constant;
import com.mh.user.entity.DeviceCodeParamEntity;
import com.mh.user.factory.Device;
import com.mh.user.factory.DeviceFactory;
import com.mh.user.netty.session.ServerSession;
import com.mh.user.netty.session.SessionMap;
import com.mh.user.serialport.SerialTool;
import com.mh.user.service.BuildingService;
import com.mh.user.service.DeviceInstallService;
import com.mh.user.service.NowDataService;
import com.mh.user.strategy.DeviceStrategy;
import com.mh.user.strategy.DeviceStrategyFactory;
import com.mh.user.utils.*;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.springframework.context.ApplicationContext;
import purejavacomm.SerialPort;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/**
* @author LJF
* @version 1.0
* @project CHWS
* @description 通过tcp发送和接收数据
* @date 2024-03-18 14:56:32
*/
@Slf4j
public class SendAndReceiveByTcp {
List<DeviceCodeParamEntity> deviceManageEntityList;
ApplicationContext context = SpringBeanUtil.getApplicationContext();
DeviceInstallService deviceInstallService = context.getBean(DeviceInstallService.class);
NowDataService nowDataService = context.getBean(NowDataService.class);
BuildingService buildingService = context.getBean(BuildingService.class);
public void sendAndReceive(String sort, String thread, String heartBeat) {
CacheUtil cacheUtil = CacheUtil.getInstance();
try {
//生成对应的采集指令
List<DeviceCodeParamEntity> deviceParamsByType = cacheUtil.getDeviceParamsByType(sort);
deviceManageEntityList = deviceParamsByType
.parallelStream()
.filter(value -> value.getThread().equals(thread))
.sorted(Comparator.comparing(DeviceCodeParamEntity::getDataCom))
.collect(Collectors.toList());
int size = deviceManageEntityList.size();
for (int i = 0; i < size; i++) {
//判断网页端是否有操作设备的
if (Constant.WEB_FLAG) {
log.info("有指令下发退出定时采集");
break;
}
String deviceAddr = deviceManageEntityList.get(i).getDeviceAddr();
String deviceType = deviceManageEntityList.get(i).getDeviceType();
String registerAddr = deviceManageEntityList.get(i).getRegisterAddr();
String brand = deviceManageEntityList.get(i).getBrand();//品牌
String buildingId = deviceManageEntityList.get(i).getBuildingId();
String buildingName = buildingService.queryBuildingName(buildingId); //查询楼栋名称
// 创建设备报文
Device device = DeviceFactory.createDevice(deviceType);
DeviceStrategy strategy = DeviceStrategyFactory.createStrategy(deviceType);
if (null == strategy) {
continue;
}
device.setStrategy(strategy);
String sendStr = device.createOrders(deviceManageEntityList.get(i));
try {
//向串口发送指令
if (StringUtils.isBlank(sendStr)) {
continue;
}
if (deviceType.equals("热泵")) {
for (int j = 0; j < 4; j++) {
Thread.sleep(1000);
// 判断网页端是否有操作设备的
if (Constant.WEB_FLAG) {
log.info("有指令下发退出定时采集");
break;
}
}
} else {
Thread.sleep(2000);
}
//从获取响应数据
ConcurrentHashMap<String, ServerSession> map = SessionMap.inst().getMap();
Set<Map.Entry<String, ServerSession>> entries = map.entrySet();
boolean flag = false;
String keyVal = null;
for (Map.Entry<String, ServerSession> entry : entries) {
String key = entry.getKey();
//log.info("当前session:{},当前sessionId:{},当前在线设备数:{}",entry,entry.getValue().getSessionId(),map.size());
if (key.contains(heartBeat)) {
flag = true;
keyVal = key;
break;
}
}
if (flag) {
ServerSession serverSession = map.get(keyVal);
serverSession.getChannel().writeAndFlush(NettyTools.createByteBuf(sendStr));
NettyTools.initReceiveMsg(heartBeat);
String receiveMsg = NettyTools.waitReceiveMsg(heartBeat);
Date date1 = new Date();
String dateStr = DateUtil.dateToString(date1, "yyyy-MM-dd HH:mm:ss");
if (StringUtils.isBlank(receiveMsg)) {
log.info("TCP客户端:{},没有数据返回!{}", keyVal, i);
printLog(deviceAddr, deviceType, buildingId, buildingName, dateStr, log, deviceInstallService, nowDataService);
continue;
}
// 处理返回来的数据报文
dealReceiveData(dateStr, keyVal, i, deviceAddr, deviceType, registerAddr, brand, buildingId, buildingName, receiveMsg, device);
} else {
log.error("没有找到心跳包数据:{}", heartBeat);
}
} catch (Exception e) {
log.error("发送窗口数据异常==>", e);
}
}
} catch (Exception e) {
log.error("-------------串口采集异常!----------->>", e);
}
}
public static void printLog(String deviceAddr, String deviceType, String buildingId, String buildingName, String dateStr, Logger log, DeviceInstallService deviceInstallService, NowDataService nowDataService) {
log.info("----------------{}离线,设备号:{},所属楼栋:{}----------------", deviceType, deviceAddr, buildingName);
String time1 = deviceInstallService.selectLastDate(deviceType, deviceAddr, buildingId);
if (time1 == null) {
time1 = dateStr;
}
int d = ExchangeStringUtil.compareCopyTime(time1, dateStr);
if (d == 1) {
deviceInstallService.updateNotOnline(deviceAddr, deviceType, buildingId, "离线"); //所有设备离线
if (deviceType.equals("热泵")) {
nowDataService.updateRunState(buildingId, deviceAddr, "离线", buildingName); //监控界面状态表热泵在线状态
}
}
return;
}
/**
* 处理返回来的数据
*
* @param dateStr
* @param serialPort
* @param i
* @param deviceAddr
* @param deviceType
* @param registerAddr
* @param brand
* @param buildingId
* @param buildingName
* @param receiveStr
* @throws InterruptedException
*/
private void dealReceiveData(String dateStr,
String serialPort,
int i,
String deviceAddr,
String deviceType,
String registerAddr,
String brand,
String buildingId,
String buildingName, String receiveStr, Device device) {
try {
//去掉空格和null
receiveStr = receiveStr.replace("null", "");
receiveStr = receiveStr.replace(" ", "");
log.info("TCP客户端:{},接受第{}数据:{},大小: {}", serialPort, i, receiveStr, receiveStr.length());
//返回值全部变成大写
String receiveData = receiveStr.toUpperCase();
//截取去掉FE
String dataStr;
if (receiveData.length() > 8) {
String str1 = receiveData.substring(0, 8);
String str2 = receiveData.substring(8);
dataStr = str1.replace("FE", "") + str2;
} else {
dataStr = receiveData.replace("FE", "");
}
deviceInstallService.updateOnline(deviceAddr, deviceType, buildingId, "在线"); //设备在线
log.info("----------------{}在线,设备号:{},所属楼栋:{}----------------", deviceType, deviceAddr, buildingName);
if (deviceType.equals("热泵")) {
String strState = nowDataService.selectState(buildingId, deviceAddr);
if (strState != null && strState.equals("离线")) { //采集到数据
nowDataService.updateRunState(buildingId, deviceAddr, "不运行", buildingName); //监控界面状态表热泵在线状态
}
}
// 解析返回来的数据
device.analysisReceiveData(dateStr, deviceType, registerAddr, brand, buildingId, buildingName, dataStr);
} catch (Exception e) {
log.error("楼栋:{}设备类型:{}保存数据库失败!{}", buildingName, deviceType, i, e);
}
}
}

126
user-service/src/main/java/com/mh/user/tcp/TcpSingle.java

@ -0,0 +1,126 @@
package com.mh.user.tcp;
import com.mh.common.utils.StringUtils;
import com.mh.user.constants.Constant;
import com.mh.user.entity.DeviceCodeParamEntity;
import com.mh.user.entity.GatewayManageEntity;
import com.mh.user.factory.Device;
import com.mh.user.factory.DeviceFactory;
import com.mh.user.netty.session.ServerSession;
import com.mh.user.netty.session.SessionMap;
import com.mh.user.serialport.SerialTool;
import com.mh.user.service.BuildingService;
import com.mh.user.service.DeviceInstallService;
import com.mh.user.service.NowDataService;
import com.mh.user.strategy.DeviceStrategy;
import com.mh.user.strategy.DeviceStrategyFactory;
import com.mh.user.utils.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import purejavacomm.SerialPort;
import java.io.IOException;
import java.util.Date;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import static com.mh.user.tcp.SendAndReceiveByTcp.printLog;
/**
* @author nxr
* @title
* @description 串口发送和接收处理,操作类
* @updateTime 2022-08-10
* @throws
*/
@Slf4j
public class TcpSingle {
// 调用service
ApplicationContext context = SpringBeanUtil.getApplicationContext();
DeviceInstallService deviceInstallService = context.getBean(DeviceInstallService.class);
NowDataService nowDataService = context.getBean(NowDataService.class);
BuildingService buildingService = context.getBean(BuildingService.class);
public String serialPortSend(DeviceCodeParamEntity deviceCodeParamEntity, GatewayManageEntity gatewayManageEntity) {
String rtData = "fail";
String comName = deviceCodeParamEntity.getDataCom().toUpperCase();
if (StringUtils.isBlank(comName)) {
return rtData;
}
try {
// 创建设备报文
Device device = DeviceFactory.createDevice(deviceCodeParamEntity.getDeviceType());
DeviceStrategy strategy = DeviceStrategyFactory.createStrategy(deviceCodeParamEntity.getDeviceType());
if (null == strategy) {
return rtData;
}
device.setStrategy(strategy);
String sendStr = device.createOrders(deviceCodeParamEntity);
//从获取响应数据
ConcurrentHashMap<String, ServerSession> map = SessionMap.inst().getMap();
Set<Map.Entry<String, ServerSession>> entries = map.entrySet();
boolean flag = false;
String keyVal = null;
String heartBeat = gatewayManageEntity.getHeartBeat().toUpperCase();
for (Map.Entry<String, ServerSession> entry : entries) {
String key = entry.getKey();
//log.info("当前session:{},当前sessionId:{},当前在线设备数:{}",entry,entry.getValue().getSessionId(),map.size());
if (key.contains(heartBeat)) {
flag = true;
keyVal = key;
break;
}
}
if (flag) {
ServerSession serverSession = map.get(keyVal);
serverSession.getChannel().writeAndFlush(NettyTools.createByteBuf(sendStr));
NettyTools.initReceiveMsg(heartBeat);
String receiveStr = "";
if (Constant.WEB_FLAG) {
receiveStr = NettyTools.waitReceiveMsg(heartBeat);
}
if (StringUtils.isBlank(receiveStr)) {
return Constant.FAIL;
}
receiveStr = receiveStr.replace("null", "").replace(" ", "");
log.info("TCP客户端" + heartBeat + "接收数据:" + receiveStr + ",大小: " + receiveStr.length());
//返回值全部变成大写
String receiveData = receiveStr.toUpperCase();
//截取去掉FE
String deviceType = deviceCodeParamEntity.getDeviceType();
String deviceAddr = deviceCodeParamEntity.getDeviceAddr();
String dataStr = "";
if (receiveData.length() > 8 && ("水表".equals(deviceType) || "电表".equals(deviceType))) {
String str1 = receiveData.substring(0, 8);
String str2 = receiveData.substring(8);
dataStr = str1.replace("FE", "") + str2;
} else {
dataStr = receiveData;
}
String registerAddr = deviceCodeParamEntity.getRegisterAddr();
String brand = deviceCodeParamEntity.getBrand();
String buildingId = deviceCodeParamEntity.getBuildingId();
String buildingName = buildingService.queryBuildingName(buildingId); //查询楼栋名称
deviceInstallService.updateOnline(deviceAddr, deviceType, buildingId, "在线"); //设备在线
log.info("{}在线,设备号:{},所属楼栋:{}", deviceType, deviceAddr, buildingName);
if (deviceType.equals("热泵")) {
String strState = nowDataService.selectState(buildingId, deviceAddr);
if (strState != null && strState.equals("离线")) { //采集到数据
nowDataService.updateRunState(buildingId, deviceAddr, "不运行", buildingName); //监控界面状态表热泵在线状态
}
}
rtData = device.analysisReceiveData(DateUtil.dateToString(new Date(), "yyyy-MM-dd HH:mm:ss"),
deviceType, registerAddr, brand, buildingId, buildingName, dataStr);
return rtData;
} else {
log.error("没有找到心跳包数据:{}", heartBeat);
}
} catch (Exception e) {
log.info("单抄TCP客户端:{}异常,没有数据返回!关闭串口", gatewayManageEntity.getHeartBeat(), e);
}
return Constant.FAIL;
}
}

25
user-service/src/main/java/com/mh/user/utils/CacheUtil.java

@ -4,7 +4,9 @@ import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.github.benmanes.caffeine.cache.Cache;
import com.mh.user.entity.DeviceCodeParamEntity;
import com.mh.user.entity.GatewayManageEntity;
import com.mh.user.service.DeviceCodeParamService;
import com.mh.user.service.GatewayManageService;
import org.springframework.context.ApplicationContext;
import java.util.List;
@ -22,17 +24,29 @@ public class CacheUtil {
Cache caffeineCache = (Cache) context.getBean("caffeineCache");
DeviceCodeParamService deviceCodeParamService = context.getBean(DeviceCodeParamService.class);
GatewayManageService gatewayManageService = context.getBean(GatewayManageService.class);
private static class SingletonHolder{
private static final CacheUtil instance=new CacheUtil();
}
private CacheUtil(){
// 缓存采集信息
createDeviceParams();
// 缓存网关信息数据
saveGatewayInfo();
}
public static CacheUtil getInstance(){
return SingletonHolder.instance;
}
private void saveGatewayInfo() {
if (caffeineCache.getIfPresent("gw") == null) {
List<GatewayManageEntity> gatewayManageEntities = gatewayManageService.queryByOther(null, null);
caffeineCache.put("gw", gatewayManageEntities);
}
}
/**
* 把采集ddc设置的报文存入到缓存中
*/
@ -70,6 +84,17 @@ public class CacheUtil {
return JSONArray.parseArray(JSONObject.toJSONString(cacheObject), DeviceCodeParamEntity.class);
}
public List<GatewayManageEntity> getGatewayInfo() {
Object cacheObject = caffeineCache.getIfPresent("gw");
// 如果为空,重新添加
if (cacheObject == null) {
saveGatewayInfo();
// 在重新获取数据
cacheObject = caffeineCache.getIfPresent("gw");
}
return JSONArray.parseArray(JSONObject.toJSONString(cacheObject), GatewayManageEntity.class);
}
/**
* 删除缓存
*/

101
user-service/src/main/java/com/mh/user/utils/NettyTools.java

@ -0,0 +1,101 @@
package com.mh.user.utils;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.mh.common.utils.StringUtils;
import com.mh.user.constants.Constant;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* @author LJF
* @version 1.0
* @project TAD_Server
* @description 缓存等待数据
* @date 2023/7/4 08:45:16
*/
@Slf4j
public class NettyTools {
public static ByteBuf getByteBuf(ChannelHandlerContext ctx, String sendStr) {
// byte类型的数据
// String sendStr = "5803004900021914"; // 冷量计
// 申请一个数据结构存储信息
ByteBuf buffer = ctx.alloc().buffer();
// 将信息放入数据结构中
buffer.writeBytes(ExchangeStringUtil.hexStrToBinaryStr(sendStr));//对接需要16进制
return buffer;
}
public static ByteBuf createByteBuf(String sendStr) {
// byte类型的数据
// String sendStr = "5803004900021914"; // 冷量计
// 申请一个数据结构存储信息
ByteBuf buffer = Unpooled.buffer();
// 将信息放入数据结构中
buffer.writeBytes(ExchangeStringUtil.hexStrToBinaryStr(sendStr));//对接需要16进制
return buffer;
}
/**
* 响应消息缓存
*/
private static final Cache<String, BlockingQueue<String>> responseMsgCache = CacheBuilder.newBuilder()
.maximumSize(50000)
.expireAfterWrite(1000, TimeUnit.SECONDS)
.build();
/**
* 等待响应消息
* @param key 消息唯一标识
* @return ReceiveDdcMsgVo
*/
public static String waitReceiveMsg(String key) {
try {
//设置超时时间
String vo = Objects.requireNonNull(responseMsgCache.getIfPresent(key))
.poll(1000 * 10, TimeUnit.MILLISECONDS);
//删除key
responseMsgCache.invalidate(key);
return vo;
} catch (Exception e) {
log.error("获取数据异常,sn={},msg=null",key);
return Constant.FAIL;
}
}
/**
* 初始化响应消息的队列
* @param key 消息唯一标识
*/
public static void initReceiveMsg(String key) {
responseMsgCache.put(key,new LinkedBlockingQueue<String>(1));
}
/**
* 设置响应消息
* @param key 消息唯一标识
*/
public static void setReceiveMsg(String key, String msg) {
if(responseMsgCache.getIfPresent(key) != null){
responseMsgCache.getIfPresent(key).add(msg);
return;
}
log.warn("sn {}不存在",key);
}
}
Loading…
Cancel
Save