package com.mh.quartz.task; import com.mh.common.constant.Constants; import com.mh.common.core.domain.entity.CollectionParamsManage; import com.mh.common.core.domain.entity.DeviceReport; import com.mh.common.core.redis.RedisCache; import com.mh.common.enums.ComputeEnum; import com.mh.common.utils.DateUtils; import com.mh.common.utils.StringUtils; import com.mh.framework.dealdata.DataProcessService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; import java.math.BigDecimal; import java.time.*; import java.time.temporal.ChronoUnit; import java.time.temporal.TemporalUnit; import java.util.*; import java.util.stream.Collectors; /** * @author LJF * @version 1.0 * @project EEMCS * @description 处理上来的数据报文 * @date 2025-02-10 14:19:36 */ @Slf4j @Component("dealDataTask") public class DealDataTask { private final RedisCache redisCache; private final DataProcessService dataProcessService; @Autowired public DealDataTask(RedisCache redisCache, DataProcessService dataProcessService) { this.redisCache = redisCache; this.dataProcessService = dataProcessService; } /** * * 每隔5分钟处理生成秒级报表存入data_min中 * 不要修改时间!!!不要修改时间!!!5分钟 */ public void dealDeviceData() { List cacheList = redisCache.getCacheList(Constants.DEVICE, CollectionParamsManage.class); if (null == cacheList || cacheList.isEmpty()) { return; } //清空redis redisCache.deleteObject(Constants.DEVICE); //处理chillers数据 try { //todo 处理没有对象curValue和curTime的异常 dealDeviceCollect(cacheList); } catch (Exception e) { log.error("处理主机参数异常:{}", e); } } private void dealDeviceCollect(List cacheList) { // 先根据表具类型进行分组 Map> groupMap = cacheList .stream() .collect(Collectors.groupingBy(CollectionParamsManage::getMtType)); // 开始进行数据遍历 for (Map.Entry> entry : groupMap.entrySet()) { String mtType = entry.getKey(); List dataList = entry.getValue(); // 再根据mtNum分组 Map> groupMap1 = dataList .stream() .collect(Collectors.groupingBy(CollectionParamsManage::getMtNum)); // 开始进行数据遍历 for (Map.Entry> entry1 : groupMap1.entrySet()) { List dataList1 = entry1.getValue(); // 进行数据处理入库操作等 try { dealAndInsert(dataList1, mtType); } catch (Exception e) { log.error("处理主机参数异常:{}", e); } } } } /** * 处理数据并添加到data_min中 * @param dataList * @param deviceType */ private void dealAndInsert(List dataList, String deviceType) { // 格式化时间点,然后取同样时间点的最大值 Map> collect = dataList.stream() .peek(val -> val.setCurTime(DateUtils.stringToDate(DateUtils.getTimeMin(val.getCurTime(), 5), "yyyy-MM-dd HH:mm:ss"))) .collect( Collectors.groupingBy( CollectionParamsManage::getCurTime, Collectors.maxBy(Comparator.comparing(CollectionParamsManage::getCurValue))) ); collect = sortMapByDate(collect); List dataMinList = new ArrayList<>(); //value循环处理数据 if (collect.size() > 0) { CollectionParamsManage entity = collect.values().stream().findFirst().get().get(); int deviceGrade = entity.getGrade(); //从数据库取值,当前的年表,当前年表没有,查询上一次年表 DeviceReport lastData = dataProcessService.queryLastValue(entity.getMtNum(), "min"); // int ratio = dataProcessService.queryRatio(entity.getMtNum()) == null ? 1 : dataProcessService.queryRatio(entity.getMtNum()); int ratio = entity.getMtRatio(); if (ObjectUtils.isEmpty(lastData) || ObjectUtils.isEmpty(lastData.getLastValue())) { //从device_manage取出初始值 String initValue = dataProcessService.queryInitValue(entity.getMtNum(), entity.getMtCode(), entity.getRegisterAddr()); if (StringUtils.isEmpty(initValue)) { initValue = "0"; } DeviceReport firstEntity = new DeviceReport(); firstEntity.setLastValue(initValue); firstEntity.setLastTime(entity.getCurTime()); firstEntity.setCurTime(entity.getCurTime()); firstEntity.setCurValue(entity.getCurValue().toString()); firstEntity.setDeviceNum(entity.getMtNum()); firstEntity.setDeviceCode(entity.getMtCode()); // 寄存器地址 firstEntity.setRegisterAddr(entity.getRegisterAddr()); firstEntity.setDeviceType(deviceType); firstEntity.setRatio(ratio); double usedValue = 0; try { usedValue = entity.getCurValue().doubleValue() - Double.parseDouble(initValue); } catch (NumberFormatException e) { log.error("数值格式解析异常:{}", e); } firstEntity.setUsedValue(String.valueOf(usedValue)); //区分瞬时值 if ((deviceGrade >= 100 && deviceGrade < 200) || (deviceGrade >= 1200 && deviceGrade < 1300) ) { firstEntity.setCalcValue(String.valueOf(entity.getCurValue().doubleValue() * ratio)); firstEntity.setGrade(1); } else { firstEntity.setCalcValue(String.valueOf(usedValue * ratio)); firstEntity.setGrade(0); } dataMinList.add(firstEntity); } else { Date lastDate = lastData.getLastTime(); String lastValue = lastData.getLastValue(); DeviceReport firstEntity = new DeviceReport(); firstEntity.setLastValue(lastValue); firstEntity.setLastTime(lastDate); firstEntity.setCurTime(entity.getCurTime()); firstEntity.setCurValue(entity.getCurValue().toString()); firstEntity.setDeviceNum(entity.getMtNum()); firstEntity.setDeviceCode(entity.getMtCode()); // 寄存器地址 firstEntity.setRegisterAddr(entity.getRegisterAddr()); firstEntity.setDeviceType(deviceType); firstEntity.setRatio(ratio); double usedValue = entity.getCurValue().doubleValue() - Double.parseDouble(lastValue); firstEntity.setUsedValue(String.valueOf(usedValue)); //区分瞬时值 //区分瞬时值 if ((deviceGrade >= 100 && deviceGrade < 200) || (deviceGrade >= 1200 && deviceGrade < 1300) ) { firstEntity.setCalcValue(String.valueOf(entity.getCurValue().doubleValue() * ratio)); firstEntity.setGrade(1); } else { firstEntity.setCalcValue(String.valueOf(usedValue * ratio)); firstEntity.setGrade(0); } dataMinList.add(firstEntity); } int i = 0, j = 0; for (Map.Entry> entry : collect.entrySet()) { if (entry.getValue().isPresent()) { if (i == 0) { i++; continue; } DeviceReport temp = new DeviceReport(); DeviceReport dataJ = dataMinList.get(j); CollectionParamsManage dataI = entry.getValue().get(); //复制实体类 // BeanUtils.copyProperties(dataI, temp); temp.setDeviceNum(dataI.getMtNum()); temp.setDeviceCode(dataI.getMtCode()); temp.setLastValue(dataJ.getCurValue()); temp.setLastTime(dataJ.getCurTime()); temp.setCurTime(dataI.getCurTime()); temp.setCurValue(dataI.getCurValue().toString()); double usedValue = Double.parseDouble(temp.getCurValue()) - Double.parseDouble(temp.getLastValue()); temp.setUsedValue(String.valueOf(usedValue)); //区分瞬时值 if ((deviceGrade >= 100 && deviceGrade < 200) || (deviceGrade >= 1200 && deviceGrade < 1300) ) { temp.setCalcValue(String.valueOf(Double.parseDouble(temp.getCurValue()) * ratio)); temp.setGrade(1); } else { temp.setCalcValue(String.valueOf(usedValue * ratio)); temp.setGrade(0); } temp.setDeviceType(deviceType); temp.setRatio(ratio); temp.setRegisterAddr(dataI.getRegisterAddr()); dataMinList.add(temp); i++; j++; } } //完全处理好数据,添加到data_min中 dataProcessService.insertDatabase(dataMinList); // 这里计算COP for (Map.Entry> entry : collect.entrySet()) { if (entry.getValue().isPresent()) { try { dataProcessService.calculateCopByTime(DateUtils.dateToString(entry.getKey(), "yyyy-MM-dd HH:mm:ss")); } catch (Exception e) { log.error("计算COP失败", e); } } } } } /** * 排序 * @param map * @return */ public static Map> sortMapByDate(Map> map) { return map.entrySet().stream() .sorted(Map.Entry.comparingByKey()) .collect(Collectors.toMap( Map.Entry::getKey, Map.Entry::getValue, (oldValue, newValue) -> oldValue, LinkedHashMap::new )); } /** * 每隔一个小时处理一次小时表数据,从data_min表中取值,grade=0的数据 * 加工聚合处理完成,插入data_hour表中 */ // @Scheduled(cron = "0 30 0/1 * * ?") public void dealDeviceDataHour() { Map map = dataProcessService.queryUntreatedData("min"); //需要处理的数据 List dataList = (List) map.get("data"); //处理完成后需要将数据设置为已处理,grade = 1, Map> idMap = (Map>) map.get("idMap"); //遍历分组 Map>> collect = getMeterNumMap(dataList, ChronoUnit.HOURS); //小时表数据集合 List hourList = new ArrayList<>(); //TODO 分组好的数据计算后插入数据表 for (Map.Entry>> entry : collect.entrySet()) { //表号 String key = entry.getKey(); Map> value = entry.getValue(); Set>> entries = value.entrySet(); String deviceType = null; for (Map.Entry> listEntry : entries) { deviceType = listEntry.getValue().get(0).getDeviceType(); break; } //分组计算,得到一组当前表号的以时间分组的数据,循环计算第一条的当前读数是上一条的上次读数 assert deviceType != null; List> dealDataList = ComputeEnum.get(Integer.parseInt(deviceType)).getDataList(entry); List> sortedList = dealDataList.stream() .sorted(Comparator.comparing(temMap -> temMap.keySet().iterator().next())) .collect(Collectors.toList()); //计算上次值,当前值 log.info("计算小时表数据:{}", sortedList); hourList = calcListData(hourList, key, "hour", sortedList); } //批量插入小时报表 try { if (hourList.isEmpty() || idMap.isEmpty()) { return; } dataProcessService.batchInsertOrUpdate(hourList, "hour"); } catch (Exception e) { log.error("小时数据表处理异常:" + e); } //TODO 修改分钟表的状态,grade=1 dataProcessService.batchUpdateGrade(idMap, "min"); } /** * 对数据集合计算上一次值,当前值 * * @param hourList * @param key * @param dealDataList */ private List calcListData(List hourList, String key, String tableType, List> dealDataList) { ArrayList tempList = new ArrayList<>(); for (int i = 0; i < dealDataList.size(); i++) { if (i == 0) { Map reportEntityMap = dealDataList.get(i); //由key已知是哪个表号 //判断第一个是否有历史数据,当年表有历史数据则取当年表,无则取上一年表,去年和今年表都无则取初始值 //TODO 从历史表查询上一次的读数,抄表时间 String lastValue = null; Date lastDate = null; //查询历史记录 DeviceReport hourEntity = dataProcessService.queryLastValue(key, tableType); if (ObjectUtils.isEmpty(hourEntity)) { //查询设备信息初始值 lastValue = dataProcessService.queryInitValue(key, null, null); if (StringUtils.isEmpty(lastValue)) { lastValue = "0"; } } else { lastValue = hourEntity.getLastValue(); lastDate = hourEntity.getLastTime(); } Set> entrySet = reportEntityMap.entrySet(); for (Map.Entry entityEntry : entrySet) { LocalDateTime curTime = entityEntry.getKey(); DeviceReport entity = entityEntry.getValue(); ZonedDateTime zonedDateTime = curTime.atZone(ZoneId.systemDefault()); Date date = Date.from(zonedDateTime.toInstant()); entity.setCurTime(date); entity.setLastValue(lastValue); if (lastDate == null) { entity.setLastTime(date); } else { entity.setLastTime(lastDate); } BigDecimal curValue = new BigDecimal(entity.getCurValue()); BigDecimal newLastValue = new BigDecimal(entity.getLastValue()); BigDecimal usedValue = curValue.subtract(newLastValue); int ratio = entity.getRatio(); entity.setRatio(ratio); BigDecimal calcValue = usedValue.multiply(BigDecimal.valueOf(ratio)); entity.setUsedValue(String.valueOf(usedValue)); entity.setCalcValue(String.valueOf(calcValue)); hourList.add(entity); tempList.add(entity); //只需要第一个实体类 break; } } else { //从上一条取当前读数和当前时间作为当前对象的上一次读数 int lastIndex = i - 1; DeviceReport lastEntity = tempList.get(lastIndex); Map curMap = dealDataList.get(i); Set> curEntrySet = curMap.entrySet(); for (Map.Entry curEntry : curEntrySet) { LocalDateTime curTime = curEntry.getKey(); DeviceReport curEntity = curEntry.getValue(); ZonedDateTime zonedDateTime = curTime.atZone(ZoneId.systemDefault()); Date date = Date.from(zonedDateTime.toInstant()); curEntity.setCurTime(date); curEntity.setLastTime(lastEntity.getCurTime()); curEntity.setLastValue(lastEntity.getCurValue()); BigDecimal curValue = new BigDecimal(curEntity.getCurValue()); BigDecimal lastValue = new BigDecimal(curEntity.getLastValue()); BigDecimal usedValue = curValue.subtract(lastValue); int ratio = lastEntity.getRatio(); curEntity.setRatio(ratio); BigDecimal calcValue = usedValue.multiply(BigDecimal.valueOf(ratio)); curEntity.setUsedValue(String.valueOf(usedValue)); curEntity.setCalcValue(String.valueOf(calcValue)); hourList.add(curEntity); tempList.add(curEntity); } } } return hourList; } /** * 遍历数据分组,按照表号分组,再根据时间戳分组 * 分组最大只能做到日分组,无法做到月和年,需要另外一个函数实现 * * @param dataList 需要处理的数据集 * @param interval 时间间隔 * @return */ private Map>> getMeterNumMap(List dataList, TemporalUnit interval) { // 获取 MinuteData 对象集合,其中的时间戳是 java.util.Date 类型 //1:数据格式为先按照表号分组,设备类型里的数据再按照小时分组 //2:小时分组的数据就是DeviceReportEntity实体类 return dataList.stream() .map(data -> { LocalDateTime dateTime = LocalDateTime.ofInstant(data.getCurTime().toInstant(), ZoneId.systemDefault()); data.setLocalDateTime(dateTime); return data; }) .sorted(Comparator.comparing(DeviceReport::getLocalDateTime)) .collect( //根据表号分组 Collectors.groupingBy(DeviceReport::getDeviceNum, //根据时间间隔分组,后续是否需要根据减少分组 Collectors.groupingBy(report -> report.getLocalDateTime().truncatedTo(interval))) ); } /*** * 处理生成日月年报表data_day等表 * 一日处理一次数据 凌晨3点计算 */ // @Scheduled(cron = "0 30 23 * * ?") //@Scheduled(cron = "0 0/1 * * * ?") public void dealDay2Year() { Map untreatedList = dataProcessService.queryUntreatedData("hour"); //需要处理的数据 List dataList = (List) untreatedList.get("data"); //处理完成后需要将数据设置为已处理,grade = 1, Map> idMap = (Map>) untreatedList.get("idMap"); //<表号:<时间:List<集合>>> 按照日区分 Map>> dayMap = getMeterNumMap(dataList, ChronoUnit.DAYS); List dayList = new ArrayList<>(); List monthList = new ArrayList<>(); // 使用流对每个内部的Map进行排序 dayMap.forEach((key, value) -> { Map> sortedMap = value.entrySet().stream() .sorted(Map.Entry.comparingByKey()) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (oldValue, newValue) -> oldValue, LinkedHashMap::new)); // 将排序后的Map更新回dayMap dayMap.put(key, sortedMap); }); dayList = getDeviceReportEntities(dayMap, dayList, "day"); //存入日表 try { dataProcessService.batchInsertOrUpdate(dayList, "day"); } catch (Exception e) { log.error("日表数据表处理异常:" + e); e.printStackTrace(); } // 按照月区分 //Map>> monthMap = getMeterNumMap(dataList, ChronoUnit.MONTHS); Map>> monthMap = getMeterNumMapByMonth(dataList, ChronoUnit.MONTHS); List monthInsert = getMonthListData(monthList, monthMap); dataProcessService.batchInsertOrUpdate(monthInsert, "month"); // 按照年区分 //Map>> yearMap = getMeterNumMapByMonth(dataList, ChronoUnit.YEARS); //List yearInsert = getMonthListData(monthList, yearMap); dataProcessService.batchInsertOrUpdate(monthInsert, "year"); //TODO 修改分钟表的状态,grade=1 dataProcessService.batchUpdateGrade(idMap, "hour"); } /** * 计算月和年报表 * * @param monthList * @param monthMap * @return */ private List getMonthListData(List monthList, Map>> monthMap) { Set>>> entries = monthMap.entrySet(); for (Map.Entry>> entry : entries) { String deviceNum = entry.getKey(); Map> value = entry.getValue(); List deviceList = new ArrayList<>(); List monthInsert = new ArrayList<>(); List tempList = new ArrayList<>(); Set>> monthEntries = value.entrySet(); for (Map.Entry> monthEntry : monthEntries) { //月份 YearMonth key = monthEntry.getKey(); //集合 List monthEntryValue = monthEntry.getValue(); //筛选出最大值和最小值 DeviceReport max = Collections.max(monthEntryValue, Comparator.comparing(DeviceReport::getLocalDateTime)); deviceList.add(max); } deviceList.sort(Comparator.comparing(DeviceReport::getLocalDateTime)); for (int i = 0; i < deviceList.size(); i++) { String lastValue; Date lastTime; if (i == 0) { DeviceReport entity = deviceList.get(i); DeviceReport lastEntity = dataProcessService.queryLastValue(deviceNum, "month"); if (ObjectUtils.isEmpty(lastEntity)) { lastValue = dataProcessService.queryInitValue(deviceNum, null, null); lastTime = entity.getCurTime(); } else { lastValue = lastEntity.getLastValue(); lastTime = lastEntity.getLastTime(); } entity.setLastTime(lastTime); entity.setLastValue(lastValue); BigDecimal curValue = new BigDecimal(entity.getCurValue()); BigDecimal bigDecimal = new BigDecimal(entity.getLastValue()); int ratio = entity.getRatio(); BigDecimal usedValue = curValue.subtract(bigDecimal); entity.setUsedValue(usedValue.toString()); BigDecimal calcValue = usedValue.multiply(BigDecimal.valueOf(ratio)); entity.setCalcValue(calcValue.toString()); monthInsert.add(entity); tempList.add(entity); } else { //从上一条取当前读数和当前时间作为当前对象的上一次读数 int lastIndex = i - 1; DeviceReport lastEntity = tempList.get(lastIndex); DeviceReport curEntity = deviceList.get(i); curEntity.setLastTime(lastEntity.getCurTime()); curEntity.setLastValue(lastEntity.getCurValue()); BigDecimal curValue = new BigDecimal(curEntity.getCurValue()); BigDecimal bigDecimal = new BigDecimal(curEntity.getLastValue()); BigDecimal usedValue = curValue.subtract(bigDecimal); int ratio = lastEntity.getRatio(); curEntity.setRatio(ratio); BigDecimal calcValue = usedValue.multiply(BigDecimal.valueOf(ratio)); curEntity.setUsedValue(String.valueOf(usedValue)); curEntity.setCalcValue(String.valueOf(calcValue)); monthInsert.add(curEntity); tempList.add(curEntity); } } monthList.addAll(tempList); } return monthList; } /** * 循环处理 * * @param dayMap * @param dayList * @param tableType * @return */ private List getDeviceReportEntities(Map>> dayMap, List dayList, String tableType) { for (Map.Entry>> entry : dayMap.entrySet()) { //表号 String key = entry.getKey(); Map> value = entry.getValue(); String deviceType = null; for (Map.Entry> listEntry : value.entrySet()) { deviceType = listEntry.getValue().get(0).getDeviceType(); } //取到的最大值 assert deviceType != null; List> dayMapList = ComputeEnum.get(Integer.parseInt(deviceType)).getDataList(entry); dayList = calcListData(dayList, key, tableType, dayMapList); } return dayList; } /** * 按照月、年分组 * * @param dataList * @param interval * @return */ private Map>> getMeterNumMapByMonth(List dataList, TemporalUnit interval) { return dataList.stream() .map(data -> { LocalDateTime dateTime = LocalDateTime.ofInstant(data.getCurTime().toInstant(), ZoneId.systemDefault()); data.setLocalDateTime(dateTime); return data; }) .collect(Collectors.groupingBy(DeviceReport::getDeviceNum, Collectors.groupingBy( report -> YearMonth.from(report.getLocalDateTime()), LinkedHashMap::new, Collectors.toList() ))); } /** * 处理冷水机组数据获取只要实时数据的设备 */ public void dealChillersData() { List cacheList = redisCache.getCacheList(Constants.CHILLERS, CollectionParamsManage.class); if (null == cacheList || cacheList.isEmpty()) { return; } //清空redis redisCache.deleteObject(Constants.CHILLERS); //处理chillers数据 try { //todo 处理没有对象curValue和curTime的异常 dealChillersCollect(cacheList); } catch (Exception e) { log.error("处理主机参数异常:{}", e); } } /** * 处理温湿度传感器数据获取进入chillers表 */ public void dealTempData() { List cacheList = redisCache.getCacheList(Constants.TEMP, CollectionParamsManage.class); if (null == cacheList || cacheList.isEmpty()) { return; } //清空redis redisCache.deleteObject(Constants.TEMP); //处理chillers数据 try { //todo 处理没有对象curValue和curTime的异常 dealChillersCollect(cacheList); } catch (Exception e) { log.error("处理主机参数异常:{}", e); } } /** * 处理主机秒级数据,再计算主机运行时间 * * @param cacheList */ private void dealChillersCollect(List cacheList) throws Exception { //插入报表,将历史数据插入chillers_data_min //1.插入register_id,当前值,当前时间,名字 dataProcessService.insertChillerReport(cacheList); } }