Browse Source

1、计算pro_data_result改成java实现;

2、监听6001改成非阻塞;
3、定时计算pro_data_result改成计算前3分钟;
dev
25604 2 months ago
parent
commit
c5bf857ad4
  1. 129
      user-service/src/main/java/com/mh/user/job/DealDataJob.java
  2. 4
      user-service/src/main/java/com/mh/user/mapper/DataResultChMapper.java
  3. 18
      user-service/src/main/java/com/mh/user/mapper/DataResultFifteenMiMapper.java
  4. 15
      user-service/src/main/java/com/mh/user/mapper/DataResultFiveMiMapper.java
  5. 15
      user-service/src/main/java/com/mh/user/mapper/DataResultOneMiMapper.java
  6. 55
      user-service/src/main/java/com/mh/user/netty/EchoServer.java
  7. 16
      user-service/src/main/java/com/mh/user/service/ProDataResultService.java
  8. 65
      user-service/src/main/java/com/mh/user/service/chillers/impl/DealDataServiceImpl.java
  9. 7
      user-service/src/main/java/com/mh/user/service/impl/DataResultServiceImpl.java
  10. 328
      user-service/src/main/java/com/mh/user/service/impl/ProDataResultServiceImpl.java
  11. 104
      user-service/src/main/java/com/mh/user/strategy/ModbusProtocolStrategy.java
  12. 2
      user-service/src/main/resources/application-dev.yml
  13. 18
      user-service/src/test/java/com/mh/user/DealDataTest.java

129
user-service/src/main/java/com/mh/user/job/DealDataJob.java

