diff --git a/mh-admin/src/test/java/com/mh/MHApplicationTest.java b/mh-admin/src/test/java/com/mh/MHApplicationTest.java index 02d694a..4937322 100644 --- a/mh-admin/src/test/java/com/mh/MHApplicationTest.java +++ b/mh-admin/src/test/java/com/mh/MHApplicationTest.java @@ -1,6 +1,7 @@ package com.mh; import com.mh.common.core.domain.entity.SysUser; +import com.mh.quartz.task.DealDataTask; import com.mh.system.service.ISysUserService; import com.mh.system.service.device.IDeviceQrManageService; import org.checkerframework.checker.units.qual.A; @@ -30,4 +31,22 @@ public class MHApplicationTest { System.out.println(sysUser); deviceQrManageService.createQrCode(10, "admin"); } + + @Autowired + private DealDataTask dealDeviceData; + + @Test + public void dealDeviceData() { + dealDeviceData.dealDeviceData(); + } + + @Test + public void dealDeviceDataHour() { + dealDeviceData.dealDeviceDataHour(); + } + + @Test + public void dealDay2Year() { + dealDeviceData.dealDay2Year(); + } } diff --git a/mh-common/src/main/java/com/mh/common/enums/ComputeEnum.java b/mh-common/src/main/java/com/mh/common/enums/ComputeEnum.java new file mode 100644 index 0000000..6f5aeeb --- /dev/null +++ b/mh-common/src/main/java/com/mh/common/enums/ComputeEnum.java @@ -0,0 +1,114 @@ +package com.mh.common.enums; + +import com.mh.common.core.domain.entity.DeviceReportEntity; + +import java.time.LocalDateTime; +import java.util.*; + +/** + * @Author : Rainbow + * @date : 2023/6/19 + */ +public enum ComputeEnum implements ComputeService { + + /** + * 电表 + */ + DEVICES("计量设备数据处理", 5) { + @Override + public ArrayList> getDataList( + Map.Entry>> entry) { + + ArrayList> result = new ArrayList<>(); + + //获取到电表的数据,按照表号分组分组,紧接着再按照小时分组。需要计算分组后的数据取出最大值 + Map> deviceMap = entry.getValue(); + String deviceNum = entry.getKey(); + Set>> groupEntryList = deviceMap.entrySet(); + for (Map.Entry> listEntry : groupEntryList) { + LocalDateTime key = listEntry.getKey(); + List value = listEntry.getValue(); + DeviceReportEntity maxEntity = value.stream() + .max(Comparator.comparing(obj -> Double.valueOf(obj.getCurValue()))) + .orElse(null); + HashMap map = new HashMap<>(); + map.put(key, maxEntity); + result.add(map); + } + + return result; + } + }, + /** + * 冷量计 + */ + CLOUD("冷量计数据处理", 2) { + @Override + public ArrayList> getDataList( + Map.Entry>> entry) { + ArrayList> result = new ArrayList<>(); + + //获取到电表的数据,按照表号分组分组,紧接着再按照小时分组。需要计算分组后的数据取出最大值 + Map> deviceMap = entry.getValue(); + String deviceNum = entry.getKey(); + Set>> groupEntryList = deviceMap.entrySet(); + for (Map.Entry> listEntry : groupEntryList) { + LocalDateTime key = listEntry.getKey(); + List value = listEntry.getValue(); + DeviceReportEntity maxEntity = value.stream() + .max(Comparator.comparing(obj -> Double.valueOf(obj.getCurValue()))) + .orElse(null); + HashMap map = new HashMap<>(); + map.put(key, maxEntity); + result.add(map); + } + + return result; + } + }, + /** + * 温度计 + */ + COLD("温度计数据处理", 1) { + @Override + public ArrayList> getDataList( + Map.Entry>> entry) { + return null; + } + }; + + private String des; + private int key; + + private static final Map lookup = new HashMap<>(); + + static { + for (ComputeEnum c : ComputeEnum.values()) { + lookup.put(c.getKey(), c); + } + } + + ComputeEnum(String des, int key) { + this.des = des; + this.key = key; + } + + public String getDes() { + return des; + } + + public void setDes(String des) { + this.des = des; + } + + public int getKey() { + return key; + } + + public static ComputeEnum get(int key) { + return lookup.get(key); + } + + public abstract ArrayList> getDataList( + Map.Entry>> entry); +} diff --git a/mh-common/src/main/java/com/mh/common/enums/ComputeService.java b/mh-common/src/main/java/com/mh/common/enums/ComputeService.java new file mode 100644 index 0000000..c9c4445 --- /dev/null +++ b/mh-common/src/main/java/com/mh/common/enums/ComputeService.java @@ -0,0 +1,24 @@ +package com.mh.common.enums; + +import com.mh.common.core.domain.entity.DeviceReportEntity; + +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * @Author : Rainbow + * @date : 2023/6/19 + */ +public interface ComputeService { + + + /** + * 处理并返回小时表的数据 + * @param data + * @return + */ + ArrayList> getDataList( + Map.Entry>> data); +} diff --git a/mh-common/src/main/java/com/mh/common/utils/DateUtils.java b/mh-common/src/main/java/com/mh/common/utils/DateUtils.java index 14cddd9..3fc96d9 100644 --- a/mh-common/src/main/java/com/mh/common/utils/DateUtils.java +++ b/mh-common/src/main/java/com/mh/common/utils/DateUtils.java @@ -8,6 +8,7 @@ import java.text.SimpleDateFormat; import java.time.*; import java.time.format.DateTimeFormatter; import java.time.temporal.TemporalAdjusters; +import java.util.Calendar; import java.util.Date; import com.mh.common.core.domain.vo.EnergyConsumptionVO; @@ -331,5 +332,58 @@ public class DateUtils extends org.apache.commons.lang3.time.DateUtils } return timeLen; } + /** + * 把日期格式化为字符串 + * + * @param date 日期 + * @param format 格式 + * @return 返回格式化之后的字符串 + */ + public static Date stringToDate(String date, String format) { + SimpleDateFormat dateFormat = new SimpleDateFormat(format); + try { + return dateFormat.parse(date); + } catch (ParseException e) { + return null; + } + } + + /** + * 获取每隔多少分钟的格式化时间 + * 传入 date "2023-07-31 16:23:00" minuteInter 5分钟 + * 返回 "2023-07-31 16:20:00" 整点 + * + * @param date 时间 + * @param minuteInterval 间隔 + * @return 时间 + */ + public static String getTimeMin(Date date, int minuteInterval) { + if (minuteInterval <= 0 || minuteInterval > 30) { + throw new RuntimeException("minute参数错误"); + } + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + String format = null; + format = sdf.format(date); + Calendar calendar = Calendar.getInstance(); + calendar.setTime(date); + int minute = calendar.get(Calendar.MINUTE); + int num = minute / minuteInterval; + String curDate = format.substring(0, 13) + ":" + (num * minuteInterval) + ":00"; +// System.out.println("curDate = " + curDate); + return curDate; + } + + /** + * 把日期格式化为字符串 + * + * @param date 日期 + * @param format 格式 + * @return 返回格式化之后的字符串 + */ + public static String dateToString(Date date, String format) { + SimpleDateFormat dateFormat = new SimpleDateFormat(format); + return dateFormat.format(date); + } + } diff --git a/mh-framework/src/main/java/com/mh/framework/dealdata/impl/DataProcessServiceImpl.java b/mh-framework/src/main/java/com/mh/framework/dealdata/impl/DataProcessServiceImpl.java index 53ceb5a..385f319 100644 --- a/mh-framework/src/main/java/com/mh/framework/dealdata/impl/DataProcessServiceImpl.java +++ b/mh-framework/src/main/java/com/mh/framework/dealdata/impl/DataProcessServiceImpl.java @@ -155,22 +155,147 @@ public class DataProcessServiceImpl implements DataProcessService { @Override public String queryInitValue(String deviceNum) { - return ""; + return dataProcessMapper.queryInitValue(deviceNum); } @Override public DeviceReportEntity queryLastValue(String deviceNum, String type) { - return null; + //先查询今年的表,今年的表没有,再查上一年的表 + Calendar calendar = Calendar.getInstance(); + calendar.setTime(new Date()); + int year = calendar.get(Calendar.YEAR); + String curTable = ""; + String lastTable = ""; + if ("month".equalsIgnoreCase(type) || "year".equalsIgnoreCase(type)) { + curTable = "data_" + type; + lastTable = "data_" + type; + } else { + curTable = "data_" + type + year; + lastTable = "data_" + type + (year - 1); + } + DeviceReportEntity entity = null; + entity = dataProcessMapper.queryLastValue(deviceNum, curTable); + if (entity == null) { + try { + entity = dataProcessMapper.queryLastValue(deviceNum, lastTable); + } catch (Exception e) { + log.error("没有当前表:{}", lastTable); + e.printStackTrace(); + } + } + return entity; } @Override public void insertDatabase(List dataMinList) { - + log.info("插入data_min数据,数据大小==>{}", dataMinList.size()); + Calendar calendar = Calendar.getInstance(); + //时间格式化0和5结尾的时间 + int batchSize = 10; + // 拿出第一条数据,看看表中是否存在值,存在则进行update操作 + if (dataMinList.size() > 0) { + DeviceReportEntity deviceReportEntity = dataMinList.getFirst(); + String yearStr = "data_min" + DateUtils.dateToString(deviceReportEntity.getCurTime(), "yyyy-MM-dd HH:mm:ss").substring(0, 4); + DeviceReportEntity value = dataProcessMapper.isHaveData(deviceReportEntity,yearStr); + try { + if (value!= null && value.getId() > 0) { + // 存在则进行更新操作 + BigDecimal bigDecimal = new BigDecimal(deviceReportEntity.getCurValue()); + BigDecimal usedValue = bigDecimal.subtract(new BigDecimal(value.getLastValue())).setScale(2, RoundingMode.HALF_UP); + BigDecimal calcValue = usedValue.multiply(new BigDecimal(value.getRatio())).setScale(2, RoundingMode.HALF_UP); + + deviceReportEntity.setId(value.getId()); + int grade = deviceReportEntity.getGrade(); + //区分瞬时值 + if (grade == 1) { + deviceReportEntity.setUsedValue(deviceReportEntity.getCurValue()); + deviceReportEntity.setCalcValue(deviceReportEntity.getCurValue()); + } else { + deviceReportEntity.setUsedValue(String.valueOf(usedValue)); + deviceReportEntity.setCalcValue(String.valueOf(calcValue)); + } + dataProcessMapper.updateDataMinByTime(deviceReportEntity, yearStr); + } else { + // 不存在则进行插入操作 + dataProcessMapper.insertDataMin(dataMinList.subList(0, 1), yearStr); + } + } catch (Exception e) { + log.error("插入dataMin失败: ", e); + } + } + // 分页查询并插入数据 + for (int i = 1; i < dataMinList.size(); i += batchSize) { + List batchList = dataMinList.subList(i, Math.min(i + batchSize, dataMinList.size())); + // 执行插入语句 + calendar.setTime(new Date()); + int year = calendar.get(Calendar.YEAR); + String curTable = "data_min" + year; + try { + dataProcessMapper.insertDataMin(batchList, curTable); + } catch (Exception e) { + log.error("batchList:{},curTable:{},date: {}, Exception: ",batchList,curTable,LocalTime.now(),e); + } + // 如果需要提交事务可以在这里提交 + } } + /** + * 聚合计算生成小时表数据 + * 1:从分钟表按照小时为一个区间把数据分成很多区间,grade=0表示未处理 + * 2:每个区间只取最高值和当前时间,并格式化当前时间为小时格式小时,小时后面全部为零 + * 3:得到每个区间的最高值,从第一个开始遍历计算,判断小时表有无数据,按年分表可能需要连表查询 + * 4:如果小时表中有关于这个设备的数据,取最后一条的当前读数作为上次读数,当前抄表时间为上次抄表时间 + * 5:如果小时表无数据,则表示此设备数据第一次入库,取当前设备初始值:device_manage取initial_value + * 6: 开始循环遍历,第一条数据的上一次读数和上一次抄表时间以小时表的最后一条计算或者初始值计算 + * 7:第二条则以第一条的当前读数作为第二条的上一次读数,直到循环结束,存放到集合中 + * 8:批量将集合插入小时表中 + * + * @return + */ @Override public Map queryUntreatedData(String type) { - return Map.of(); + //判断是否需要连表 + Calendar now = Calendar.getInstance(); + int year = now.get(Calendar.YEAR); + int month = now.get(Calendar.MONTH) + 1; + int day = now.get(Calendar.DATE); + int hour = now.get(Calendar.HOUR_OF_DAY); + String curTable = "data_" + type + year; + List curResults = null; + List lastResults = null; + HashMap resultMap = new HashMap<>(); + if (month == 1 && day == 1 && hour == 0) { + // 是当前年份的1月1日0点到1点,需要拼接表 + System.out.println(year + "年1月1日0点到1点"); + String lastTable = "data_" + type + (year - 1); + curResults = dataProcessMapper.queryUntreatedData(curTable); + lastResults = dataProcessMapper.queryUntreatedData(lastTable); + + // 复制数据 + lastResults.addAll(curResults); + // BeanUtils.copyProperties(curResults, lastResults); + HashMap> map = new HashMap<>(); + List curIdList = curResults.stream().map(DeviceReportEntity::getId).collect(Collectors.toList()); + List lastIdList = lastResults.stream().map(DeviceReportEntity::getId).collect(Collectors.toList()); + map.put("lastIds", lastIdList); + map.put("curIds", curIdList); + + resultMap.put("data", lastResults); + resultMap.put("idMap", map); + + } else { + //不需要拼接表 + System.out.println("不是当前年份的1月1日0点到1点"); + curResults = dataProcessMapper.queryUntreatedData(curTable); + List ids = curResults.stream().map(DeviceReportEntity::getId).collect(Collectors.toList()); + HashMap> map = new HashMap<>(); + map.put("curIds", ids); + + resultMap.put("data", curResults); + resultMap.put("idMap", map); + + } + return resultMap; } @Override @@ -180,12 +305,85 @@ public class DataProcessServiceImpl implements DataProcessService { @Override public void batchUpdateGrade(Map> idMap, String type) { - + Calendar calendar = Calendar.getInstance(); + calendar.setTime(new Date()); + int year = calendar.get(Calendar.YEAR); + Set>> entries = idMap.entrySet(); + for (Map.Entry> entry : entries) { + String key = entry.getKey(); + List value = entry.getValue(); + if (key.equals("curIds")) { + String tableName = "data_" + type + year; + // 分批更新数据 + int batchSize = 1000; + int totalSize = value.size(); + int batchCount = (int) Math.ceil((double) totalSize / batchSize); + for (int i = 0; i < batchCount; i++) { + int startIndex = i * batchSize; + int endIndex = Math.min((i + 1) * batchSize, totalSize); + List batchList = value.subList(startIndex, endIndex); + dataProcessMapper.batchUpdateGrade(batchList, tableName, 1); + } + } else if (key.equals("lastIds")) { + String lastTable = "data_" + type + (year - 1); + // 分批更新数据 + int batchSize = 1000; + int totalSize = value.size(); + int batchCount = (int) Math.ceil((double) totalSize / batchSize); + for (int i = 0; i < batchCount; i++) { + int startIndex = i * batchSize; + int endIndex = Math.min((i + 1) * batchSize, totalSize); + List batchList = value.subList(startIndex, endIndex); + dataProcessMapper.batchUpdateGrade(batchList, lastTable, 1); + } + } + } } @Override - public void batchInsertOrUpdate(List dataList, String tableType) { - + public void batchInsertOrUpdate(List dataList, String type) { + String tableName; + for (DeviceReportEntity data : dataList) { + //不分表 月、年 + tableName = "data_" + type; + int num = 0; + switch (type) { + case "month": + num = dataProcessMapper.selectDataByMM(tableName, data.getCurTime(), data.getDeviceNum()); + break; + case "year": + num = dataProcessMapper.selectDataByYY(tableName, data.getCurTime(), data.getDeviceNum()); + break; + case "day": + tableName = tableName + DateUtils.dateToString(data.getCurTime(), "yyyy"); + num = dataProcessMapper.selectDataByDD(tableName, data.getCurTime(), data.getDeviceNum()); + break; + case "hour": + tableName = tableName + DateUtils.dateToString(data.getCurTime(), "yyyy"); + num = dataProcessMapper.selectDataByHH(tableName, data.getCurTime(), data.getDeviceNum()); + break; + } + if (num == 0) { + //insert + dataProcessMapper.insertTable(data, tableName); + } else { + //update + switch (type) { + case "month": + dataProcessMapper.updateTableMM(data, tableName); + break; + case "year": + dataProcessMapper.updateTableYY(data, tableName); + break; + case "day": + dataProcessMapper.updateTableDD(data, tableName); + break; + case "hour": + dataProcessMapper.updateTableHH(data, tableName); + break; + } + } + } } @Override diff --git a/mh-quartz/src/main/java/com/mh/quartz/task/DealDataTask.java b/mh-quartz/src/main/java/com/mh/quartz/task/DealDataTask.java index 02809fd..a03460b 100644 --- a/mh-quartz/src/main/java/com/mh/quartz/task/DealDataTask.java +++ b/mh-quartz/src/main/java/com/mh/quartz/task/DealDataTask.java @@ -1,15 +1,23 @@ package com.mh.quartz.task; import com.mh.common.core.domain.entity.CollectionParamsManage; +import com.mh.common.core.domain.entity.DeviceReportEntity; import com.mh.common.core.redis.RedisCache; +import com.mh.common.enums.ComputeEnum; +import com.mh.common.utils.DateUtils; import com.mh.framework.dealdata.DataProcessService; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; +import org.springframework.util.ObjectUtils; -import java.time.Duration; -import java.time.LocalDateTime; +import java.math.BigDecimal; +import java.time.*; import java.time.format.DateTimeFormatter; +import java.time.temporal.ChronoUnit; +import java.time.temporal.TemporalUnit; import java.util.*; import java.util.stream.Collectors; @@ -34,8 +42,550 @@ public class DealDataTask { this.dataProcessService = dataProcessService; } - + /** + * + * 每隔5分钟处理生成秒级报表存入data_min中 + * 不要修改时间!!!不要修改时间!!!5分钟 + */ public void dealDeviceData() { + List cacheList = redisCache.getCacheList("DEVICES", CollectionParamsManage.class); + if (null == cacheList || cacheList.isEmpty()) { + return; + } + //清空redis +// redisCache.delete Object("DEVICES"); + //处理chillers数据 + try { + //todo 处理没有对象curValue和curTime的异常 + dealDeviceCollect(cacheList); + } catch (Exception e) { + log.error("处理主机参数异常:{}", e); + } + + } + + private void dealDeviceCollect(List cacheList) { + // 先根据表具类型进行分组 + Map> groupMap = cacheList + .stream() + .collect(Collectors.groupingBy(CollectionParamsManage::getMtType)); + // 开始进行数据遍历 + for (Map.Entry> entry : groupMap.entrySet()) { + String mtType = entry.getKey(); + List dataList = entry.getValue(); + // 进行数据处理入库操作等 + try { + dealAndInsert(dataList, mtType); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + /** + * 处理数据并添加到data_min中 + * @param dataList + * @param deviceType + */ + private void dealAndInsert(List dataList, String deviceType) { + // 格式化时间点,然后取同样时间点的最大值 + Map> collect = dataList.stream() + .peek(val -> val.setCurTime(DateUtils.stringToDate(DateUtils.getTimeMin(val.getCurTime(), 5), "yyyy-MM-dd HH:mm:ss"))) + .collect( + Collectors.groupingBy( + CollectionParamsManage::getCurTime, + Collectors.maxBy(Comparator.comparing(CollectionParamsManage::getCurValue))) + ); + collect = sortMapByDate(collect); + List dataMinList = new ArrayList<>(); + //value循环处理数据 + if (collect.size() > 0) { + CollectionParamsManage entity = collect.values().stream().findFirst().get().get(); + int deviceGrade = entity.getGrade(); + //从数据库取值,当前的年表,当前年表没有,查询上一次年表 + DeviceReportEntity lastData = dataProcessService.queryLastValue(entity.getMtNum(), "min"); + int ratio = dataProcessService.queryRatio(entity.getMtNum()) == null ? 1 : dataProcessService.queryRatio(entity.getMtNum()); + if (ObjectUtils.isEmpty(lastData) || ObjectUtils.isEmpty(lastData.getLastValue())) { + //从device_manage取出初始值 + String initValue = dataProcessService.queryInitValue(entity.getMtNum()); + DeviceReportEntity firstEntity = new DeviceReportEntity(); + firstEntity.setLastValue(initValue); + firstEntity.setLastTime(entity.getCurTime()); + firstEntity.setCurTime(entity.getCurTime()); + firstEntity.setCurValue(entity.getCurValue().toString()); + firstEntity.setDeviceNum(entity.getMtNum()); + firstEntity.setDeviceCode(entity.getMtCode()); + firstEntity.setDeviceType(deviceType); + firstEntity.setRatio(ratio); + double usedValue = entity.getCurValue().doubleValue() - Double.parseDouble(initValue); + firstEntity.setUsedValue(String.valueOf(usedValue)); + //区分瞬时值 + if ((deviceGrade >= 100 && deviceGrade < 200) || (deviceGrade >= 1200 && deviceGrade < 1300) ) { + firstEntity.setCalcValue(String.valueOf(entity.getCurValue().doubleValue() * ratio)); + firstEntity.setGrade(1); + } else { + firstEntity.setCalcValue(String.valueOf(usedValue * ratio)); + firstEntity.setGrade(0); + } + dataMinList.add(firstEntity); + } else { + Date lastDate = lastData.getLastTime(); + String lastValue = lastData.getLastValue(); + DeviceReportEntity firstEntity = new DeviceReportEntity(); + firstEntity.setLastValue(lastValue); + firstEntity.setLastTime(lastDate); + firstEntity.setCurTime(entity.getCurTime()); + firstEntity.setCurValue(entity.getCurValue().toString()); + firstEntity.setDeviceNum(entity.getMtNum()); + firstEntity.setDeviceCode(entity.getMtCode()); + firstEntity.setDeviceType(deviceType); + firstEntity.setRatio(ratio); + double usedValue = entity.getCurValue().doubleValue() - Double.parseDouble(lastValue); + firstEntity.setUsedValue(String.valueOf(usedValue)); + //区分瞬时值 + //区分瞬时值 + if ((deviceGrade >= 100 && deviceGrade < 200) || (deviceGrade >= 1200 && deviceGrade < 1300) ) { + firstEntity.setCalcValue(String.valueOf(entity.getCurValue().doubleValue() * ratio)); + firstEntity.setGrade(1); + } else { + firstEntity.setCalcValue(String.valueOf(usedValue * ratio)); + firstEntity.setGrade(0); + } + dataMinList.add(firstEntity); + } + + int i = 0, j = 0; + for (Map.Entry> entry : collect.entrySet()) { + if (entry.getValue().isPresent()) { + if (i == 0) { + i++; + continue; + } + DeviceReportEntity temp = new DeviceReportEntity(); + DeviceReportEntity dataJ = dataMinList.get(j); + CollectionParamsManage dataI = entry.getValue().get(); + //复制实体类 +// BeanUtils.copyProperties(dataI, temp); + + temp.setDeviceNum(dataI.getMtNum()); + temp.setDeviceCode(dataI.getMtCode()); + + temp.setLastValue(dataJ.getCurValue()); + temp.setLastTime(dataJ.getCurTime()); + + temp.setCurTime(dataI.getCurTime()); + temp.setCurValue(dataI.getCurValue().toString()); + double usedValue = Double.parseDouble(temp.getCurValue()) - Double.parseDouble(temp.getLastValue()); + temp.setUsedValue(String.valueOf(usedValue)); + //区分瞬时值 + if ((deviceGrade >= 100 && deviceGrade < 200) || (deviceGrade >= 1200 && deviceGrade < 1300) ) { + temp.setCalcValue(String.valueOf(Double.parseDouble(temp.getCurValue()) * ratio)); + temp.setGrade(1); + } else { + temp.setCalcValue(String.valueOf(usedValue * ratio)); + temp.setGrade(0); + } + temp.setDeviceType(deviceType); + temp.setRatio(ratio); + dataMinList.add(temp); + i++; + j++; + } + } + + //完全处理好数据,添加到data_min中 + dataProcessService.insertDatabase(dataMinList); + // 这里计算COP + for (Map.Entry> entry : collect.entrySet()) { + if (entry.getValue().isPresent()) { + try { + dataProcessService.calculateCopByTime(DateUtils.dateToString(entry.getKey(), "yyyy-MM-dd HH:mm:ss")); + } catch (Exception e) { + log.error("计算COP失败", e); + } + } + } + } + } + + /** + * 排序 + * @param map + * @return + */ + public static Map> sortMapByDate(Map> map) { + return map.entrySet().stream() + .sorted(Map.Entry.comparingByKey()) + .collect(Collectors.toMap( + Map.Entry::getKey, + Map.Entry::getValue, + (oldValue, newValue) -> oldValue, + LinkedHashMap::new + )); + } + + /** + * 每隔一个小时处理一次小时表数据,从data_min表中取值,grade=0的数据 + * 加工聚合处理完成,插入data_hour表中 + */ +// @Scheduled(cron = "0 30 0/1 * * ?") + public void dealDeviceDataHour() { + Map map = dataProcessService.queryUntreatedData("min"); + //需要处理的数据 + List dataList = (List) map.get("data"); + //处理完成后需要将数据设置为已处理,grade = 1, + Map> idMap = (Map>) map.get("idMap"); + + //遍历分组 + Map>> collect = getMeterNumMap(dataList, ChronoUnit.HOURS); + + //小时表数据集合 + List hourList = new ArrayList<>(); + //TODO 分组好的数据计算后插入数据表 + for (Map.Entry>> entry : collect.entrySet()) { + //表号 + String key = entry.getKey(); + Map> value = entry.getValue(); + Set>> entries = value.entrySet(); + String deviceType = null; + for (Map.Entry> listEntry : entries) { + deviceType = listEntry.getValue().get(0).getDeviceType(); + break; + } + //分组计算,得到一组当前表号的以时间分组的数据,循环计算第一条的当前读数是上一条的上次读数 + ComputeEnum computeEnum = ComputeEnum.get(Integer.parseInt(deviceType)); + + assert deviceType != null; + List> dealDataList = + ComputeEnum.get(Integer.parseInt(deviceType)).getDataList(entry); + + List> sortedList = dealDataList.stream() + .sorted(Comparator.comparing(temMap -> temMap.keySet().iterator().next())) + .collect(Collectors.toList()); + //计算上次值,当前值 + hourList = calcListData(hourList, key, "hour", sortedList); + } + //批量插入小时报表 + try { + if (hourList.isEmpty() || idMap.isEmpty()) { + return; + } + dataProcessService.batchInsertOrUpdate(hourList, "hour"); + } catch (Exception e) { + log.error("小时数据表处理异常:" + e); + } + //TODO 修改分钟表的状态,grade=1 + dataProcessService.batchUpdateGrade(idMap, "min"); + } + + /** + * 对数据集合计算上一次值,当前值 + * + * @param hourList + * @param key + * @param dealDataList + */ + private List calcListData(List hourList, + String key, String tableType, + List> dealDataList) { + + ArrayList tempList = new ArrayList<>(); + for (int i = 0; i < dealDataList.size(); i++) { + + if (i == 0) { + Map reportEntityMap = dealDataList.get(i); + + //由key已知是哪个表号 + //判断第一个是否有历史数据,当年表有历史数据则取当年表,无则取上一年表,去年和今年表都无则取初始值 + //TODO 从历史表查询上一次的读数,抄表时间 + String lastValue = null; + Date lastDate = null; + //查询历史记录 + DeviceReportEntity hourEntity = dataProcessService.queryLastValue(key, tableType); + if (ObjectUtils.isEmpty(hourEntity)) { + //查询设备信息初始值 + lastValue = dataProcessService.queryInitValue(key); + } else { + lastValue = hourEntity.getLastValue(); + lastDate = hourEntity.getLastTime(); + } + + Set> entrySet = reportEntityMap.entrySet(); + for (Map.Entry entityEntry : entrySet) { + LocalDateTime curTime = entityEntry.getKey(); + DeviceReportEntity entity = entityEntry.getValue(); + ZonedDateTime zonedDateTime = curTime.atZone(ZoneId.systemDefault()); + Date date = Date.from(zonedDateTime.toInstant()); + entity.setCurTime(date); + entity.setLastValue(lastValue); + if (lastDate == null) { + entity.setLastTime(date); + } else { + entity.setLastTime(lastDate); + } + BigDecimal curValue = new BigDecimal(entity.getCurValue()); + BigDecimal newLastValue = new BigDecimal(entity.getLastValue()); + BigDecimal usedValue = curValue.subtract(newLastValue); + int ratio = entity.getRatio(); + entity.setRatio(ratio); + BigDecimal calcValue = usedValue.multiply(BigDecimal.valueOf(ratio)); + entity.setUsedValue(String.valueOf(usedValue)); + entity.setCalcValue(String.valueOf(calcValue)); + hourList.add(entity); + tempList.add(entity); + //只需要第一个实体类 + break; + } + } else { + //从上一条取当前读数和当前时间作为当前对象的上一次读数 + int lastIndex = i - 1; + DeviceReportEntity lastEntity = tempList.get(lastIndex); + Map curMap = dealDataList.get(i); + Set> curEntrySet = curMap.entrySet(); + for (Map.Entry curEntry : curEntrySet) { + LocalDateTime curTime = curEntry.getKey(); + DeviceReportEntity curEntity = curEntry.getValue(); + ZonedDateTime zonedDateTime = curTime.atZone(ZoneId.systemDefault()); + Date date = Date.from(zonedDateTime.toInstant()); + curEntity.setCurTime(date); + curEntity.setLastTime(lastEntity.getCurTime()); + curEntity.setLastValue(lastEntity.getCurValue()); + BigDecimal curValue = new BigDecimal(curEntity.getCurValue()); + BigDecimal lastValue = new BigDecimal(curEntity.getLastValue()); + BigDecimal usedValue = curValue.subtract(lastValue); + int ratio = lastEntity.getRatio(); + curEntity.setRatio(ratio); + BigDecimal calcValue = usedValue.multiply(BigDecimal.valueOf(ratio)); + curEntity.setUsedValue(String.valueOf(usedValue)); + curEntity.setCalcValue(String.valueOf(calcValue)); + hourList.add(curEntity); + tempList.add(curEntity); + } + } + + } + return hourList; + } + + + /** + * 遍历数据分组,按照表号分组,再根据时间戳分组 + * 分组最大只能做到日分组,无法做到月和年,需要另外一个函数实现 + * + * @param dataList 需要处理的数据集 + * @param interval 时间间隔 + * @return + */ + private Map>> getMeterNumMap(List dataList, + TemporalUnit interval) { + // 获取 MinuteData 对象集合,其中的时间戳是 java.util.Date 类型 + //1:数据格式为先按照表号分组,设备类型里的数据再按照小时分组 + //2:小时分组的数据就是DeviceReportEntity实体类 + return dataList.stream() + .map(data -> { + LocalDateTime dateTime = LocalDateTime.ofInstant(data.getCurTime().toInstant(), ZoneId.systemDefault()); + data.setLocalDateTime(dateTime); + return data; + }) + .sorted(Comparator.comparing(DeviceReportEntity::getLocalDateTime)) + .collect( + //根据表号分组 + Collectors.groupingBy(DeviceReportEntity::getDeviceNum, + //根据时间间隔分组,后续是否需要根据减少分组 + Collectors.groupingBy(report -> report.getLocalDateTime().truncatedTo(interval))) + + ); + } + + /*** + * 处理生成日月年报表data_day等表 + * 一日处理一次数据 凌晨3点计算 + */ +// @Scheduled(cron = "0 30 23 * * ?") + //@Scheduled(cron = "0 0/1 * * * ?") + public void dealDay2Year() { + Map untreatedList = dataProcessService.queryUntreatedData("hour"); + //需要处理的数据 + List dataList = (List) untreatedList.get("data"); + //处理完成后需要将数据设置为已处理,grade = 1, + Map> idMap = (Map>) untreatedList.get("idMap"); + + //<表号:<时间:List<集合>>> 按照日区分 + Map>> dayMap = getMeterNumMap(dataList, ChronoUnit.DAYS); + List dayList = new ArrayList<>(); + List monthList = new ArrayList<>(); + + // 使用流对每个内部的Map进行排序 + dayMap.forEach((key, value) -> { + Map> sortedMap = value.entrySet().stream() + .sorted(Map.Entry.comparingByKey()) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (oldValue, newValue) -> oldValue, LinkedHashMap::new)); + + // 将排序后的Map更新回dayMap + dayMap.put(key, sortedMap); + }); + dayList = getDeviceReportEntities(dayMap, dayList, "day"); + //存入日表 + try { + dataProcessService.batchInsertOrUpdate(dayList, "day"); + } catch (Exception e) { + log.error("日表数据表处理异常:" + e); + e.printStackTrace(); + } + + // 按照月区分 + //Map>> monthMap = getMeterNumMap(dataList, ChronoUnit.MONTHS); + Map>> monthMap = getMeterNumMapByMonth(dataList, ChronoUnit.MONTHS); + List monthInsert = getMonthListData(monthList, monthMap); + dataProcessService.batchInsertOrUpdate(monthInsert, "month"); + + // 按照年区分 + //Map>> yearMap = getMeterNumMapByMonth(dataList, ChronoUnit.YEARS); + //List yearInsert = getMonthListData(monthList, yearMap); + dataProcessService.batchInsertOrUpdate(monthInsert, "year"); + + //TODO 修改分钟表的状态,grade=1 + dataProcessService.batchUpdateGrade(idMap, "hour"); + } + + /** + * 计算月和年报表 + * + * @param monthList + * @param monthMap + * @return + */ + private List getMonthListData(List monthList, Map>> monthMap) { + Set>>> entries = monthMap.entrySet(); + for (Map.Entry>> entry : entries) { + String deviceNum = entry.getKey(); + Map> value = entry.getValue(); + List deviceList = new ArrayList<>(); + List monthInsert = new ArrayList<>(); + List tempList = new ArrayList<>(); + Set>> monthEntries = value.entrySet(); + for (Map.Entry> monthEntry : monthEntries) { + //月份 + YearMonth key = monthEntry.getKey(); + //集合 + List monthEntryValue = monthEntry.getValue(); + //筛选出最大值和最小值 + DeviceReportEntity max = Collections.max(monthEntryValue, Comparator.comparing(DeviceReportEntity::getLocalDateTime)); + + deviceList.add(max); + } + deviceList.sort(Comparator.comparing(DeviceReportEntity::getLocalDateTime)); + for (int i = 0; i < deviceList.size(); i++) { + String lastValue; + Date lastTime; + if (i == 0) { + DeviceReportEntity entity = deviceList.get(i); + DeviceReportEntity lastEntity = dataProcessService.queryLastValue(deviceNum, "month"); + if (ObjectUtils.isEmpty(lastEntity)) { + lastValue = dataProcessService.queryInitValue(deviceNum); + lastTime = entity.getCurTime(); + } else { + lastValue = lastEntity.getLastValue(); + lastTime = lastEntity.getLastTime(); + } + entity.setLastTime(lastTime); + entity.setLastValue(lastValue); + + BigDecimal curValue = new BigDecimal(entity.getCurValue()); + BigDecimal bigDecimal = new BigDecimal(entity.getLastValue()); + int ratio = entity.getRatio(); + BigDecimal usedValue = curValue.subtract(bigDecimal); + entity.setUsedValue(usedValue.toString()); + BigDecimal calcValue = usedValue.multiply(BigDecimal.valueOf(ratio)); + entity.setCalcValue(calcValue.toString()); + + monthInsert.add(entity); + tempList.add(entity); + } else { + //从上一条取当前读数和当前时间作为当前对象的上一次读数 + int lastIndex = i - 1; + DeviceReportEntity lastEntity = tempList.get(lastIndex); + DeviceReportEntity curEntity = deviceList.get(i); + + curEntity.setLastTime(lastEntity.getCurTime()); + curEntity.setLastValue(lastEntity.getCurValue()); + BigDecimal curValue = new BigDecimal(curEntity.getCurValue()); + BigDecimal bigDecimal = new BigDecimal(curEntity.getLastValue()); + BigDecimal usedValue = curValue.subtract(bigDecimal); + int ratio = lastEntity.getRatio(); + curEntity.setRatio(ratio); + BigDecimal calcValue = usedValue.multiply(BigDecimal.valueOf(ratio)); + curEntity.setUsedValue(String.valueOf(usedValue)); + curEntity.setCalcValue(String.valueOf(calcValue)); + + monthInsert.add(curEntity); + tempList.add(curEntity); + + } + } + monthList.addAll(tempList); + } + + return monthList; + } + + /** + * 循环处理 + * + * @param dayMap + * @param dayList + * @param tableType + * @return + */ + private List getDeviceReportEntities(Map>> dayMap, + List dayList, + String tableType) { + for (Map.Entry>> entry : dayMap.entrySet()) { + //表号 + String key = entry.getKey(); + Map> value = entry.getValue(); + String deviceType = null; + for (Map.Entry> listEntry : value.entrySet()) { + deviceType = listEntry.getValue().get(0).getDeviceType(); + } + //取到的最大值 + assert deviceType != null; + List> dayMapList = + ComputeEnum.get(Integer.parseInt(deviceType)).getDataList(entry); + dayList = calcListData(dayList, key, tableType, dayMapList); + + } + return dayList; + } + + /** + * 按照月、年分组 + * + * @param dataList + * @param interval + * @return + */ + private Map>> getMeterNumMapByMonth(List dataList, + TemporalUnit interval) { + return dataList.stream() + .map(data -> { + LocalDateTime dateTime = LocalDateTime.ofInstant(data.getCurTime().toInstant(), ZoneId.systemDefault()); + data.setLocalDateTime(dateTime); + return data; + }) + .collect(Collectors.groupingBy(DeviceReportEntity::getDeviceNum, + Collectors.groupingBy( + report -> YearMonth.from(report.getLocalDateTime()), + LinkedHashMap::new, + Collectors.toList() + ))); + } + + + /** + * 处理冷水机组数据获取只要实时数据的设备 + */ + public void dealChillersData() { List cacheList = redisCache.getCacheList("CHILLERS", CollectionParamsManage.class); if (null == cacheList || cacheList.isEmpty()) { return; diff --git a/mh-system/src/main/java/com/mh/system/mapper/device/DataProcessMapper.java b/mh-system/src/main/java/com/mh/system/mapper/device/DataProcessMapper.java index 3bfb4be..4364bba 100644 --- a/mh-system/src/main/java/com/mh/system/mapper/device/DataProcessMapper.java +++ b/mh-system/src/main/java/com/mh/system/mapper/device/DataProcessMapper.java @@ -1,8 +1,10 @@ package com.mh.system.mapper.device; import com.mh.common.core.domain.entity.CollectionParamsManage; +import com.mh.common.core.domain.entity.DeviceReportEntity; import org.apache.ibatis.annotations.*; +import java.util.Date; import java.util.List; /** @@ -27,4 +29,138 @@ public interface DataProcessMapper { "") void batchInsertChiller(@Param("batchList") List batchList, @Param("tableName") String tableName); + /** + * 查询上一次数据 + * + * @param deviceNum + * @param tableName + * @return + */ + @Select("select * from ${tableName} where device_num = #{deviceNum} order by cur_time desc limit 1") + @Results({ + @Result(column = "device_num", property = "deviceNum"), + @Result(column = "device_code", property = "deviceCode"), + @Result(column = "cur_time", property = "lastTime"), + @Result(column = "cur_value", property = "lastValue"), + @Result(column = "ratio", property = "ratio") + }) + DeviceReportEntity queryLastValue(@Param("deviceNum") String deviceNum, @Param("tableName") String tableName); + + @Select("select * from ${tableName} where device_num = #{entity.deviceNum} and cur_time = #{entity.curTime} limit 1") + DeviceReportEntity isHaveData(@Param("entity") DeviceReportEntity deviceReportEntity, @Param("tableName") String tableName); + + @Update("update ${tableName} set " + + " cur_value = cast(#{entity.curValue} as numeric(24,2))," + + " used_value= cast(#{entity.usedValue} as numeric(24,2)), " + + " calc_value = cast(#{entity.calcValue} as numeric(24,2)), " + + " cur_time = #{entity.curTime}," + + " grade = #{entity.grade} where id = #{entity.id} ") + void updateDataMinByTime(@Param("entity") DeviceReportEntity deviceReportEntity, @Param("tableName") String tableName); + + /** + * 批量插入数据表中 + * + * @param batchList + * @param tableName + */ + @Select("") + void insertDataMin(@Param("batchList") List batchList, + @Param("tableName") String tableName); + + /** + * 查询未处理的数据 + * + * @param tableName + * @return + */ + @Select("select * from ${tableName} where grade = 0 order by cur_time desc") + List queryUntreatedData(String tableName); + + @Select("select count(1) from ${tableName} where device_num = #{deviceNum} and EXTRACT(YEAR FROM AGE(cur_time, #{curTime})) = 0;") + Integer selectDataByYY(@Param("tableName") String tableName,@Param("curTime") Date curTime,@Param("deviceNum")String deviceNum); + + @Select("select count(1) from ${tableName} where device_num = #{deviceNum} and EXTRACT(MONTH FROM AGE(cur_time, #{curTime})) = 0;") + Integer selectDataByMM(@Param("tableName") String tableName,@Param("curTime") Date curTime,@Param("deviceNum")String deviceNum); + + @Select("select count(1) from ${tableName} where device_num = #{deviceNum} and EXTRACT(DAY FROM AGE(cur_time, #{curTime})) = 0;") + Integer selectDataByDD(@Param("tableName") String tableName,@Param("curTime") Date curTime,@Param("deviceNum")String deviceNum); + + @Select("select count(1) from ${tableName} where device_num = #{deviceNum} and EXTRACT(HOUR FROM AGE(cur_time, #{curTime})) = 0; ") + Integer selectDataByHH(@Param("tableName") String tableName,@Param("curTime") Date curTime,@Param("deviceNum")String deviceNum); + + @Insert("insert into ${tableName}(device_num, device_code, device_type, last_value, last_time, cur_value,cur_time,calc_value,used_value,ratio,grade) " + + " values(" + + " #{data.deviceNum}," + + " #{data.deviceCode}," + + " #{data.deviceType}," + + " cast(#{data.lastValue} as numeric(24,2))," + + " #{data.lastTime}," + + " cast(#{data.curValue} as numeric(24,2))," + + " #{data.curTime}," + + " cast(#{data.calcValue} as numeric(24,2))," + + " cast(#{data.usedValue} as numeric(24,2))," + + " #{data.ratio}," + + " 0)") + void insertTable(@Param("data") DeviceReportEntity data,@Param("tableName") String tableName); + + @Update("update ${tableName} set cur_time = #{data.curTime},cur_value = #{data.curValue},calc_value = (#{data.curValue} - last_value)*ratio,used_value = #{data.curValue} - last_value " + + "where device_num = #{data.deviceNum} and EXTRACT(MONTH FROM AGE(cur_time, #{curTime})) = 0 ") + void updateTableMM(@Param("data") DeviceReportEntity data,@Param("tableName") String tableName); + + @Update("update ${tableName} set cur_time = #{data.curTime},cur_value = #{data.curValue},calc_value = (#{data.curValue} - last_value)*ratio,used_value = #{data.curValue} - last_value " + + "where device_num = #{data.deviceNum} and EXTRACT(YEAR FROM AGE(cur_time, #{curTime})) = 0 ") + void updateTableYY(@Param("data") DeviceReportEntity data,@Param("tableName") String tableName); + + @Update("update ${tableName} set cur_time = #{data.curTime},cur_value = #{data.curValue},calc_value = (#{data.curValue} - last_value)*ratio,used_value = #{data.curValue} - last_value " + + "where device_num = #{data.deviceNum} and EXTRACT(DAY FROM AGE(cur_time, #{curTime})) = 0 ") + void updateTableDD(@Param("data") DeviceReportEntity data,@Param("tableName") String tableName); + + @Update("update ${tableName} set cur_time = #{data.curTime},cur_value = #{data.curValue},calc_value = (#{data.curValue} - last_value)*ratio,used_value = #{data.curValue} - last_value " + + "where device_num = #{data.deviceNum} and EXTRACT(HOUR FROM AGE(cur_time, #{curTime})) = 0 ") + void updateTableHH(@Param("data") DeviceReportEntity data,@Param("tableName") String tableName); + + /** + * 批量修改grade + * + * @param value 主键ID + * @param tableName 表名 + * @param grade + */ + @Update("") + void batchUpdateGrade(@Param("ids") List value, @Param("tableName") String tableName, @Param("grade") int grade); + + /** + * 查询初始数据 + * + * @param deviceNum + * @return + */ + @Select("select mt_init_value from collection_params_manage where mt_num = #{deviceNum}") + String queryInitValue(String deviceNum); + } diff --git a/sql/添加自增id.sql b/sql/添加自增id.sql index 5977ab4..6df7b1d 100644 --- a/sql/添加自增id.sql +++ b/sql/添加自增id.sql @@ -96,3 +96,21 @@ CREATE SEQUENCE user_id_seq CACHE 1; ALTER TABLE sys_user ALTER COLUMN user_id SET DEFAULT nextval('user_id_seq'); + +CREATE SEQUENCE sys_job_id_seq + START WITH 10000 + INCREMENT BY 1 + NO MINVALUE + NO MAXVALUE + CACHE 1; +ALTER TABLE sys_job ALTER COLUMN job_id + SET DEFAULT nextval('sys_job_id_seq'); + +CREATE SEQUENCE sys_job_log_id_seq + START WITH 10000 + INCREMENT BY 1 + NO MINVALUE + NO MAXVALUE + CACHE 1; +ALTER TABLE sys_job_log ALTER COLUMN job_log_id + SET DEFAULT nextval('sys_job_log_id_seq');