diff --git a/mh-framework/src/main/java/com/mh/framework/dealdata/DataProcessService.java b/mh-framework/src/main/java/com/mh/framework/dealdata/DataProcessService.java index b6bdfac..2918d97 100644 --- a/mh-framework/src/main/java/com/mh/framework/dealdata/DataProcessService.java +++ b/mh-framework/src/main/java/com/mh/framework/dealdata/DataProcessService.java @@ -1,6 +1,7 @@ package com.mh.framework.dealdata; import com.mh.common.core.domain.entity.ChillersEntity; +import com.mh.common.core.domain.entity.CollectionParamsManage; import com.mh.common.core.domain.entity.DeviceReportEntity; import com.mh.common.model.request.AdvantechReceiver; @@ -93,7 +94,7 @@ public interface DataProcessService { * 插入主机历史流水表 * @param cacheList */ - void insertChillerReport(List cacheList); + void insertChillerReport(List cacheList); /** * 批量更新运行时长 diff --git a/mh-framework/src/main/java/com/mh/framework/dealdata/impl/DataProcessServiceImpl.java b/mh-framework/src/main/java/com/mh/framework/dealdata/impl/DataProcessServiceImpl.java index 1678581..53ceb5a 100644 --- a/mh-framework/src/main/java/com/mh/framework/dealdata/impl/DataProcessServiceImpl.java +++ b/mh-framework/src/main/java/com/mh/framework/dealdata/impl/DataProcessServiceImpl.java @@ -13,6 +13,7 @@ import com.mh.common.utils.DateUtils; import com.mh.common.utils.EnergyThreadPoolService; import com.mh.common.utils.StringUtils; import com.mh.framework.dealdata.DataProcessService; +import com.mh.system.mapper.device.DataProcessMapper; import com.mh.system.mapper.device.DatabaseMapper; import com.mh.system.service.device.ICollectionParamsManageService; import jakarta.annotation.Resource; @@ -62,6 +63,9 @@ public class DataProcessServiceImpl implements DataProcessService { @Autowired private ICollectionParamsManageService collectionParamsManageService; + @Autowired + private DataProcessMapper dataProcessMapper; + ThreadPoolExecutor threadPoolService = EnergyThreadPoolService.getInstance(); @Override @@ -190,8 +194,21 @@ public class DataProcessServiceImpl implements DataProcessService { } @Override - public void insertChillerReport(List cacheList) { - + public void insertChillerReport(List cacheList) { + // 根据时间排序 + cacheList.sort(Comparator.comparing(CollectionParamsManage::getCurTime)); + // 批量插入到chillers_data_min表 + Calendar calendar = Calendar.getInstance(); + calendar.setTime(new Date()); + int year = calendar.get(Calendar.YEAR); + String tableName = "chillers_data_min" + year; + int batchSize = 10; + // 分页查询并插入数据 + for (int i = 0; i < cacheList.size(); i += batchSize) { + List batchList = cacheList.subList(i, Math.min(i + batchSize, cacheList.size())); + // 执行插入语句 + dataProcessMapper.batchInsertChiller(batchList, tableName); + } } @Override diff --git a/mh-quartz/pom.xml b/mh-quartz/pom.xml index 4bdddac..29176e5 100644 --- a/mh-quartz/pom.xml +++ b/mh-quartz/pom.xml @@ -35,6 +35,11 @@ mh-common + + com.mh + mh-framework + + \ No newline at end of file diff --git a/mh-quartz/src/main/java/com/mh/quartz/task/DealDataTask.java b/mh-quartz/src/main/java/com/mh/quartz/task/DealDataTask.java new file mode 100644 index 0000000..02809fd --- /dev/null +++ b/mh-quartz/src/main/java/com/mh/quartz/task/DealDataTask.java @@ -0,0 +1,64 @@ +package com.mh.quartz.task; + +import com.mh.common.core.domain.entity.CollectionParamsManage; +import com.mh.common.core.redis.RedisCache; +import com.mh.framework.dealdata.DataProcessService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.time.Duration; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.*; +import java.util.stream.Collectors; + +/** + * @author LJF + * @version 1.0 + * @project EEMCS + * @description 处理上来的数据报文 + * @date 2025-02-10 14:19:36 + */ +@Slf4j +@Component("dealDataTask") +public class DealDataTask { + + private final RedisCache redisCache; + + private final DataProcessService dataProcessService; + + @Autowired + public DealDataTask(RedisCache redisCache, DataProcessService dataProcessService) { + this.redisCache = redisCache; + this.dataProcessService = dataProcessService; + } + + + public void dealDeviceData() { + List cacheList = redisCache.getCacheList("CHILLERS", CollectionParamsManage.class); + if (null == cacheList || cacheList.isEmpty()) { + return; + } + //清空redis + redisCache.deleteObject("CHILLERS"); + //处理chillers数据 + try { + //todo 处理没有对象curValue和curTime的异常 + dealChillersCollect(cacheList); + } catch (Exception e) { + log.error("处理主机参数异常:{}", e); + } + } + + /** + * 处理主机秒级数据,再计算主机运行时间 + * + * @param cacheList + */ + private void dealChillersCollect(List cacheList) throws Exception { + //插入报表,将历史数据插入chillers_data_min + //1.插入register_id,当前值,当前时间,名字 + dataProcessService.insertChillerReport(cacheList); + } +} diff --git a/mh-system/src/main/java/com/mh/system/mapper/device/DataProcessMapper.java b/mh-system/src/main/java/com/mh/system/mapper/device/DataProcessMapper.java new file mode 100644 index 0000000..3bfb4be --- /dev/null +++ b/mh-system/src/main/java/com/mh/system/mapper/device/DataProcessMapper.java @@ -0,0 +1,30 @@ +package com.mh.system.mapper.device; + +import com.mh.common.core.domain.entity.CollectionParamsManage; +import org.apache.ibatis.annotations.*; + +import java.util.List; + +/** + * @Author : ljf + * @date : 2025-02-10 + */ +@Mapper +public interface DataProcessMapper { + + /** + * 批量插入冷水机组分钟记录保存表 + * + * @param batchList + * @param tableName + */ + @Select("") + void batchInsertChiller(@Param("batchList") List batchList, @Param("tableName") String tableName); + +} diff --git a/mh-system/src/main/java/com/mh/system/mapper/device/GatewayManageMapper.java b/mh-system/src/main/java/com/mh/system/mapper/device/GatewayManageMapper.java index d1cb6d6..2de5540 100644 --- a/mh-system/src/main/java/com/mh/system/mapper/device/GatewayManageMapper.java +++ b/mh-system/src/main/java/com/mh/system/mapper/device/GatewayManageMapper.java @@ -16,6 +16,7 @@ import org.apache.ibatis.annotations.Update; @Mapper public interface GatewayManageMapper extends BaseMapper { - @Update("update gateway_manage set status = 0 where id = #{gatewayId}") + @Update("update gateway_manage set status = 0, connect_time = CURRENT_TIMESTAMP where id = #{gatewayId}") void updateOnlineStatus(@Param("gatewayId") String gatewayId); + }