diff --git a/user-service/src/main/java/com/mh/user/job/DealDataJob.java b/user-service/src/main/java/com/mh/user/job/DealDataJob.java index a6f1c1a..5aef4d6 100644 --- a/user-service/src/main/java/com/mh/user/job/DealDataJob.java +++ b/user-service/src/main/java/com/mh/user/job/DealDataJob.java @@ -6,9 +6,13 @@ import com.mh.user.utils.SimpleWeather; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; + import java.text.SimpleDateFormat; import java.util.*; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; /** * @author ljf @@ -22,6 +26,7 @@ import java.util.concurrent.CountDownLatch; public class DealDataJob { private final DealDataService dealDataService; + public DealDataJob(DealDataService dealDataService) { this.dealDataService = dealDataService; } @@ -32,41 +37,42 @@ public class DealDataJob { @Scheduled(cron = "25 0/2 * * * ?") //每2分钟一次 public void ProEnergy() { try { - SimpleDateFormat sdf1=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - Date date=new Date(); - String curDate=sdf1.format(date); + SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + Date date = new Date(); + String curDate = sdf1.format(date); // dealDataService.proEnergy(curDate);//中央热水yyyy-MM-dd HH:mm:ss - String m=curDate.substring(15,16); - if (Integer.parseInt(m)==0){ - curDate=curDate.substring(0,16)+":00"; - }else if(Integer.parseInt(m)==2){ - curDate=curDate.substring(0,16)+":00"; - }else if(Integer.parseInt(m)==4){ - curDate=curDate.substring(0,16)+":00"; - }else if(Integer.parseInt(m)==6){ - curDate=curDate.substring(0,16)+":00"; - }else if(Integer.parseInt(m)==8){ - curDate=curDate.substring(0,16)+":00"; + String m = curDate.substring(15, 16); + if (Integer.parseInt(m) == 0) { + curDate = curDate.substring(0, 16) + ":00"; + } else if (Integer.parseInt(m) == 2) { + curDate = curDate.substring(0, 16) + ":00"; + } else if (Integer.parseInt(m) == 4) { + curDate = curDate.substring(0, 16) + ":00"; + } else if (Integer.parseInt(m) == 6) { + curDate = curDate.substring(0, 16) + ":00"; + } else if (Integer.parseInt(m) == 8) { + curDate = curDate.substring(0, 16) + ":00"; } dealDataService.proEnergyData(curDate);//中央空调yyyy-MM-dd HH:mm:ss - log.info("---------数据分析定时汇总,每二分钟!"+curDate); + log.info("---------数据分析定时汇总,每二分钟!" + curDate); } catch (Exception e) { log.error(" ProEnergy 数据分析定时汇总异常", e); // e.printStackTrace(); } } + /** * 定时处理数据:每十五分钟处理一次 */ @Scheduled(cron = "0 0/16 * * * ?") public void dealData() { try { - SimpleDateFormat sdf1=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - Date date=new Date(); - String curDate=sdf1.format(date); + SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + Date date = new Date(); + String curDate = sdf1.format(date); dealDataService.proDeviceState(curDate); - log.info("---------设备状态定时汇总,每十五分钟!"+curDate); + log.info("---------设备状态定时汇总,每十五分钟!" + curDate); } catch (Exception e) { log.error("设备状态定时汇总异常", e); } @@ -78,30 +84,57 @@ public class DealDataJob { @Scheduled(cron = "0 0/1 * * * ?") public void proDataResult() { try { - // 通过多线程进行处理 - String curDate = ExchangeStringUtil.dateTime(1, ""); + // 添加开始程序时间 + long startTime = System.currentTimeMillis(); + String curDate = ExchangeStringUtil.dateTime(3, ""); List list = dealDataService.queryProjectId("1"); if (!list.isEmpty()) { - CountDownLatch latch = new CountDownLatch(list.size()); - for (String projectId : list) { - new Thread(() -> { - try { - dealDataService.proDataResult(curDate, projectId); - } catch (Exception e) { - log.error("能效监测定时汇总异常", e); - } finally { - latch.countDown(); - } - }).start(); + // 使用较小的线程池,减少并发冲突 + ExecutorService executor = Executors.newFixedThreadPool( + Math.min(Math.max(list.size() / 2, 1), 5) + ); + + try { + CountDownLatch latch = new CountDownLatch(list.size()); + // 分批处理,避免同时处理过多项目 + for (String projectId : list) { + final String pid = projectId; + String finalCurDate = curDate; + executor.submit(() -> { + try { + // 添加延迟,错开执行时间 + Thread.sleep(100); + dealDataService.proDataResult(finalCurDate, pid); +// proDataResultService.processDailySummary(curDate, pid); + } catch (Exception e) { + log.error("能效监测定时汇总异常,项目ID: {}", pid, e); + } finally { + latch.countDown(); + } + }); + } + + // 添加超时控制,避免无限期等待 + boolean completed = latch.await(2, TimeUnit.MINUTES); + if (!completed) { + log.warn("能效监测定时汇总未在2分钟内完成,项目数量: {}", list.size()); + } + } finally { + executor.shutdown(); } - latch.await(); // 等待所有子线程执行完毕 } + // 结束时间 + long endTime = System.currentTimeMillis(); + log.info("计算one、five、fifteen耗时:{} 毫秒", endTime-startTime); log.info("---------能效监测定时汇总,每一分钟!{}", curDate); } catch (Exception e) { log.error("能效监测定时汇总异常", e); } } + + + /** * 定时处理数据:获取环境温度和湿度,所有项目共有 */ @@ -109,17 +142,17 @@ public class DealDataJob { // @Scheduled(cron = "0 0/1 * * * ?") //1分钟 public void saveTempHumidity() { try { - SimpleDateFormat sdf1=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - Date date=new Date(); - Map map= SimpleWeather.queryWeather("广州"); - String curDate=sdf1.format(date); + SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + Date date = new Date(); + Map map = SimpleWeather.queryWeather("广州"); + String curDate = sdf1.format(date); if (map.isEmpty()) { // 重新请求接口 - map= SimpleWeather.queryWeather("广州"); + map = SimpleWeather.queryWeather("广州"); } - String temperature=map.get("temperature").toString(); - String humidity=map.get("humidity").toString(); - dealDataService.saveTempHumidity(curDate,temperature,humidity); + String temperature = map.get("temperature").toString(); + String humidity = map.get("humidity").toString(); + dealDataService.saveTempHumidity(curDate, temperature, humidity); log.info("{},{}℃,{}%", curDate, temperature, humidity); } catch (Exception e) { // e.printStackTrace(); @@ -134,16 +167,16 @@ public class DealDataJob { // @Scheduled(cron = "0 0/1 * * * ?") //1分钟 public void proEnergyDaySum() { try { - SimpleDateFormat sdf1=new SimpleDateFormat("yyyy-MM-dd"); - Date date=new Date(); + SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd"); + Date date = new Date(); Calendar calendar = Calendar.getInstance(); calendar.setTime(date); calendar.add(Calendar.DATE, -1);// 减少一天 - String curDate=sdf1.format(calendar.getTime()); - List list=dealDataService.queryProjectId("2"); - if (!list.isEmpty()){ - for(String projectId:list){ - dealDataService.proEnergyDaySum(curDate,projectId); + String curDate = sdf1.format(calendar.getTime()); + List list = dealDataService.queryProjectId("2"); + if (!list.isEmpty()) { + for (String projectId : list) { + dealDataService.proEnergyDaySum(curDate, projectId); } } log.info("---------中央热水生产概况汇总,每一小时!{}", curDate); diff --git a/user-service/src/main/java/com/mh/user/mapper/DataResultChMapper.java b/user-service/src/main/java/com/mh/user/mapper/DataResultChMapper.java index 3e27e4b..86561c6 100644 --- a/user-service/src/main/java/com/mh/user/mapper/DataResultChMapper.java +++ b/user-service/src/main/java/com/mh/user/mapper/DataResultChMapper.java @@ -6,6 +6,7 @@ import com.mh.user.entity.DataResultClEntity; import org.apache.ibatis.annotations.*; import java.util.List; +import java.util.Map; /** * @author LJF @@ -70,4 +71,7 @@ public interface DataResultChMapper extends BaseMapper { " t1.cur_date ") List queryDataResultCh(@Param("projectId") String projectId, @Param("startDate") String startDate, @Param("curDate") String curDate); + @Select("select top 1 * from data_result_ch " + + " where cur_date >= #{start} and cur_date <= #{end} and register_addr = #{registerAddr} and project_id = #{projectId} and device_addr = #{deviceAddr} order by cur_date desc ") + DataResultChEntity selectLatestByCondition(Map params); } diff --git a/user-service/src/main/java/com/mh/user/mapper/DataResultFifteenMiMapper.java b/user-service/src/main/java/com/mh/user/mapper/DataResultFifteenMiMapper.java index 7ee7084..a498278 100644 --- a/user-service/src/main/java/com/mh/user/mapper/DataResultFifteenMiMapper.java +++ b/user-service/src/main/java/com/mh/user/mapper/DataResultFifteenMiMapper.java @@ -4,9 +4,7 @@ import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.mh.user.entity.DataResultChEntity; import com.mh.user.entity.DataResultFifteenMiEntity; import com.mh.user.mapper.provider.DataResultProvider; -import org.apache.ibatis.annotations.Mapper; -import org.apache.ibatis.annotations.Param; -import org.apache.ibatis.annotations.SelectProvider; +import org.apache.ibatis.annotations.*; import java.util.List; @@ -37,4 +35,18 @@ public interface DataResultFifteenMiMapper extends BaseMapper= #{timeStr} and cur_date <= #{timeStr} " + + "AND device_addr = #{deviceAddr} AND project_id = #{projectId}") + Long selectExistId(@Param("timeStr") String timeStr, + @Param("deviceAddr") String deviceAddr, + @Param("projectId") String projectId); + + @Update("UPDATE data_result_five_mi SET cur_value = #{curValue}, grade = #{grade} WHERE id = #{id}") + void updateByIdCustomize(@Param("id") Long id, + @Param("curValue") String curValue, + @Param("grade") int grade); + + @Insert("INSERT INTO data_result_fifteen_mi (device_addr, device_type, cur_date, cur_value, fun_code, register_addr, register_name, grade, project_id) " + + "VALUES (#{deviceAddr}, #{deviceType}, #{curDate}, #{curValue}, #{funCode}, #{registerAddr}, #{registerName}, #{grade}, #{projectId})") + void saveDataResultFifteenMi(DataResultFifteenMiEntity record); } diff --git a/user-service/src/main/java/com/mh/user/mapper/DataResultFiveMiMapper.java b/user-service/src/main/java/com/mh/user/mapper/DataResultFiveMiMapper.java index d7be2fa..6282bca 100644 --- a/user-service/src/main/java/com/mh/user/mapper/DataResultFiveMiMapper.java +++ b/user-service/src/main/java/com/mh/user/mapper/DataResultFiveMiMapper.java @@ -37,4 +37,19 @@ public interface DataResultFiveMiMapper extends BaseMapper= #{timeStr} and cur_date <= #{timeStr} " + + "AND device_addr = #{deviceAddr} AND project_id = #{projectId}") + Long selectExistId(@Param("timeStr") String timeStr, + @Param("deviceAddr") String deviceAddr, + @Param("projectId") String projectId); + + @Update("UPDATE data_result_five_mi SET cur_value = #{curValue}, grade = #{grade} WHERE id = #{id}") + void updateByIdCustomize(@Param("id") Long id, + @Param("curValue") String curValue, + @Param("grade") int grade); + + @Insert("INSERT INTO data_result_five_mi (device_addr, device_type, cur_date, cur_value, fun_code, register_addr, register_name, grade, project_id) " + + "VALUES (#{deviceAddr}, #{deviceType}, #{curDate}, #{curValue}, #{funCode}, #{registerAddr}, #{registerName}, #{grade}, #{projectId})") + void saveDataResultFiveMi(DataResultFiveMiEntity record); } diff --git a/user-service/src/main/java/com/mh/user/mapper/DataResultOneMiMapper.java b/user-service/src/main/java/com/mh/user/mapper/DataResultOneMiMapper.java index a950e96..af2cf68 100644 --- a/user-service/src/main/java/com/mh/user/mapper/DataResultOneMiMapper.java +++ b/user-service/src/main/java/com/mh/user/mapper/DataResultOneMiMapper.java @@ -43,4 +43,19 @@ public interface DataResultOneMiMapper extends BaseMapper @SelectProvider(type = DataResultProvider.class,method = "dataResultOneMiCount") int dataResultOneMiCount(@Param("projectId") String projectId,@Param("startDate") String startDate,@Param("curDate") String curDate); + @Delete("DELETE FROM data_result_one_mi WHERE cur_date >= #{timeStr} and cur_date <= #{timeStr} " + + "AND device_addr = #{deviceAddr} AND project_id = #{projectId}") + void deleteByTimeDeviceProject(@Param("timeStr") String timeStr, + @Param("deviceAddr") String deviceAddr, + @Param("projectId") String projectId); + + @Select("SELECT count(1) FROM data_result_one_mi WHERE cur_date >= #{timeStr} and cur_date <= #{timeStr} " + + "AND device_addr = #{deviceAddr} AND project_id = #{projectId} and register_addr = #{registerAddr} and device_type = #{deviceType} ") + int selectCountByTimeDeviceProject(@Param("timeStr") String timeStr, + @Param("deviceAddr") String deviceAddr, + @Param("projectId") String projectId,@Param("registerAddr") String registerAddr, @Param("deviceType") String deviceType); + + @Insert("INSERT INTO data_result_one_mi (device_addr, device_type, cur_date, cur_value, fun_code, register_addr, register_name, grade, project_id) " + + "VALUES (#{deviceAddr}, #{deviceType}, #{curDate}, #{curValue}, #{funCode}, #{registerAddr}, #{registerName}, #{grade}, #{projectId})") + void saveDataResultOneMi(DataResultOneMiEntity dataResultOneMiEntity); } diff --git a/user-service/src/main/java/com/mh/user/netty/EchoServer.java b/user-service/src/main/java/com/mh/user/netty/EchoServer.java index 1fafed0..ee589d1 100644 --- a/user-service/src/main/java/com/mh/user/netty/EchoServer.java +++ b/user-service/src/main/java/com/mh/user/netty/EchoServer.java @@ -17,29 +17,50 @@ public class EchoServer { } public void start() { - // 创建Even-LoopGroup + // 创建Event-LoopGroup NioEventLoopGroup bossGroup = new NioEventLoopGroup(); // bossGroup 用于监听客户端连接,专门负责与客户端创建连接,并把连接注册到workerGroup的Selector中。 NioEventLoopGroup workerGroup = new NioEventLoopGroup(); // workerGroup用于处理每一个连接发生的读写事件。 + try { - ServerBootstrap serverBootstrap = new ServerBootstrap(); // 2、创建Server-Bootstrap + ServerBootstrap serverBootstrap = new ServerBootstrap(); // 创建Server-Bootstrap serverBootstrap.group(bossGroup, workerGroup) - .channel(NioServerSocketChannel.class) // 3、指定所使用的NIO传输Channel - .localAddress(port) // 4、指定端口设置套接字 + .channel(NioServerSocketChannel.class) // 指定所使用的NIO传输Channel + .localAddress(port) // 指定端口设置套接字 .option(ChannelOption.SO_BACKLOG, 1204) .childHandler(new ServerChannelInitializer()); - ChannelFuture channelFuture = serverBootstrap.bind().sync(); // 6、异步绑定服务器;调用sync()方法,阻塞式等待,直到绑定完成 - log.info("服务器启动开始监听端口:"+port); - channelFuture.channel().closeFuture().sync(); // 7、获取Channel的closeFuture,并且阻塞当前线程,直到它完成 - - } catch (InterruptedException e) { - log.error("服务器启动失败", e); - } finally { - try { - bossGroup.shutdownGracefully().sync(); // 8、关闭EventLoopGroup,关闭所有的资源 - workerGroup.shutdownGracefully().sync(); // 关闭 - } catch (InterruptedException e) { - e.printStackTrace(); - } + + // 异步绑定服务器,不使用sync()阻塞 + ChannelFuture channelFuture = serverBootstrap.bind(); + channelFuture.addListener(future -> { + if (future.isSuccess()) { + log.info("服务器启动成功,开始监听端口:{}", port); + } else { + log.error("服务器启动失败", future.cause()); + } + }); + + // 添加JVM关闭钩子,确保优雅关闭 + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + log.info("正在关闭服务器..."); + try { + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + log.info("服务器已关闭"); + } catch (Exception e) { + log.error("关闭服务器时发生错误", e); + } + })); + + // 不阻塞主线程,服务器在后台运行 + log.info("服务器启动命令已发送,端口:{}", port); + + } catch (Exception e) { + log.error("服务器启动过程中发生异常", e); + // 发生异常时关闭资源 + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); } + // 注意:这里不再等待channel关闭,方法会立即返回 } + } diff --git a/user-service/src/main/java/com/mh/user/service/ProDataResultService.java b/user-service/src/main/java/com/mh/user/service/ProDataResultService.java new file mode 100644 index 0000000..9d8bd22 --- /dev/null +++ b/user-service/src/main/java/com/mh/user/service/ProDataResultService.java @@ -0,0 +1,16 @@ +package com.mh.user.service; + +import java.text.ParseException; + +/** + * @author LJF + * @version 1.0 + * @project mh_esi + * @description pro_data_result修改java + * @date 2025-08-26 09:08:15 + */ +public interface ProDataResultService { + + void processDailySummary(String curDate, String projectId) throws ParseException; + +} diff --git a/user-service/src/main/java/com/mh/user/service/chillers/impl/DealDataServiceImpl.java b/user-service/src/main/java/com/mh/user/service/chillers/impl/DealDataServiceImpl.java index 19a7973..ccbd0ef 100644 --- a/user-service/src/main/java/com/mh/user/service/chillers/impl/DealDataServiceImpl.java +++ b/user-service/src/main/java/com/mh/user/service/chillers/impl/DealDataServiceImpl.java @@ -2,10 +2,13 @@ package com.mh.user.service.chillers.impl; import com.mh.user.mapper.chillers.DealDataMapper; import com.mh.user.service.chillers.DealDataService; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; /** * @author ljf @@ -14,8 +17,11 @@ import java.util.List; * @updateTime 2020-07-28 * @throws : */ +@Slf4j @Service public class DealDataServiceImpl implements DealDataService { + // 添加锁对象管理 + private static final Map LOCK_MAP = new ConcurrentHashMap<>(); private final DealDataMapper dealDataMapper; @@ -36,10 +42,65 @@ public class DealDataServiceImpl implements DealDataService { dealDataMapper.proEnergyData(curDate); } + // 清理锁对象的方法 + private void cleanupLocks() { + if (LOCK_MAP.size() > 1000) { + synchronized (LOCK_MAP) { + if (LOCK_MAP.size() > 1000) { + LOCK_MAP.clear(); + } + } + } + } + + // 判断是否为死锁异常 + private boolean isDeadlockException(Exception e) { + if (e.getMessage() != null) { + String message = e.getMessage().toLowerCase(); + return message.contains("死锁") || message.contains("deadlock"); + } + return false; + } + @Override - public void proDataResult(String curDate,String projectId) { + public void proDataResult(String curDate, String projectId) { + String lockKey = "proDataResult_" + projectId; + Object lockObject = LOCK_MAP.computeIfAbsent(lockKey, k -> new Object()); + + synchronized (lockObject) { + int retryCount = 0; + final int maxRetries = 3; + + while (true) { + try { + dealDataMapper.proDataResult(curDate, projectId); + break; // 成功执行则跳出循环 + } catch (Exception e) { + retryCount++; + if (retryCount >= maxRetries) { + log.error("proDataResult执行失败,已重试" + maxRetries + "次"); + break; + } + + // 检查是否为死锁异常 + if (isDeadlockException(e)) { + log.error("检测到死锁,进行第{}次重试,项目ID:{}", retryCount, projectId); + // 随机延迟后重试 + try { + Thread.sleep(10); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new RuntimeException(ie); + } + } else { + throw new RuntimeException("非死锁异常,项目ID:" + projectId, e); + } + } + } + } - dealDataMapper.proDataResult(curDate,projectId); + // 清理锁对象 + cleanupLocks(); } @Override diff --git a/user-service/src/main/java/com/mh/user/service/impl/DataResultServiceImpl.java b/user-service/src/main/java/com/mh/user/service/impl/DataResultServiceImpl.java index ae69c48..f7344bb 100644 --- a/user-service/src/main/java/com/mh/user/service/impl/DataResultServiceImpl.java +++ b/user-service/src/main/java/com/mh/user/service/impl/DataResultServiceImpl.java @@ -409,9 +409,10 @@ public class DataResultServiceImpl implements DataResultService { } else { count = dataResultFiveMiCount(projectId, startDate, curDate, momentCoolingStr, registerAddr); } - new Thread(() ->{ - dealDataExceptionService.dealFifteenInstantaneousExceptionData(mergedRecords); - }).start(); + // 屏蔽异常处理 +// new Thread(() ->{ +// dealDataExceptionService.dealFifteenInstantaneousExceptionData(mergedRecords); +// }).start(); return HttpResult.ok(count, mergedRecords); } // else if (dateType.equals("12小时")) { diff --git a/user-service/src/main/java/com/mh/user/service/impl/ProDataResultServiceImpl.java b/user-service/src/main/java/com/mh/user/service/impl/ProDataResultServiceImpl.java new file mode 100644 index 0000000..c1442f8 --- /dev/null +++ b/user-service/src/main/java/com/mh/user/service/impl/ProDataResultServiceImpl.java @@ -0,0 +1,328 @@ +package com.mh.user.service.impl; + +import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; +import com.mh.user.entity.DataResultChEntity; +import com.mh.user.entity.DataResultFifteenMiEntity; +import com.mh.user.entity.DataResultFiveMiEntity; +import com.mh.user.entity.DataResultOneMiEntity; +import com.mh.user.mapper.DataResultChMapper; +import com.mh.user.mapper.DataResultFifteenMiMapper; +import com.mh.user.mapper.DataResultFiveMiMapper; +import com.mh.user.mapper.DataResultOneMiMapper; +import com.mh.user.service.ProDataResultService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.BeanUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import java.sql.Timestamp; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +/** + * @author LJF + * @version 1.0 + * @project mh_esi + * @description pro_data_result改成java + * @date 2025-08-26 09:07:39 + */ +@Slf4j +@Service +@Transactional(rollbackFor = Exception.class) +public class ProDataResultServiceImpl implements ProDataResultService { + + private static final List DEVICE_CODES = Arrays.asList("0020", "0073", "0075", "0006", "0010", "0012", "0014"); + + @Autowired + private DataResultChMapper dataResultChMapper; + @Autowired + private DataResultOneMiMapper oneMiMapper; + @Autowired + private DataResultFiveMiMapper fiveMiMapper; + @Autowired + private DataResultFifteenMiMapper fifteenMiMapper; + + @Override + public void processDailySummary(String curDateStr, String projectId) throws ParseException { + // 时间格式标准化 + SimpleDateFormat inputSdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + Date curDate = inputSdf.parse(curDateStr); + SimpleDateFormat outputSdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:00"); + String standardDate = outputSdf.format(curDate); + log.info("处理日期:{}", standardDate); + + // 计算时间窗口参数 + int minute = curDate.getMinutes(); + int intervalMinute = (minute / 5) * 5; + String minuteStr = String.format("%02d", intervalMinute); + log.info("分钟区间:{}", minuteStr); + + // 遍历设备编码 + for (String deviceCode : DEVICE_CODES) { + log.info("处理设备编码:{}", deviceCode); + + // 检查是否存在待处理数据 + Map countParams = new HashMap<>(); +// countParams.put("start", standardDate); +// countParams.put("end", addMinutes(standardDate, 5)); + countParams.put("register_addr", deviceCode); + countParams.put("project_id", projectId); + int recordCount = Math.toIntExact(dataResultChMapper.selectCount(new QueryWrapper() + .allEq(countParams) + .ge("cur_date", standardDate) + .lt("cur_date", addMinutes(standardDate, 5)))); + + if (recordCount == 0) continue; + + // 获取当前设备编码下的所有设备地址 + List deviceAddrs = dataResultChMapper.selectObjs(new QueryWrapper() + .select("DISTINCT device_addr") + .eq("register_addr", deviceCode) + .eq("project_id", projectId) + .between("cur_date", standardDate, addMinutes(standardDate, 5))) + .stream().map(Object::toString).collect(Collectors.toList()); + + // 处理每个设备地址 + for (String deviceAddr : deviceAddrs) { +// processIntervalData(standardDate, deviceCode, projectId, deviceAddr, 1); + processIntervalData(standardDate, deviceCode, projectId, deviceAddr, 5, minuteStr); + processIntervalData(standardDate, deviceCode, projectId, deviceAddr, 15, minuteStr); + } + } + } + + private void processIntervalData(String baseDate, String registerAddr, String projectId, + String deviceAddr, int interval, String... extraParams) throws ParseException { + // 计算时间窗口 + String startTime = baseDate; + String endTime = addMinutes(baseDate, interval); + + // 获取最新数据 + Map params = new HashMap<>(); + params.put("start", startTime); + params.put("end", endTime); + params.put("registerAddr", registerAddr); + params.put("projectId", projectId); + params.put("deviceAddr", deviceAddr); + DataResultChEntity latest = dataResultChMapper.selectLatestByCondition(params); + + if (latest == null) return; + + // 数值有效性校验 + String curValue = isNumeric(latest.getCurValue()) ? latest.getCurValue() : "0.00"; + if (!isNumeric(latest.getCurValue())) { + log.info("无效数值,已替换为0.00:{}", latest.getCurValue()); + } + + // 计算汇总时间点 + String summaryTime = extraParams.length > 0 ? + baseDate.substring(0, 13) + ":" + extraParams[0] + ":00" : + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(latest.getCurDate()); + + // 处理不同时间间隔的汇总 + switch (interval) { + case 1: + handleOneMinute(latest, deviceAddr, projectId, summaryTime, curValue); + break; + case 5: + handleFiveMinute(latest, deviceAddr, projectId, summaryTime, curValue); + break; + case 15: + handleFifteenMinute(latest, deviceAddr, projectId, summaryTime, curValue); + break; + } + } + + // 在类中添加静态锁对象 + private static final Map LOCK_MAP = new ConcurrentHashMap<>(); + + private void handleOneMinute(DataResultChEntity source, String deviceAddr, String projectId, + String summaryTime, String curValue) { + // 创建针对特定设备和项目的锁key + String lockKey = deviceAddr + "_" + projectId; + Object lockObject = LOCK_MAP.computeIfAbsent(lockKey, k -> new Object()); + + try { + // 使用特定锁,减少锁竞争 + synchronized (lockObject) { + // 添加重试机制避免死锁 + int retryCount = 0; + final int maxRetries = 3; + + while (retryCount < maxRetries) { + try { + // 查询历史数据 + int count = oneMiMapper.selectCountByTimeDeviceProject(summaryTime, deviceAddr, projectId, source.getRegisterAddr(), source.getDeviceType()); + if (count == 0) { + // 插入新数据 + DataResultOneMiEntity record = new DataResultOneMiEntity(); + BeanUtils.copyProperties(source, record); + record.setCurDate(Timestamp.valueOf(summaryTime)); + record.setCurValue(curValue); + oneMiMapper.saveDataResultOneMi(record); + } + break; // 成功执行则跳出循环 + + } catch (Exception e) { + retryCount++; + if (retryCount >= maxRetries) { + throw e; // 达到最大重试次数后抛出异常 + } + + // 检查是否为死锁异常 + if (e.getMessage() != null && + (e.getMessage().contains("死锁") || e.getMessage().contains("deadlock"))) { + log.warn("检测到死锁,进行第{}次重试,设备地址:{},项目ID:{}", + retryCount, deviceAddr, projectId); + // 随机延迟后重试 + try { + Thread.sleep(new Random().nextInt(200) + 100); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new RuntimeException(ie); + } + } else { + throw e; // 非死锁异常直接抛出 + } + } + } + } + } finally { + // 清理长时间不用的锁对象,避免内存泄漏 + cleanupLocks(); + } + } + + // 清理锁对象的方法 + private void cleanupLocks() { + // 简单的清理策略:当锁数量超过一定阈值时,清空所有锁 + // 实际项目中可以根据需要实现更复杂的清理逻辑 + if (LOCK_MAP.size() > 1000) { + synchronized (LOCK_MAP) { + if (LOCK_MAP.size() > 1000) { + LOCK_MAP.clear(); + } + } + } + } + + private void handleFiveMinute(DataResultChEntity source, String deviceAddr, String projectId, + String summaryTime, String curValue) { + // 创建针对特定设备和项目的锁key + String lockKey = "five_" + deviceAddr + "_" + projectId; + Object lockObject = LOCK_MAP.computeIfAbsent(lockKey, k -> new Object()); + + try { + // 使用特定锁,减少锁竞争 + synchronized (lockObject) { + // 添加重试机制避免死锁 + executeWithRetry(() -> { + Long existId = fiveMiMapper.selectExistId(summaryTime, deviceAddr, projectId); + + DataResultFiveMiEntity record = new DataResultFiveMiEntity(); + BeanUtils.copyProperties(source, record); + record.setCurDate(Timestamp.valueOf(summaryTime)); + record.setCurValue(curValue); + + if (existId != null) { + fiveMiMapper.updateByIdCustomize(existId, curValue, source.getGrade()); + } else { + fiveMiMapper.saveDataResultFiveMi(record); + } + }); + } + } finally { + // 清理长时间不用的锁对象,避免内存泄漏 + cleanupLocks(); + } + } + + private void handleFifteenMinute(DataResultChEntity source, String deviceAddr, String projectId, + String summaryTime, String curValue) { + // 创建针对特定设备和项目的锁key + String lockKey = "fifteen_" + deviceAddr + "_" + projectId; + Object lockObject = LOCK_MAP.computeIfAbsent(lockKey, k -> new Object()); + + try { + // 使用特定锁,减少锁竞争 + synchronized (lockObject) { + // 添加重试机制避免死锁 + executeWithRetry(() -> { + Long existId = fifteenMiMapper.selectExistId(summaryTime, deviceAddr, projectId); + + DataResultFifteenMiEntity record = new DataResultFifteenMiEntity(); + BeanUtils.copyProperties(source, record); + record.setCurDate(Timestamp.valueOf(summaryTime)); + record.setCurValue(curValue); + + if (existId != null) { + fifteenMiMapper.updateByIdCustomize(existId, curValue, source.getGrade()); + } else { + fifteenMiMapper.saveDataResultFifteenMi(record); + } + }); + } + } finally { + // 清理长时间不用的锁对象,避免内存泄漏 + cleanupLocks(); + } + } + + // 通用重试方法 + private void executeWithRetry(Runnable operation) { + int retryCount = 0; + final int maxRetries = 3; + + while (retryCount < maxRetries) { + try { + operation.run(); + break; + } catch (Exception e) { + retryCount++; + if (retryCount >= maxRetries) { + throw new RuntimeException("操作失败,已重试" + maxRetries + "次", e); + } + + // 检查是否为死锁异常 + if (isDeadlockException(e)) { + log.warn("检测到死锁,进行第{}次重试", retryCount); + // 随机延迟后重试 + try { + Thread.sleep(new Random().nextInt(200) + 100); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new RuntimeException(ie); + } + } else { + throw new RuntimeException("非死锁异常", e); // 非死锁异常直接抛出 + } + } + } + } + + // 判断是否为死锁异常 + private boolean isDeadlockException(Exception e) { + if (e.getMessage() != null) { + String message = e.getMessage().toLowerCase(); + return message.contains("死锁") || message.contains("deadlock"); + } + return false; + } + + + private String addMinutes(String baseDate, int minutes) throws ParseException { + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + Date date = sdf.parse(baseDate); + date.setTime(date.getTime() + minutes * 60 * 1000L); + return sdf.format(date); + } + + private boolean isNumeric(String str) { + return str != null && str.matches("-?\\d+(\\.\\d+)?"); + } + +} diff --git a/user-service/src/main/java/com/mh/user/strategy/ModbusProtocolStrategy.java b/user-service/src/main/java/com/mh/user/strategy/ModbusProtocolStrategy.java index 4ce4296..16ce5d0 100644 --- a/user-service/src/main/java/com/mh/user/strategy/ModbusProtocolStrategy.java +++ b/user-service/src/main/java/com/mh/user/strategy/ModbusProtocolStrategy.java @@ -15,6 +15,7 @@ import java.math.BigDecimal; import java.math.RoundingMode; import java.text.ParseException; import java.text.SimpleDateFormat; +import java.util.Calendar; import java.util.Date; import java.util.concurrent.ThreadPoolExecutor; @@ -110,10 +111,9 @@ public class ModbusProtocolStrategy implements ProtocolStrategy { sValue = String.valueOf(Long.parseLong(sValue)); } else if (deviceCodeParamEntity.getDigit() < 0) { sValue = String.valueOf(Long.parseLong(sValue)) + ExchangeStringUtil.addZeroForNum("0", -deviceCodeParamEntity.getDigit()); - } - else { + } else { // 保留位数 - sValue = (new BigDecimal(sValue)).divide(new BigDecimal(ExchangeStringUtil.rightAddZeroForNum("1", deviceCodeParamEntity.getDigit()+1)), 2, RoundingMode.HALF_UP).toString(); + sValue = (new BigDecimal(sValue)).divide(new BigDecimal(ExchangeStringUtil.rightAddZeroForNum("1", deviceCodeParamEntity.getDigit() + 1)), 2, RoundingMode.HALF_UP).toString(); } break; case 2: @@ -189,81 +189,77 @@ public class ModbusProtocolStrategy implements ProtocolStrategy { } } -/** - * 格式化数据 - * - * @param deviceCodeParam - * @param dataResultCh - * @param chillerAddr - * @param date - * @param data - * @throws ParseException - */ -public void initialDataResultCh(DeviceCodeParamEntity deviceCodeParam, DataResultChEntity dataResultCh, String chillerAddr, Date date, String data) throws ParseException { - dataResultCh.setDeviceAddr(chillerAddr); - dataResultCh.setDeviceType(deviceCodeParam.getDeviceType()); - dataResultCh.setCurDate(date); - dataResultCh.setCurValue(data); - dataResultCh.setRegisterAddr(deviceCodeParam.getRegisterAddr()); - dataResultCh.setRegisterName(deviceCodeParam.getRegisterName()); - dataResultCh.setGrade(deviceCodeParam.getGrade()); - dataResultCh.setFunCode(deviceCodeParam.getFunCode()); - dataResultCh.setProjectId(deviceCodeParam.getProjectId()); - String projectName = projectInfoService.selectName(deviceCodeParam.getProjectId()); - log.info("冷水机:{},状态:{},项目名称:{}", chillerAddr, data, projectName); - dataResultService.saveDataResultChiller(dataResultCh); - // dataResultService.deleteDataResultNow(deviceCodeParam.getDeviceAddr(), deviceCodeParam.getDeviceType(), deviceCodeParam.getRegisterAddr(), deviceCodeParam.getProjectId()); - log.info("冷水机保存成功!项目名称:{}", projectName); -} + /** + * 格式化数据 + * + * @param deviceCodeParam + * @param dataResultCh + * @param chillerAddr + * @param date + * @param data + * @throws ParseException + */ + public void initialDataResultCh(DeviceCodeParamEntity deviceCodeParam, DataResultChEntity dataResultCh, String chillerAddr, Date date, String data) throws ParseException { + dataResultCh.setDeviceAddr(chillerAddr); + dataResultCh.setDeviceType(deviceCodeParam.getDeviceType()); + dataResultCh.setCurDate(date); + dataResultCh.setCurValue(data); + dataResultCh.setRegisterAddr(deviceCodeParam.getRegisterAddr()); + dataResultCh.setRegisterName(deviceCodeParam.getRegisterName()); + dataResultCh.setGrade(deviceCodeParam.getGrade()); + dataResultCh.setFunCode(deviceCodeParam.getFunCode()); + dataResultCh.setProjectId(deviceCodeParam.getProjectId()); + String projectName = projectInfoService.selectName(deviceCodeParam.getProjectId()); + log.info("冷水机:{},状态:{},项目名称:{}", chillerAddr, data, projectName); + dataResultService.saveDataResultChiller(dataResultCh); + // dataResultService.deleteDataResultNow(deviceCodeParam.getDeviceAddr(), deviceCodeParam.getDeviceType(), deviceCodeParam.getRegisterAddr(), deviceCodeParam.getProjectId()); + log.info("冷水机保存成功!项目名称:{}", projectName); + } -//解析冷量表 -public void analysisCloudOrder485(final String data, final DeviceCodeParamEntity deviceCodeParam) { -// threadPoolService.execute(() -> { - //创建SimpleDateFormat对象,指定样式 2019-05-13 22:39:30 + //解析冷量表 + public void analysisCloudOrder485(final String data, final DeviceCodeParamEntity deviceCodeParam) { Date date = new Date(); - String dateStr = sdf1.format(date); String cloudId = deviceCodeParam.getDeviceAddr(); DataResultChEntity dataResultCh = new DataResultChEntity(); DataResultClEntity dataResultCl = new DataResultClEntity(); String registerAddr = deviceCodeParam.getRegisterAddr(); int grade = deviceCodeParam.getGrade(); + + // 使用Calendar进行更安全的时间处理 + Calendar cal = Calendar.getInstance(); + cal.setTime(date); + if (ExchangeStringUtil.isInDate(date, "00:00:00", "00:00:30")) { - dateStr = dateStr.substring(0, 17) + "00"; - System.out.println("插入时间00" + dateStr); + cal.set(Calendar.SECOND, 0); + date = cal.getTime(); + log.info("插入时间00: {}", sdf1.format(date)); } else if (ExchangeStringUtil.isInDate(date, "00:00:30", "00:00:59")) { - dateStr = dateStr.substring(0, 17) + "30"; - System.out.println("插入时间30" + dateStr); + cal.set(Calendar.SECOND, 30); + date = cal.getTime(); + log.info("插入时间30: {}", sdf1.format(date)); } + try { -// if (registerAddr.equals("0004") -// || registerAddr.equals("0020") -// || registerAddr.equals("0073") -// || registerAddr.equals("0075")) - if (grade>=140 && grade<=149) { + if (grade >= 140 && grade <= 149) { dataResultCh.setDeviceAddr(cloudId); dataResultCh.setDeviceType(deviceCodeParam.getDeviceType()); dataResultCh.setFunCode(deviceCodeParam.getFunCode()); - - dataResultCh.setCurDate(sdf1.parse(dateStr)); + dataResultCh.setCurDate(date); // 直接使用Date对象,避免再次解析 dataResultCh.setCurValue(data); dataResultCh.setRegisterAddr(deviceCodeParam.getRegisterAddr()); dataResultCh.setRegisterName(deviceCodeParam.getRegisterName()); dataResultCh.setGrade(deviceCodeParam.getGrade()); dataResultCh.setProjectId(deviceCodeParam.getProjectId()); - dataResultCh.setGrade(deviceCodeParam.getGrade()); String projectName = projectInfoService.selectName(deviceCodeParam.getProjectId()); log.info("冷量计==>{},寄存器地址==>{},读数==>{},项目名称==>{}", cloudId, registerAddr, dataResultCh.getCurValue(), projectName); dataResultService.saveDataResultCh(dataResultCh); log.info("冷量计瞬时冷量/流量保存数据库成功!项目名称:{}", projectName); - } -// else if (registerAddr.equals("0080") || registerAddr.equals("0077")) - else if (grade >= 40 && grade <= 49) - { + } else if (grade >= 40 && grade <= 49) { dataResultCl.setDeviceAddr(cloudId); dataResultCl.setDeviceType(deviceCodeParam.getDeviceType()); - dataResultCl.setCurDate(sdf1.parse(dateStr)); + dataResultCl.setCurDate(date); // 直接使用Date对象,避免再次解析 BigDecimal lData = new BigDecimal(data); - dataResultCl.setCurValue(lData);//字符串转整型 + dataResultCl.setCurValue(lData); dataResultCl.setRegisterAddr(deviceCodeParam.getRegisterAddr()); dataResultCl.setRegisterName(deviceCodeParam.getRegisterName()); dataResultCl.setGrade(deviceCodeParam.getGrade()); @@ -281,7 +277,7 @@ public void analysisCloudOrder485(final String data, final DeviceCodeParamEntity } catch (Exception e) { log.error("保存冷量计数据失败!", e); } -// }); -} + } + } diff --git a/user-service/src/main/resources/application-dev.yml b/user-service/src/main/resources/application-dev.yml index 1260637..d861a96 100644 --- a/user-service/src/main/resources/application-dev.yml +++ b/user-service/src/main/resources/application-dev.yml @@ -5,7 +5,7 @@ spring: name: jnd-user-service datasource: #添加allowMultiQueries=true 在批量更新时才不会出错 - url: jdbc:sqlserver://127.0.0.1:1433;DatabaseName=mh_jnd;allowMultiQueries=true + url: jdbc:sqlserver://127.0.0.1:1433;DatabaseName=mh_jnd2;allowMultiQueries=true driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver username: sa password: mh@803 diff --git a/user-service/src/test/java/com/mh/user/DealDataTest.java b/user-service/src/test/java/com/mh/user/DealDataTest.java index dfa2fdd..67c818e 100644 --- a/user-service/src/test/java/com/mh/user/DealDataTest.java +++ b/user-service/src/test/java/com/mh/user/DealDataTest.java @@ -3,6 +3,7 @@ package com.mh.user; import com.github.pagehelper.PageInfo; import com.mh.common.page.PageResult; import com.mh.user.dto.EnergyMomYoyDataDTO; +import com.mh.user.job.DealDataJob; import com.mh.user.mapper.chillers.DealDataMapper; import com.mh.user.model.EnergyModel; import com.mh.user.service.EnergyDataService; @@ -27,6 +28,23 @@ public class DealDataTest extends UserServiceApplicationTests { @Autowired private DealDataService dealDataService; + @Autowired + private DealDataJob dealDataJob; + + @Test + public void dealDataJob() throws InterruptedException { +// for (int i = 0; i < 10; i++) { +// dealDataJob.proDataResult("2025-08-26 05:45:00"); +// dealDataJob.proDataResult("2025-08-26 05:46:00"); +// dealDataJob.proDataResult("2025-08-26 05:47:00"); +// dealDataJob.proDataResult("2025-08-26 05:48:00"); +// dealDataJob.proDataResult("2025-08-26 05:49:00"); +// dealDataJob.proDataResult("2025-08-26 05:50:00"); +// Thread.sleep(60000); +// } + } + + @Test public void dealChillersData() { try {