@ -6,9 +6,13 @@ import com.mh.user.utils.SimpleWeather;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @author ljf
@ -22,6 +26,7 @@ import java.util.concurrent.CountDownLatch;
public class DealDataJob {
private final DealDataService dealDataService;
public DealDataJob(DealDataService dealDataService) {
this.dealDataService = dealDataService;
}
@ -32,41 +37,42 @@ public class DealDataJob {
@Scheduled(cron = "25 0/2 * * * ?") //每2分钟一次
public void ProEnergy() {
try {
SimpleDateFormat sdf1=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date date=new Date();
String curDate=sdf1.format(date);
SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date date = new Date();
String curDate = sdf1.format(date);
// dealDataService.proEnergy(curDate);//中央热水yyyy-MM-dd HH:mm:ss
String m=curDate.substring(15,16);
if (Integer.parseInt(m)==0){
curDate=curDate.substring(0,16)+":00";
}else if(Integer.parseInt(m)==2){
curDate=curDate.substring(0,16)+":00";
}else if(Integer.parseInt(m)==4){
curDate=curDate.substring(0,16)+":00";
}else if(Integer.parseInt(m)==6){
curDate=curDate.substring(0,16)+":00";
}else if(Integer.parseInt(m)==8){
curDate=curDate.substring(0,16)+":00";
String m = curDate.substring(15, 16);
if (Integer.parseInt(m) == 0) {
curDate = curDate.substring(0, 16) + ":00";
} else if (Integer.parseInt(m) == 2) {
curDate = curDate.substring(0, 16) + ":00";
} else if (Integer.parseInt(m) == 4) {
curDate = curDate.substring(0, 16) + ":00";
} else if (Integer.parseInt(m) == 6) {
curDate = curDate.substring(0, 16) + ":00";
} else if (Integer.parseInt(m) == 8) {
curDate = curDate.substring(0, 16) + ":00";
}
dealDataService.proEnergyData(curDate);//中央空调yyyy-MM-dd HH:mm:ss
log.info("---------数据分析定时汇总,每二分钟!"+curDate);
log.info("---------数据分析定时汇总,每二分钟!" + curDate);
} catch (Exception e) {
log.error(" ProEnergy 数据分析定时汇总异常", e);
// e.printStackTrace();
}
}
/**
* 定时处理数据每十五分钟处理一次
*/
@Scheduled(cron = "0 0/16 * * * ?")
public void dealData() {
try {
SimpleDateFormat sdf1=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date date=new Date();
String curDate=sdf1.format(date);
SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date date = new Date();
String curDate = sdf1.format(date);
dealDataService.proDeviceState(curDate);
log.info("---------设备状态定时汇总,每十五分钟!"+curDate);
log.info("---------设备状态定时汇总,每十五分钟!" + curDate);
} catch (Exception e) {
log.error("设备状态定时汇总异常", e);
}
@ -78,30 +84,57 @@ public class DealDataJob {
@Scheduled(cron = "0 0/1 * * * ?")
public void proDataResult() {
try {
// 通过多线程进行处理
String curDate = ExchangeStringUtil.dateTime(1, "");
// 添加开始程序时间
long startTime = System.currentTimeMillis();
String curDate = ExchangeStringUtil.dateTime(3, "");
List<String> list = dealDataService.queryProjectId("1");
if (!list.isEmpty()) {
CountDownLatch latch = new CountDownLatch(list.size());
for (String projectId : list) {
new Thread(() -> {
try {
dealDataService.proDataResult(curDate, projectId);
} catch (Exception e) {
log.error("能效监测定时汇总异常", e);
} finally {
latch.countDown();
}
}).start();
// 使用较小的线程池,减少并发冲突
ExecutorService executor = Executors.newFixedThreadPool(
Math.min(Math.max(list.size() / 2, 1), 5)
);
try {
CountDownLatch latch = new CountDownLatch(list.size());
// 分批处理,避免同时处理过多项目
for (String projectId : list) {
final String pid = projectId;
String finalCurDate = curDate;
executor.submit(() -> {
try {
// 添加延迟,错开执行时间
Thread.sleep(100);
dealDataService.proDataResult(finalCurDate, pid);
// proDataResultService.processDailySummary(curDate, pid);
} catch (Exception e) {
log.error("能效监测定时汇总异常,项目ID: {}", pid, e);
} finally {
latch.countDown();
}
});
}
// 添加超时控制,避免无限期等待
boolean completed = latch.await(2, TimeUnit.MINUTES);
if (!completed) {
log.warn("能效监测定时汇总未在2分钟内完成,项目数量: {}", list.size());
}
} finally {
executor.shutdown();
}
latch.await(); // 等待所有子线程执行完毕
}
// 结束时间
long endTime = System.currentTimeMillis();
log.info("计算one、five、fifteen耗时:{} 毫秒", endTime-startTime);
log.info("---------能效监测定时汇总,每一分钟!{}", curDate);
} catch (Exception e) {
log.error("能效监测定时汇总异常", e);
}
}
/**
* 定时处理数据获取环境温度和湿度,所有项目共有
*/
@ -109,17 +142,17 @@ public class DealDataJob {
// @Scheduled(cron = "0 0/1 * * * ?") //1分钟
public void saveTempHumidity() {
try {
SimpleDateFormat sdf1=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date date=new Date();
Map<String, Object> map= SimpleWeather.queryWeather("广州");
String curDate=sdf1.format(date);
SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date date = new Date();
Map<String, Object> map = SimpleWeather.queryWeather("广州");
String curDate = sdf1.format(date);
if (map.isEmpty()) {
// 重新请求接口
map= SimpleWeather.queryWeather("广州");
map = SimpleWeather.queryWeather("广州");
}
String temperature=map.get("temperature").toString();
String humidity=map.get("humidity").toString();
dealDataService.saveTempHumidity(curDate,temperature,humidity);
String temperature = map.get("temperature").toString();
String humidity = map.get("humidity").toString();
dealDataService.saveTempHumidity(curDate, temperature, humidity);
log.info("{},{}℃,{}%", curDate, temperature, humidity);
} catch (Exception e) {
// e.printStackTrace();
@ -134,16 +167,16 @@ public class DealDataJob {
// @Scheduled(cron = "0 0/1 * * * ?") //1分钟
public void proEnergyDaySum() {
try {
SimpleDateFormat sdf1=new SimpleDateFormat("yyyy-MM-dd");
Date date=new Date();
SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd");
Date date = new Date();
Calendar calendar = Calendar.getInstance();
calendar.setTime(date);
calendar.add(Calendar.DATE, -1);// 减少一天
String curDate=sdf1.format(calendar.getTime());
List<String> list=dealDataService.queryProjectId("2");
if (!list.isEmpty()){
for(String projectId:list){
dealDataService.proEnergyDaySum(curDate,projectId);
String curDate = sdf1.format(calendar.getTime());
List<String> list = dealDataService.queryProjectId("2");
if (!list.isEmpty()) {
for (String projectId : list) {
dealDataService.proEnergyDaySum(curDate, projectId);
}
}
log.info("---------中央热水生产概况汇总,每一小时!{}", curDate);

4
user-service/src/main/java/com/mh/user/mapper/DataResultChMapper.java

@ -6,6 +6,7 @@ import com.mh.user.entity.DataResultClEntity;
import org.apache.ibatis.annotations.*;
import java.util.List;
import java.util.Map;
/**
* @author LJF
@ -70,4 +71,7 @@ public interface DataResultChMapper extends BaseMapper<DataResultChEntity> {
" t1.cur_date ")
List<DataResultChEntity> queryDataResultCh(@Param("projectId") String projectId, @Param("startDate") String startDate, @Param("curDate") String curDate);
@Select("select top 1 * from data_result_ch " +
" where cur_date >= #{start} and cur_date <= #{end} and register_addr = #{registerAddr} and project_id = #{projectId} and device_addr = #{deviceAddr} order by cur_date desc ")
DataResultChEntity selectLatestByCondition(Map<String, Object> params);
}

18
user-service/src/main/java/com/mh/user/mapper/DataResultFifteenMiMapper.java

@ -4,9 +4,7 @@ import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.mh.user.entity.DataResultChEntity;
import com.mh.user.entity.DataResultFifteenMiEntity;
import com.mh.user.mapper.provider.DataResultProvider;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.SelectProvider;
import org.apache.ibatis.annotations.*;
import java.util.List;
@ -37,4 +35,18 @@ public interface DataResultFifteenMiMapper extends BaseMapper<DataResultFifteenM
@Param("registerAddr") String registerAddr,
@Param("mtNum") String mtNum);
@Select("SELECT TOP 1 id FROM data_result_five_mi WHERE cur_date >= #{timeStr} and cur_date <= #{timeStr} " +
"AND device_addr = #{deviceAddr} AND project_id = #{projectId}")
Long selectExistId(@Param("timeStr") String timeStr,
@Param("deviceAddr") String deviceAddr,
@Param("projectId") String projectId);
@Update("UPDATE data_result_five_mi SET cur_value = #{curValue}, grade = #{grade} WHERE id = #{id}")
void updateByIdCustomize(@Param("id") Long id,
@Param("curValue") String curValue,
@Param("grade") int grade);
@Insert("INSERT INTO data_result_fifteen_mi (device_addr, device_type, cur_date, cur_value, fun_code, register_addr, register_name, grade, project_id) " +
"VALUES (#{deviceAddr}, #{deviceType}, #{curDate}, #{curValue}, #{funCode}, #{registerAddr}, #{registerName}, #{grade}, #{projectId})")
void saveDataResultFifteenMi(DataResultFifteenMiEntity record);
}

15
user-service/src/main/java/com/mh/user/mapper/DataResultFiveMiMapper.java

@ -37,4 +37,19 @@ public interface DataResultFiveMiMapper extends BaseMapper<DataResultFiveMiEntit
@Param("curDate") String curDate,
@Param("registerAddr") String registerAddr,
@Param("mtNum") String mtNum);
@Select("SELECT TOP 1 id FROM data_result_five_mi WHERE cur_date >= #{timeStr} and cur_date <= #{timeStr} " +
"AND device_addr = #{deviceAddr} AND project_id = #{projectId}")
Long selectExistId(@Param("timeStr") String timeStr,
@Param("deviceAddr") String deviceAddr,
@Param("projectId") String projectId);
@Update("UPDATE data_result_five_mi SET cur_value = #{curValue}, grade = #{grade} WHERE id = #{id}")
void updateByIdCustomize(@Param("id") Long id,
@Param("curValue") String curValue,
@Param("grade") int grade);
@Insert("INSERT INTO data_result_five_mi (device_addr, device_type, cur_date, cur_value, fun_code, register_addr, register_name, grade, project_id) " +
"VALUES (#{deviceAddr}, #{deviceType}, #{curDate}, #{curValue}, #{funCode}, #{registerAddr}, #{registerName}, #{grade}, #{projectId})")
void saveDataResultFiveMi(DataResultFiveMiEntity record);
}

15
user-service/src/main/java/com/mh/user/mapper/DataResultOneMiMapper.java

@ -43,4 +43,19 @@ public interface DataResultOneMiMapper extends BaseMapper<DataResultOneMiEntity>
@SelectProvider(type = DataResultProvider.class,method = "dataResultOneMiCount")
int dataResultOneMiCount(@Param("projectId") String projectId,@Param("startDate") String startDate,@Param("curDate") String curDate);
@Delete("DELETE FROM data_result_one_mi WHERE cur_date >= #{timeStr} and cur_date <= #{timeStr} " +
"AND device_addr = #{deviceAddr} AND project_id = #{projectId}")
void deleteByTimeDeviceProject(@Param("timeStr") String timeStr,
@Param("deviceAddr") String deviceAddr,
@Param("projectId") String projectId);
@Select("SELECT count(1) FROM data_result_one_mi WHERE cur_date >= #{timeStr} and cur_date <= #{timeStr} " +
"AND device_addr = #{deviceAddr} AND project_id = #{projectId} and register_addr = #{registerAddr} and device_type = #{deviceType} ")
int selectCountByTimeDeviceProject(@Param("timeStr") String timeStr,
@Param("deviceAddr") String deviceAddr,
@Param("projectId") String projectId,@Param("registerAddr") String registerAddr, @Param("deviceType") String deviceType);
@Insert("INSERT INTO data_result_one_mi (device_addr, device_type, cur_date, cur_value, fun_code, register_addr, register_name, grade, project_id) " +
"VALUES (#{deviceAddr}, #{deviceType}, #{curDate}, #{curValue}, #{funCode}, #{registerAddr}, #{registerName}, #{grade}, #{projectId})")
void saveDataResultOneMi(DataResultOneMiEntity dataResultOneMiEntity);
}

55
user-service/src/main/java/com/mh/user/netty/EchoServer.java

@ -17,29 +17,50 @@ public class EchoServer {
}
public void start() {
// 创建Even-LoopGroup
// 创建Event-LoopGroup
NioEventLoopGroup bossGroup = new NioEventLoopGroup(); // bossGroup 用于监听客户端连接,专门负责与客户端创建连接,并把连接注册到workerGroup的Selector中。
NioEventLoopGroup workerGroup = new NioEventLoopGroup(); // workerGroup用于处理每一个连接发生的读写事件。
try {
ServerBootstrap serverBootstrap = new ServerBootstrap(); // 2、创建Server-Bootstrap
ServerBootstrap serverBootstrap = new ServerBootstrap(); // 创建Server-Bootstrap
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // 3、指定所使用的NIO传输Channel
.localAddress(port) // 4、指定端口设置套接字
.channel(NioServerSocketChannel.class) // 指定所使用的NIO传输Channel
.localAddress(port) // 指定端口设置套接字
.option(ChannelOption.SO_BACKLOG, 1204)
.childHandler(new ServerChannelInitializer());
ChannelFuture channelFuture = serverBootstrap.bind().sync(); // 6、异步绑定服务器;调用sync()方法,阻塞式等待,直到绑定完成
log.info("服务器启动开始监听端口:"+port);
channelFuture.channel().closeFuture().sync(); // 7、获取Channel的closeFuture,并且阻塞当前线程,直到它完成
} catch (InterruptedException e) {
log.error("服务器启动失败", e);
} finally {
try {
bossGroup.shutdownGracefully().sync(); // 8、关闭EventLoopGroup,关闭所有的资源
workerGroup.shutdownGracefully().sync(); // 关闭
} catch (InterruptedException e) {
e.printStackTrace();
}
// 异步绑定服务器,不使用sync()阻塞
ChannelFuture channelFuture = serverBootstrap.bind();
channelFuture.addListener(future -> {
if (future.isSuccess()) {
log.info("服务器启动成功,开始监听端口:{}", port);
} else {
log.error("服务器启动失败", future.cause());
}
});
// 添加JVM关闭钩子,确保优雅关闭
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
log.info("正在关闭服务器...");
try {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
log.info("服务器已关闭");
} catch (Exception e) {
log.error("关闭服务器时发生错误", e);
}
}));
// 不阻塞主线程,服务器在后台运行
log.info("服务器启动命令已发送,端口:{}", port);
} catch (Exception e) {
log.error("服务器启动过程中发生异常", e);
// 发生异常时关闭资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
// 注意:这里不再等待channel关闭,方法会立即返回
}
}

16
user-service/src/main/java/com/mh/user/service/ProDataResultService.java

@ -0,0 +1,16 @@
package com.mh.user.service;
import java.text.ParseException;
/**
* @author LJF
* @version 1.0
* @project mh_esi
* @description pro_data_result修改java
* @date 2025-08-26 09:08:15
*/
public interface ProDataResultService {
void processDailySummary(String curDate, String projectId) throws ParseException;
}

65
user-service/src/main/java/com/mh/user/service/chillers/impl/DealDataServiceImpl.java

@ -2,10 +2,13 @@ package com.mh.user.service.chillers.impl;
import com.mh.user.mapper.chillers.DealDataMapper;
import com.mh.user.service.chillers.DealDataService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author ljf
@ -14,8 +17,11 @@ import java.util.List;
* @updateTime 2020-07-28
* @throws
*/
@Slf4j
@Service
public class DealDataServiceImpl implements DealDataService {
// 添加锁对象管理
private static final Map<String, Object> LOCK_MAP = new ConcurrentHashMap<>();
private final DealDataMapper dealDataMapper;
@ -36,10 +42,65 @@ public class DealDataServiceImpl implements DealDataService {
dealDataMapper.proEnergyData(curDate);
}
// 清理锁对象的方法
private void cleanupLocks() {
if (LOCK_MAP.size() > 1000) {
synchronized (LOCK_MAP) {
if (LOCK_MAP.size() > 1000) {
LOCK_MAP.clear();
}
}
}
}
// 判断是否为死锁异常
private boolean isDeadlockException(Exception e) {
if (e.getMessage() != null) {
String message = e.getMessage().toLowerCase();
return message.contains("死锁") || message.contains("deadlock");
}
return false;
}
@Override
public void proDataResult(String curDate,String projectId) {
public void proDataResult(String curDate, String projectId) {
String lockKey = "proDataResult_" + projectId;
Object lockObject = LOCK_MAP.computeIfAbsent(lockKey, k -> new Object());
synchronized (lockObject) {
int retryCount = 0;
final int maxRetries = 3;
while (true) {
try {
dealDataMapper.proDataResult(curDate, projectId);
break; // 成功执行则跳出循环
} catch (Exception e) {
retryCount++;
if (retryCount >= maxRetries) {
log.error("proDataResult执行失败,已重试" + maxRetries + "次");
break;
}
// 检查是否为死锁异常
if (isDeadlockException(e)) {
log.error("检测到死锁,进行第{}次重试,项目ID:{}", retryCount, projectId);
// 随机延迟后重试
try {
Thread.sleep(10);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException(ie);
}
} else {
throw new RuntimeException("非死锁异常,项目ID:" + projectId, e);
}
}
}
}
dealDataMapper.proDataResult(curDate,projectId);
// 清理锁对象
cleanupLocks();
}
@Override

7
user-service/src/main/java/com/mh/user/service/impl/DataResultServiceImpl.java

@ -409,9 +409,10 @@ public class DataResultServiceImpl implements DataResultService {
} else {
count = dataResultFiveMiCount(projectId, startDate, curDate, momentCoolingStr, registerAddr);
}
new Thread(() ->{
dealDataExceptionService.dealFifteenInstantaneousExceptionData(mergedRecords);
}).start();
// 屏蔽异常处理
// new Thread(() ->{
// dealDataExceptionService.dealFifteenInstantaneousExceptionData(mergedRecords);
// }).start();
return HttpResult.ok(count, mergedRecords);
}
// else if (dateType.equals("12小时")) {

328
user-service/src/main/java/com/mh/user/service/impl/ProDataResultServiceImpl.java

@ -0,0 +1,328 @@
package com.mh.user.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.mh.user.entity.DataResultChEntity;
import com.mh.user.entity.DataResultFifteenMiEntity;
import com.mh.user.entity.DataResultFiveMiEntity;
import com.mh.user.entity.DataResultOneMiEntity;
import com.mh.user.mapper.DataResultChMapper;
import com.mh.user.mapper.DataResultFifteenMiMapper;
import com.mh.user.mapper.DataResultFiveMiMapper;
import com.mh.user.mapper.DataResultOneMiMapper;
import com.mh.user.service.ProDataResultService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.sql.Timestamp;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/**
* @author LJF
* @version 1.0
* @project mh_esi
* @description pro_data_result改成java
* @date 2025-08-26 09:07:39
*/
@Slf4j
@Service
@Transactional(rollbackFor = Exception.class)
public class ProDataResultServiceImpl implements ProDataResultService {
private static final List<String> DEVICE_CODES = Arrays.asList("0020", "0073", "0075", "0006", "0010", "0012", "0014");
@Autowired
private DataResultChMapper dataResultChMapper;
@Autowired
private DataResultOneMiMapper oneMiMapper;
@Autowired
private DataResultFiveMiMapper fiveMiMapper;
@Autowired
private DataResultFifteenMiMapper fifteenMiMapper;
@Override
public void processDailySummary(String curDateStr, String projectId) throws ParseException {
// 时间格式标准化
SimpleDateFormat inputSdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date curDate = inputSdf.parse(curDateStr);
SimpleDateFormat outputSdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:00");
String standardDate = outputSdf.format(curDate);
log.info("处理日期:{}", standardDate);
// 计算时间窗口参数
int minute = curDate.getMinutes();
int intervalMinute = (minute / 5) * 5;
String minuteStr = String.format("%02d", intervalMinute);
log.info("分钟区间:{}", minuteStr);
// 遍历设备编码
for (String deviceCode : DEVICE_CODES) {
log.info("处理设备编码:{}", deviceCode);
// 检查是否存在待处理数据
Map<String, Object> countParams = new HashMap<>();
// countParams.put("start", standardDate);
// countParams.put("end", addMinutes(standardDate, 5));
countParams.put("register_addr", deviceCode);
countParams.put("project_id", projectId);
int recordCount = Math.toIntExact(dataResultChMapper.selectCount(new QueryWrapper<DataResultChEntity>()
.allEq(countParams)
.ge("cur_date", standardDate)
.lt("cur_date", addMinutes(standardDate, 5))));
if (recordCount == 0) continue;
// 获取当前设备编码下的所有设备地址
List<String> deviceAddrs = dataResultChMapper.selectObjs(new QueryWrapper<DataResultChEntity>()
.select("DISTINCT device_addr")
.eq("register_addr", deviceCode)
.eq("project_id", projectId)
.between("cur_date", standardDate, addMinutes(standardDate, 5)))
.stream().map(Object::toString).collect(Collectors.toList());
// 处理每个设备地址
for (String deviceAddr : deviceAddrs) {
// processIntervalData(standardDate, deviceCode, projectId, deviceAddr, 1);
processIntervalData(standardDate, deviceCode, projectId, deviceAddr, 5, minuteStr);
processIntervalData(standardDate, deviceCode, projectId, deviceAddr, 15, minuteStr);
}
}
}
private void processIntervalData(String baseDate, String registerAddr, String projectId,
String deviceAddr, int interval, String... extraParams) throws ParseException {
// 计算时间窗口
String startTime = baseDate;
String endTime = addMinutes(baseDate, interval);
// 获取最新数据
Map<String, Object> params = new HashMap<>();
params.put("start", startTime);
params.put("end", endTime);
params.put("registerAddr", registerAddr);
params.put("projectId", projectId);
params.put("deviceAddr", deviceAddr);
DataResultChEntity latest = dataResultChMapper.selectLatestByCondition(params);
if (latest == null) return;
// 数值有效性校验
String curValue = isNumeric(latest.getCurValue()) ? latest.getCurValue() : "0.00";
if (!isNumeric(latest.getCurValue())) {
log.info("无效数值,已替换为0.00:{}", latest.getCurValue());
}
// 计算汇总时间点
String summaryTime = extraParams.length > 0 ?
baseDate.substring(0, 13) + ":" + extraParams[0] + ":00" :
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(latest.getCurDate());
// 处理不同时间间隔的汇总
switch (interval) {
case 1:
handleOneMinute(latest, deviceAddr, projectId, summaryTime, curValue);
break;
case 5:
handleFiveMinute(latest, deviceAddr, projectId, summaryTime, curValue);
break;
case 15:
handleFifteenMinute(latest, deviceAddr, projectId, summaryTime, curValue);
break;
}
}
// 在类中添加静态锁对象
private static final Map<String, Object> LOCK_MAP = new ConcurrentHashMap<>();
private void handleOneMinute(DataResultChEntity source, String deviceAddr, String projectId,
String summaryTime, String curValue) {
// 创建针对特定设备和项目的锁key
String lockKey = deviceAddr + "_" + projectId;
Object lockObject = LOCK_MAP.computeIfAbsent(lockKey, k -> new Object());
try {
// 使用特定锁,减少锁竞争
synchronized (lockObject) {
// 添加重试机制避免死锁
int retryCount = 0;
final int maxRetries = 3;
while (retryCount < maxRetries) {
try {
// 查询历史数据
int count = oneMiMapper.selectCountByTimeDeviceProject(summaryTime, deviceAddr, projectId, source.getRegisterAddr(), source.getDeviceType());
if (count == 0) {
// 插入新数据
DataResultOneMiEntity record = new DataResultOneMiEntity();
BeanUtils.copyProperties(source, record);
record.setCurDate(Timestamp.valueOf(summaryTime));
record.setCurValue(curValue);
oneMiMapper.saveDataResultOneMi(record);
}
break; // 成功执行则跳出循环
} catch (Exception e) {
retryCount++;
if (retryCount >= maxRetries) {
throw e; // 达到最大重试次数后抛出异常
}
// 检查是否为死锁异常
if (e.getMessage() != null &&
(e.getMessage().contains("死锁") || e.getMessage().contains("deadlock"))) {
log.warn("检测到死锁,进行第{}次重试,设备地址:{},项目ID:{}",
retryCount, deviceAddr, projectId);
// 随机延迟后重试
try {
Thread.sleep(new Random().nextInt(200) + 100);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException(ie);
}
} else {
throw e; // 非死锁异常直接抛出
}
}
}
}
} finally {
// 清理长时间不用的锁对象,避免内存泄漏
cleanupLocks();
}
}
// 清理锁对象的方法
private void cleanupLocks() {
// 简单的清理策略:当锁数量超过一定阈值时,清空所有锁
// 实际项目中可以根据需要实现更复杂的清理逻辑
if (LOCK_MAP.size() > 1000) {
synchronized (LOCK_MAP) {
if (LOCK_MAP.size() > 1000) {
LOCK_MAP.clear();
}
}
}
}
private void handleFiveMinute(DataResultChEntity source, String deviceAddr, String projectId,
String summaryTime, String curValue) {
// 创建针对特定设备和项目的锁key
String lockKey = "five_" + deviceAddr + "_" + projectId;
Object lockObject = LOCK_MAP.computeIfAbsent(lockKey, k -> new Object());
try {
// 使用特定锁,减少锁竞争
synchronized (lockObject) {
// 添加重试机制避免死锁
executeWithRetry(() -> {
Long existId = fiveMiMapper.selectExistId(summaryTime, deviceAddr, projectId);
DataResultFiveMiEntity record = new DataResultFiveMiEntity();
BeanUtils.copyProperties(source, record);
record.setCurDate(Timestamp.valueOf(summaryTime));
record.setCurValue(curValue);
if (existId != null) {
fiveMiMapper.updateByIdCustomize(existId, curValue, source.getGrade());
} else {
fiveMiMapper.saveDataResultFiveMi(record);
}
});
}
} finally {
// 清理长时间不用的锁对象,避免内存泄漏
cleanupLocks();
}
}
private void handleFifteenMinute(DataResultChEntity source, String deviceAddr, String projectId,
String summaryTime, String curValue) {
// 创建针对特定设备和项目的锁key
String lockKey = "fifteen_" + deviceAddr + "_" + projectId;
Object lockObject = LOCK_MAP.computeIfAbsent(lockKey, k -> new Object());
try {
// 使用特定锁,减少锁竞争
synchronized (lockObject) {
// 添加重试机制避免死锁
executeWithRetry(() -> {
Long existId = fifteenMiMapper.selectExistId(summaryTime, deviceAddr, projectId);
DataResultFifteenMiEntity record = new DataResultFifteenMiEntity();
BeanUtils.copyProperties(source, record);
record.setCurDate(Timestamp.valueOf(summaryTime));
record.setCurValue(curValue);
if (existId != null) {
fifteenMiMapper.updateByIdCustomize(existId, curValue, source.getGrade());
} else {
fifteenMiMapper.saveDataResultFifteenMi(record);
}
});
}
} finally {
// 清理长时间不用的锁对象,避免内存泄漏
cleanupLocks();
}
}
// 通用重试方法
private void executeWithRetry(Runnable operation) {
int retryCount = 0;
final int maxRetries = 3;
while (retryCount < maxRetries) {
try {
operation.run();
break;
} catch (Exception e) {
retryCount++;
if (retryCount >= maxRetries) {
throw new RuntimeException("操作失败,已重试" + maxRetries + "次", e);
}
// 检查是否为死锁异常
if (isDeadlockException(e)) {
log.warn("检测到死锁,进行第{}次重试", retryCount);
// 随机延迟后重试
try {
Thread.sleep(new Random().nextInt(200) + 100);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException(ie);
}
} else {
throw new RuntimeException("非死锁异常", e); // 非死锁异常直接抛出
}
}
}
}
// 判断是否为死锁异常
private boolean isDeadlockException(Exception e) {
if (e.getMessage() != null) {
String message = e.getMessage().toLowerCase();
return message.contains("死锁") || message.contains("deadlock");
}
return false;
}
private String addMinutes(String baseDate, int minutes) throws ParseException {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date date = sdf.parse(baseDate);
date.setTime(date.getTime() + minutes * 60 * 1000L);
return sdf.format(date);
}
private boolean isNumeric(String str) {
return str != null && str.matches("-?\\d+(\\.\\d+)?");
}
}

104
user-service/src/main/java/com/mh/user/strategy/ModbusProtocolStrategy.java

@ -15,6 +15,7 @@ import java.math.BigDecimal;
import java.math.RoundingMode;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.concurrent.ThreadPoolExecutor;
@ -110,10 +111,9 @@ public class ModbusProtocolStrategy implements ProtocolStrategy {
sValue = String.valueOf(Long.parseLong(sValue));
} else if (deviceCodeParamEntity.getDigit() < 0) {
sValue = String.valueOf(Long.parseLong(sValue)) + ExchangeStringUtil.addZeroForNum("0", -deviceCodeParamEntity.getDigit());
}
else {
} else {
// 保留位数
sValue = (new BigDecimal(sValue)).divide(new BigDecimal(ExchangeStringUtil.rightAddZeroForNum("1", deviceCodeParamEntity.getDigit()+1)), 2, RoundingMode.HALF_UP).toString();
sValue = (new BigDecimal(sValue)).divide(new BigDecimal(ExchangeStringUtil.rightAddZeroForNum("1", deviceCodeParamEntity.getDigit() + 1)), 2, RoundingMode.HALF_UP).toString();
}
break;
case 2:
@ -189,81 +189,77 @@ public class ModbusProtocolStrategy implements ProtocolStrategy {
}
}
/**
* 格式化数据
*
* @param deviceCodeParam
* @param dataResultCh
* @param chillerAddr
* @param date
* @param data
* @throws ParseException
*/
public void initialDataResultCh(DeviceCodeParamEntity deviceCodeParam, DataResultChEntity dataResultCh, String chillerAddr, Date date, String data) throws ParseException {
dataResultCh.setDeviceAddr(chillerAddr);
dataResultCh.setDeviceType(deviceCodeParam.getDeviceType());
dataResultCh.setCurDate(date);
dataResultCh.setCurValue(data);
dataResultCh.setRegisterAddr(deviceCodeParam.getRegisterAddr());
dataResultCh.setRegisterName(deviceCodeParam.getRegisterName());
dataResultCh.setGrade(deviceCodeParam.getGrade());
dataResultCh.setFunCode(deviceCodeParam.getFunCode());
dataResultCh.setProjectId(deviceCodeParam.getProjectId());
String projectName = projectInfoService.selectName(deviceCodeParam.getProjectId());
log.info("冷水机:{},状态:{},项目名称:{}", chillerAddr, data, projectName);
dataResultService.saveDataResultChiller(dataResultCh);
// dataResultService.deleteDataResultNow(deviceCodeParam.getDeviceAddr(), deviceCodeParam.getDeviceType(), deviceCodeParam.getRegisterAddr(), deviceCodeParam.getProjectId());
log.info("冷水机保存成功!项目名称:{}", projectName);
}
/**
* 格式化数据
*
* @param deviceCodeParam
* @param dataResultCh
* @param chillerAddr
* @param date
* @param data
* @throws ParseException
*/
public void initialDataResultCh(DeviceCodeParamEntity deviceCodeParam, DataResultChEntity dataResultCh, String chillerAddr, Date date, String data) throws ParseException {
dataResultCh.setDeviceAddr(chillerAddr);
dataResultCh.setDeviceType(deviceCodeParam.getDeviceType());
dataResultCh.setCurDate(date);
dataResultCh.setCurValue(data);
dataResultCh.setRegisterAddr(deviceCodeParam.getRegisterAddr());
dataResultCh.setRegisterName(deviceCodeParam.getRegisterName());
dataResultCh.setGrade(deviceCodeParam.getGrade());
dataResultCh.setFunCode(deviceCodeParam.getFunCode());
dataResultCh.setProjectId(deviceCodeParam.getProjectId());
String projectName = projectInfoService.selectName(deviceCodeParam.getProjectId());
log.info("冷水机:{},状态:{},项目名称:{}", chillerAddr, data, projectName);
dataResultService.saveDataResultChiller(dataResultCh);
// dataResultService.deleteDataResultNow(deviceCodeParam.getDeviceAddr(), deviceCodeParam.getDeviceType(), deviceCodeParam.getRegisterAddr(), deviceCodeParam.getProjectId());
log.info("冷水机保存成功!项目名称:{}", projectName);
}
//解析冷量表
public void analysisCloudOrder485(final String data, final DeviceCodeParamEntity deviceCodeParam) {
// threadPoolService.execute(() -> {
//创建SimpleDateFormat对象,指定样式 2019-05-13 22:39:30
//解析冷量表
public void analysisCloudOrder485(final String data, final DeviceCodeParamEntity deviceCodeParam) {
Date date = new Date();
String dateStr = sdf1.format(date);
String cloudId = deviceCodeParam.getDeviceAddr();
DataResultChEntity dataResultCh = new DataResultChEntity();
DataResultClEntity dataResultCl = new DataResultClEntity();
String registerAddr = deviceCodeParam.getRegisterAddr();
int grade = deviceCodeParam.getGrade();
// 使用Calendar进行更安全的时间处理
Calendar cal = Calendar.getInstance();
cal.setTime(date);
if (ExchangeStringUtil.isInDate(date, "00:00:00", "00:00:30")) {
dateStr = dateStr.substring(0, 17) + "00";
System.out.println("插入时间00" + dateStr);
cal.set(Calendar.SECOND, 0);
date = cal.getTime();
log.info("插入时间00: {}", sdf1.format(date));
} else if (ExchangeStringUtil.isInDate(date, "00:00:30", "00:00:59")) {
dateStr = dateStr.substring(0, 17) + "30";
System.out.println("插入时间30" + dateStr);
cal.set(Calendar.SECOND, 30);
date = cal.getTime();
log.info("插入时间30: {}", sdf1.format(date));
}
try {
// if (registerAddr.equals("0004")
// || registerAddr.equals("0020")
// || registerAddr.equals("0073")
// || registerAddr.equals("0075"))
if (grade>=140 && grade<=149) {
if (grade >= 140 && grade <= 149) {
dataResultCh.setDeviceAddr(cloudId);
dataResultCh.setDeviceType(deviceCodeParam.getDeviceType());
dataResultCh.setFunCode(deviceCodeParam.getFunCode());
dataResultCh.setCurDate(sdf1.parse(dateStr));
dataResultCh.setCurDate(date); // 直接使用Date对象,避免再次解析
dataResultCh.setCurValue(data);
dataResultCh.setRegisterAddr(deviceCodeParam.getRegisterAddr());
dataResultCh.setRegisterName(deviceCodeParam.getRegisterName());
dataResultCh.setGrade(deviceCodeParam.getGrade());
dataResultCh.setProjectId(deviceCodeParam.getProjectId());
dataResultCh.setGrade(deviceCodeParam.getGrade());
String projectName = projectInfoService.selectName(deviceCodeParam.getProjectId());
log.info("冷量计==>{},寄存器地址==>{},读数==>{},项目名称==>{}", cloudId, registerAddr, dataResultCh.getCurValue(), projectName);
dataResultService.saveDataResultCh(dataResultCh);
log.info("冷量计瞬时冷量/流量保存数据库成功!项目名称:{}", projectName);
}
// else if (registerAddr.equals("0080") || registerAddr.equals("0077"))
else if (grade >= 40 && grade <= 49)
{
} else if (grade >= 40 && grade <= 49) {
dataResultCl.setDeviceAddr(cloudId);
dataResultCl.setDeviceType(deviceCodeParam.getDeviceType());
dataResultCl.setCurDate(sdf1.parse(dateStr));
dataResultCl.setCurDate(date); // 直接使用Date对象,避免再次解析
BigDecimal lData = new BigDecimal(data);
dataResultCl.setCurValue(lData);//字符串转整型
dataResultCl.setCurValue(lData);
dataResultCl.setRegisterAddr(deviceCodeParam.getRegisterAddr());
dataResultCl.setRegisterName(deviceCodeParam.getRegisterName());
dataResultCl.setGrade(deviceCodeParam.getGrade());
@ -281,7 +277,7 @@ public void analysisCloudOrder485(final String data, final DeviceCodeParamEntity
} catch (Exception e) {
log.error("保存冷量计数据失败!", e);
}
// });
}
}
}

2
user-service/src/main/resources/application-dev.yml

@ -5,7 +5,7 @@ spring:
name: jnd-user-service
datasource:
#添加allowMultiQueries=true 在批量更新时才不会出错
url: jdbc:sqlserver://127.0.0.1:1433;DatabaseName=mh_jnd;allowMultiQueries=true
url: jdbc:sqlserver://127.0.0.1:1433;DatabaseName=mh_jnd2;allowMultiQueries=true
driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver
username: sa
password: mh@803

18
user-service/src/test/java/com/mh/user/DealDataTest.java

@ -3,6 +3,7 @@ package com.mh.user;
import com.github.pagehelper.PageInfo;
import com.mh.common.page.PageResult;
import com.mh.user.dto.EnergyMomYoyDataDTO;
import com.mh.user.job.DealDataJob;
import com.mh.user.mapper.chillers.DealDataMapper;
import com.mh.user.model.EnergyModel;
import com.mh.user.service.EnergyDataService;
@ -27,6 +28,23 @@ public class DealDataTest extends UserServiceApplicationTests {
@Autowired
private DealDataService dealDataService;
@Autowired
private DealDataJob dealDataJob;
@Test
public void dealDataJob() throws InterruptedException {
// for (int i = 0; i < 10; i++) {
// dealDataJob.proDataResult("2025-08-26 05:45:00");
// dealDataJob.proDataResult("2025-08-26 05:46:00");
// dealDataJob.proDataResult("2025-08-26 05:47:00");
// dealDataJob.proDataResult("2025-08-26 05:48:00");
// dealDataJob.proDataResult("2025-08-26 05:49:00");
// dealDataJob.proDataResult("2025-08-26 05:50:00");
// Thread.sleep(60000);
// }
}
@Test
public void dealChillersData() {
try {

Loading…
Cancel
Save