|
|
@ -1,15 +1,23 @@ |
|
|
|
package com.mh.quartz.task; |
|
|
|
package com.mh.quartz.task; |
|
|
|
|
|
|
|
|
|
|
|
import com.mh.common.core.domain.entity.CollectionParamsManage; |
|
|
|
import com.mh.common.core.domain.entity.CollectionParamsManage; |
|
|
|
|
|
|
|
import com.mh.common.core.domain.entity.DeviceReportEntity; |
|
|
|
import com.mh.common.core.redis.RedisCache; |
|
|
|
import com.mh.common.core.redis.RedisCache; |
|
|
|
|
|
|
|
import com.mh.common.enums.ComputeEnum; |
|
|
|
|
|
|
|
import com.mh.common.utils.DateUtils; |
|
|
|
import com.mh.framework.dealdata.DataProcessService; |
|
|
|
import com.mh.framework.dealdata.DataProcessService; |
|
|
|
import lombok.extern.slf4j.Slf4j; |
|
|
|
import lombok.extern.slf4j.Slf4j; |
|
|
|
|
|
|
|
import org.springframework.beans.BeanUtils; |
|
|
|
import org.springframework.beans.factory.annotation.Autowired; |
|
|
|
import org.springframework.beans.factory.annotation.Autowired; |
|
|
|
|
|
|
|
import org.springframework.scheduling.annotation.Scheduled; |
|
|
|
import org.springframework.stereotype.Component; |
|
|
|
import org.springframework.stereotype.Component; |
|
|
|
|
|
|
|
import org.springframework.util.ObjectUtils; |
|
|
|
|
|
|
|
|
|
|
|
import java.time.Duration; |
|
|
|
import java.math.BigDecimal; |
|
|
|
import java.time.LocalDateTime; |
|
|
|
import java.time.*; |
|
|
|
import java.time.format.DateTimeFormatter; |
|
|
|
import java.time.format.DateTimeFormatter; |
|
|
|
|
|
|
|
import java.time.temporal.ChronoUnit; |
|
|
|
|
|
|
|
import java.time.temporal.TemporalUnit; |
|
|
|
import java.util.*; |
|
|
|
import java.util.*; |
|
|
|
import java.util.stream.Collectors; |
|
|
|
import java.util.stream.Collectors; |
|
|
|
|
|
|
|
|
|
|
@ -34,8 +42,550 @@ public class DealDataTask { |
|
|
|
this.dataProcessService = dataProcessService; |
|
|
|
this.dataProcessService = dataProcessService; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
|
|
|
* |
|
|
|
|
|
|
|
* 每隔5分钟处理生成秒级报表存入data_min中 |
|
|
|
|
|
|
|
* 不要修改时间!!!不要修改时间!!!5分钟 |
|
|
|
|
|
|
|
*/ |
|
|
|
public void dealDeviceData() { |
|
|
|
public void dealDeviceData() { |
|
|
|
|
|
|
|
List<CollectionParamsManage> cacheList = redisCache.getCacheList("DEVICES", CollectionParamsManage.class); |
|
|
|
|
|
|
|
if (null == cacheList || cacheList.isEmpty()) { |
|
|
|
|
|
|
|
return; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
//清空redis
|
|
|
|
|
|
|
|
// redisCache.delete Object("DEVICES");
|
|
|
|
|
|
|
|
//处理chillers数据
|
|
|
|
|
|
|
|
try { |
|
|
|
|
|
|
|
//todo 处理没有对象curValue和curTime的异常
|
|
|
|
|
|
|
|
dealDeviceCollect(cacheList); |
|
|
|
|
|
|
|
} catch (Exception e) { |
|
|
|
|
|
|
|
log.error("处理主机参数异常:{}", e); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private void dealDeviceCollect(List<CollectionParamsManage> cacheList) { |
|
|
|
|
|
|
|
// 先根据表具类型进行分组
|
|
|
|
|
|
|
|
Map<String, List<CollectionParamsManage>> groupMap = cacheList |
|
|
|
|
|
|
|
.stream() |
|
|
|
|
|
|
|
.collect(Collectors.groupingBy(CollectionParamsManage::getMtType)); |
|
|
|
|
|
|
|
// 开始进行数据遍历
|
|
|
|
|
|
|
|
for (Map.Entry<String, List<CollectionParamsManage>> entry : groupMap.entrySet()) { |
|
|
|
|
|
|
|
String mtType = entry.getKey(); |
|
|
|
|
|
|
|
List<CollectionParamsManage> dataList = entry.getValue(); |
|
|
|
|
|
|
|
// 进行数据处理入库操作等
|
|
|
|
|
|
|
|
try { |
|
|
|
|
|
|
|
dealAndInsert(dataList, mtType); |
|
|
|
|
|
|
|
} catch (Exception e) { |
|
|
|
|
|
|
|
throw new RuntimeException(e); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
|
|
|
* 处理数据并添加到data_min中 |
|
|
|
|
|
|
|
* @param dataList |
|
|
|
|
|
|
|
* @param deviceType |
|
|
|
|
|
|
|
*/ |
|
|
|
|
|
|
|
private void dealAndInsert(List<CollectionParamsManage> dataList, String deviceType) { |
|
|
|
|
|
|
|
// 格式化时间点,然后取同样时间点的最大值
|
|
|
|
|
|
|
|
Map<Date, Optional<CollectionParamsManage>> collect = dataList.stream() |
|
|
|
|
|
|
|
.peek(val -> val.setCurTime(DateUtils.stringToDate(DateUtils.getTimeMin(val.getCurTime(), 5), "yyyy-MM-dd HH:mm:ss"))) |
|
|
|
|
|
|
|
.collect( |
|
|
|
|
|
|
|
Collectors.groupingBy( |
|
|
|
|
|
|
|
CollectionParamsManage::getCurTime, |
|
|
|
|
|
|
|
Collectors.maxBy(Comparator.comparing(CollectionParamsManage::getCurValue))) |
|
|
|
|
|
|
|
); |
|
|
|
|
|
|
|
collect = sortMapByDate(collect); |
|
|
|
|
|
|
|
List<DeviceReportEntity> dataMinList = new ArrayList<>(); |
|
|
|
|
|
|
|
//value循环处理数据
|
|
|
|
|
|
|
|
if (collect.size() > 0) { |
|
|
|
|
|
|
|
CollectionParamsManage entity = collect.values().stream().findFirst().get().get(); |
|
|
|
|
|
|
|
int deviceGrade = entity.getGrade(); |
|
|
|
|
|
|
|
//从数据库取值,当前的年表,当前年表没有,查询上一次年表
|
|
|
|
|
|
|
|
DeviceReportEntity lastData = dataProcessService.queryLastValue(entity.getMtNum(), "min"); |
|
|
|
|
|
|
|
int ratio = dataProcessService.queryRatio(entity.getMtNum()) == null ? 1 : dataProcessService.queryRatio(entity.getMtNum()); |
|
|
|
|
|
|
|
if (ObjectUtils.isEmpty(lastData) || ObjectUtils.isEmpty(lastData.getLastValue())) { |
|
|
|
|
|
|
|
//从device_manage取出初始值
|
|
|
|
|
|
|
|
String initValue = dataProcessService.queryInitValue(entity.getMtNum()); |
|
|
|
|
|
|
|
DeviceReportEntity firstEntity = new DeviceReportEntity(); |
|
|
|
|
|
|
|
firstEntity.setLastValue(initValue); |
|
|
|
|
|
|
|
firstEntity.setLastTime(entity.getCurTime()); |
|
|
|
|
|
|
|
firstEntity.setCurTime(entity.getCurTime()); |
|
|
|
|
|
|
|
firstEntity.setCurValue(entity.getCurValue().toString()); |
|
|
|
|
|
|
|
firstEntity.setDeviceNum(entity.getMtNum()); |
|
|
|
|
|
|
|
firstEntity.setDeviceCode(entity.getMtCode()); |
|
|
|
|
|
|
|
firstEntity.setDeviceType(deviceType); |
|
|
|
|
|
|
|
firstEntity.setRatio(ratio); |
|
|
|
|
|
|
|
double usedValue = entity.getCurValue().doubleValue() - Double.parseDouble(initValue); |
|
|
|
|
|
|
|
firstEntity.setUsedValue(String.valueOf(usedValue)); |
|
|
|
|
|
|
|
//区分瞬时值
|
|
|
|
|
|
|
|
if ((deviceGrade >= 100 && deviceGrade < 200) || (deviceGrade >= 1200 && deviceGrade < 1300) ) { |
|
|
|
|
|
|
|
firstEntity.setCalcValue(String.valueOf(entity.getCurValue().doubleValue() * ratio)); |
|
|
|
|
|
|
|
firstEntity.setGrade(1); |
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
|
|
firstEntity.setCalcValue(String.valueOf(usedValue * ratio)); |
|
|
|
|
|
|
|
firstEntity.setGrade(0); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
dataMinList.add(firstEntity); |
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
|
|
Date lastDate = lastData.getLastTime(); |
|
|
|
|
|
|
|
String lastValue = lastData.getLastValue(); |
|
|
|
|
|
|
|
DeviceReportEntity firstEntity = new DeviceReportEntity(); |
|
|
|
|
|
|
|
firstEntity.setLastValue(lastValue); |
|
|
|
|
|
|
|
firstEntity.setLastTime(lastDate); |
|
|
|
|
|
|
|
firstEntity.setCurTime(entity.getCurTime()); |
|
|
|
|
|
|
|
firstEntity.setCurValue(entity.getCurValue().toString()); |
|
|
|
|
|
|
|
firstEntity.setDeviceNum(entity.getMtNum()); |
|
|
|
|
|
|
|
firstEntity.setDeviceCode(entity.getMtCode()); |
|
|
|
|
|
|
|
firstEntity.setDeviceType(deviceType); |
|
|
|
|
|
|
|
firstEntity.setRatio(ratio); |
|
|
|
|
|
|
|
double usedValue = entity.getCurValue().doubleValue() - Double.parseDouble(lastValue); |
|
|
|
|
|
|
|
firstEntity.setUsedValue(String.valueOf(usedValue)); |
|
|
|
|
|
|
|
//区分瞬时值
|
|
|
|
|
|
|
|
//区分瞬时值
|
|
|
|
|
|
|
|
if ((deviceGrade >= 100 && deviceGrade < 200) || (deviceGrade >= 1200 && deviceGrade < 1300) ) { |
|
|
|
|
|
|
|
firstEntity.setCalcValue(String.valueOf(entity.getCurValue().doubleValue() * ratio)); |
|
|
|
|
|
|
|
firstEntity.setGrade(1); |
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
|
|
firstEntity.setCalcValue(String.valueOf(usedValue * ratio)); |
|
|
|
|
|
|
|
firstEntity.setGrade(0); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
dataMinList.add(firstEntity); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
int i = 0, j = 0; |
|
|
|
|
|
|
|
for (Map.Entry<Date, Optional<CollectionParamsManage>> entry : collect.entrySet()) { |
|
|
|
|
|
|
|
if (entry.getValue().isPresent()) { |
|
|
|
|
|
|
|
if (i == 0) { |
|
|
|
|
|
|
|
i++; |
|
|
|
|
|
|
|
continue; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
DeviceReportEntity temp = new DeviceReportEntity(); |
|
|
|
|
|
|
|
DeviceReportEntity dataJ = dataMinList.get(j); |
|
|
|
|
|
|
|
CollectionParamsManage dataI = entry.getValue().get(); |
|
|
|
|
|
|
|
//复制实体类
|
|
|
|
|
|
|
|
// BeanUtils.copyProperties(dataI, temp);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
temp.setDeviceNum(dataI.getMtNum()); |
|
|
|
|
|
|
|
temp.setDeviceCode(dataI.getMtCode()); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
temp.setLastValue(dataJ.getCurValue()); |
|
|
|
|
|
|
|
temp.setLastTime(dataJ.getCurTime()); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
temp.setCurTime(dataI.getCurTime()); |
|
|
|
|
|
|
|
temp.setCurValue(dataI.getCurValue().toString()); |
|
|
|
|
|
|
|
double usedValue = Double.parseDouble(temp.getCurValue()) - Double.parseDouble(temp.getLastValue()); |
|
|
|
|
|
|
|
temp.setUsedValue(String.valueOf(usedValue)); |
|
|
|
|
|
|
|
//区分瞬时值
|
|
|
|
|
|
|
|
if ((deviceGrade >= 100 && deviceGrade < 200) || (deviceGrade >= 1200 && deviceGrade < 1300) ) { |
|
|
|
|
|
|
|
temp.setCalcValue(String.valueOf(Double.parseDouble(temp.getCurValue()) * ratio)); |
|
|
|
|
|
|
|
temp.setGrade(1); |
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
|
|
temp.setCalcValue(String.valueOf(usedValue * ratio)); |
|
|
|
|
|
|
|
temp.setGrade(0); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
temp.setDeviceType(deviceType); |
|
|
|
|
|
|
|
temp.setRatio(ratio); |
|
|
|
|
|
|
|
dataMinList.add(temp); |
|
|
|
|
|
|
|
i++; |
|
|
|
|
|
|
|
j++; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
//完全处理好数据,添加到data_min中
|
|
|
|
|
|
|
|
dataProcessService.insertDatabase(dataMinList); |
|
|
|
|
|
|
|
// 这里计算COP
|
|
|
|
|
|
|
|
for (Map.Entry<Date, Optional<CollectionParamsManage>> entry : collect.entrySet()) { |
|
|
|
|
|
|
|
if (entry.getValue().isPresent()) { |
|
|
|
|
|
|
|
try { |
|
|
|
|
|
|
|
dataProcessService.calculateCopByTime(DateUtils.dateToString(entry.getKey(), "yyyy-MM-dd HH:mm:ss")); |
|
|
|
|
|
|
|
} catch (Exception e) { |
|
|
|
|
|
|
|
log.error("计算COP失败", e); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
|
|
|
* 排序 |
|
|
|
|
|
|
|
* @param map |
|
|
|
|
|
|
|
* @return |
|
|
|
|
|
|
|
*/ |
|
|
|
|
|
|
|
public static Map<Date, Optional<CollectionParamsManage>> sortMapByDate(Map<Date, Optional<CollectionParamsManage>> map) { |
|
|
|
|
|
|
|
return map.entrySet().stream() |
|
|
|
|
|
|
|
.sorted(Map.Entry.comparingByKey()) |
|
|
|
|
|
|
|
.collect(Collectors.toMap( |
|
|
|
|
|
|
|
Map.Entry::getKey, |
|
|
|
|
|
|
|
Map.Entry::getValue, |
|
|
|
|
|
|
|
(oldValue, newValue) -> oldValue, |
|
|
|
|
|
|
|
LinkedHashMap::new |
|
|
|
|
|
|
|
)); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
|
|
|
* 每隔一个小时处理一次小时表数据,从data_min表中取值,grade=0的数据 |
|
|
|
|
|
|
|
* 加工聚合处理完成,插入data_hour表中 |
|
|
|
|
|
|
|
*/ |
|
|
|
|
|
|
|
// @Scheduled(cron = "0 30 0/1 * * ?")
|
|
|
|
|
|
|
|
public void dealDeviceDataHour() { |
|
|
|
|
|
|
|
Map<String, Object> map = dataProcessService.queryUntreatedData("min"); |
|
|
|
|
|
|
|
//需要处理的数据
|
|
|
|
|
|
|
|
List<DeviceReportEntity> dataList = (List<DeviceReportEntity>) map.get("data"); |
|
|
|
|
|
|
|
//处理完成后需要将数据设置为已处理,grade = 1,
|
|
|
|
|
|
|
|
Map<String, List<Long>> idMap = (Map<String, List<Long>>) map.get("idMap"); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
//遍历分组
|
|
|
|
|
|
|
|
Map<String, Map<LocalDateTime, List<DeviceReportEntity>>> collect = getMeterNumMap(dataList, ChronoUnit.HOURS); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
//小时表数据集合
|
|
|
|
|
|
|
|
List<DeviceReportEntity> hourList = new ArrayList<>(); |
|
|
|
|
|
|
|
//TODO 分组好的数据计算后插入数据表
|
|
|
|
|
|
|
|
for (Map.Entry<String, Map<LocalDateTime, List<DeviceReportEntity>>> entry : collect.entrySet()) { |
|
|
|
|
|
|
|
//表号
|
|
|
|
|
|
|
|
String key = entry.getKey(); |
|
|
|
|
|
|
|
Map<LocalDateTime, List<DeviceReportEntity>> value = entry.getValue(); |
|
|
|
|
|
|
|
Set<Map.Entry<LocalDateTime, List<DeviceReportEntity>>> entries = value.entrySet(); |
|
|
|
|
|
|
|
String deviceType = null; |
|
|
|
|
|
|
|
for (Map.Entry<LocalDateTime, List<DeviceReportEntity>> listEntry : entries) { |
|
|
|
|
|
|
|
deviceType = listEntry.getValue().get(0).getDeviceType(); |
|
|
|
|
|
|
|
break; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
//分组计算,得到一组当前表号的以时间分组的数据,循环计算第一条的当前读数是上一条的上次读数
|
|
|
|
|
|
|
|
ComputeEnum computeEnum = ComputeEnum.get(Integer.parseInt(deviceType)); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
assert deviceType != null; |
|
|
|
|
|
|
|
List<Map<LocalDateTime, DeviceReportEntity>> dealDataList = |
|
|
|
|
|
|
|
ComputeEnum.get(Integer.parseInt(deviceType)).getDataList(entry); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
List<Map<LocalDateTime, DeviceReportEntity>> sortedList = dealDataList.stream() |
|
|
|
|
|
|
|
.sorted(Comparator.comparing(temMap -> temMap.keySet().iterator().next())) |
|
|
|
|
|
|
|
.collect(Collectors.toList()); |
|
|
|
|
|
|
|
//计算上次值,当前值
|
|
|
|
|
|
|
|
hourList = calcListData(hourList, key, "hour", sortedList); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
//批量插入小时报表
|
|
|
|
|
|
|
|
try { |
|
|
|
|
|
|
|
if (hourList.isEmpty() || idMap.isEmpty()) { |
|
|
|
|
|
|
|
return; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
dataProcessService.batchInsertOrUpdate(hourList, "hour"); |
|
|
|
|
|
|
|
} catch (Exception e) { |
|
|
|
|
|
|
|
log.error("小时数据表处理异常:" + e); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
//TODO 修改分钟表的状态,grade=1
|
|
|
|
|
|
|
|
dataProcessService.batchUpdateGrade(idMap, "min"); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
|
|
|
* 对数据集合计算上一次值,当前值 |
|
|
|
|
|
|
|
* |
|
|
|
|
|
|
|
* @param hourList |
|
|
|
|
|
|
|
* @param key |
|
|
|
|
|
|
|
* @param dealDataList |
|
|
|
|
|
|
|
*/ |
|
|
|
|
|
|
|
private List<DeviceReportEntity> calcListData(List<DeviceReportEntity> hourList, |
|
|
|
|
|
|
|
String key, String tableType, |
|
|
|
|
|
|
|
List<Map<LocalDateTime, DeviceReportEntity>> dealDataList) { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
ArrayList<DeviceReportEntity> tempList = new ArrayList<>(); |
|
|
|
|
|
|
|
for (int i = 0; i < dealDataList.size(); i++) { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if (i == 0) { |
|
|
|
|
|
|
|
Map<LocalDateTime, DeviceReportEntity> reportEntityMap = dealDataList.get(i); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
//由key已知是哪个表号
|
|
|
|
|
|
|
|
//判断第一个是否有历史数据,当年表有历史数据则取当年表,无则取上一年表,去年和今年表都无则取初始值
|
|
|
|
|
|
|
|
//TODO 从历史表查询上一次的读数,抄表时间
|
|
|
|
|
|
|
|
String lastValue = null; |
|
|
|
|
|
|
|
Date lastDate = null; |
|
|
|
|
|
|
|
//查询历史记录
|
|
|
|
|
|
|
|
DeviceReportEntity hourEntity = dataProcessService.queryLastValue(key, tableType); |
|
|
|
|
|
|
|
if (ObjectUtils.isEmpty(hourEntity)) { |
|
|
|
|
|
|
|
//查询设备信息初始值
|
|
|
|
|
|
|
|
lastValue = dataProcessService.queryInitValue(key); |
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
|
|
lastValue = hourEntity.getLastValue(); |
|
|
|
|
|
|
|
lastDate = hourEntity.getLastTime(); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Set<Map.Entry<LocalDateTime, DeviceReportEntity>> entrySet = reportEntityMap.entrySet(); |
|
|
|
|
|
|
|
for (Map.Entry<LocalDateTime, DeviceReportEntity> entityEntry : entrySet) { |
|
|
|
|
|
|
|
LocalDateTime curTime = entityEntry.getKey(); |
|
|
|
|
|
|
|
DeviceReportEntity entity = entityEntry.getValue(); |
|
|
|
|
|
|
|
ZonedDateTime zonedDateTime = curTime.atZone(ZoneId.systemDefault()); |
|
|
|
|
|
|
|
Date date = Date.from(zonedDateTime.toInstant()); |
|
|
|
|
|
|
|
entity.setCurTime(date); |
|
|
|
|
|
|
|
entity.setLastValue(lastValue); |
|
|
|
|
|
|
|
if (lastDate == null) { |
|
|
|
|
|
|
|
entity.setLastTime(date); |
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
|
|
entity.setLastTime(lastDate); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
BigDecimal curValue = new BigDecimal(entity.getCurValue()); |
|
|
|
|
|
|
|
BigDecimal newLastValue = new BigDecimal(entity.getLastValue()); |
|
|
|
|
|
|
|
BigDecimal usedValue = curValue.subtract(newLastValue); |
|
|
|
|
|
|
|
int ratio = entity.getRatio(); |
|
|
|
|
|
|
|
entity.setRatio(ratio); |
|
|
|
|
|
|
|
BigDecimal calcValue = usedValue.multiply(BigDecimal.valueOf(ratio)); |
|
|
|
|
|
|
|
entity.setUsedValue(String.valueOf(usedValue)); |
|
|
|
|
|
|
|
entity.setCalcValue(String.valueOf(calcValue)); |
|
|
|
|
|
|
|
hourList.add(entity); |
|
|
|
|
|
|
|
tempList.add(entity); |
|
|
|
|
|
|
|
//只需要第一个实体类
|
|
|
|
|
|
|
|
break; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
|
|
//从上一条取当前读数和当前时间作为当前对象的上一次读数
|
|
|
|
|
|
|
|
int lastIndex = i - 1; |
|
|
|
|
|
|
|
DeviceReportEntity lastEntity = tempList.get(lastIndex); |
|
|
|
|
|
|
|
Map<LocalDateTime, DeviceReportEntity> curMap = dealDataList.get(i); |
|
|
|
|
|
|
|
Set<Map.Entry<LocalDateTime, DeviceReportEntity>> curEntrySet = curMap.entrySet(); |
|
|
|
|
|
|
|
for (Map.Entry<LocalDateTime, DeviceReportEntity> curEntry : curEntrySet) { |
|
|
|
|
|
|
|
LocalDateTime curTime = curEntry.getKey(); |
|
|
|
|
|
|
|
DeviceReportEntity curEntity = curEntry.getValue(); |
|
|
|
|
|
|
|
ZonedDateTime zonedDateTime = curTime.atZone(ZoneId.systemDefault()); |
|
|
|
|
|
|
|
Date date = Date.from(zonedDateTime.toInstant()); |
|
|
|
|
|
|
|
curEntity.setCurTime(date); |
|
|
|
|
|
|
|
curEntity.setLastTime(lastEntity.getCurTime()); |
|
|
|
|
|
|
|
curEntity.setLastValue(lastEntity.getCurValue()); |
|
|
|
|
|
|
|
BigDecimal curValue = new BigDecimal(curEntity.getCurValue()); |
|
|
|
|
|
|
|
BigDecimal lastValue = new BigDecimal(curEntity.getLastValue()); |
|
|
|
|
|
|
|
BigDecimal usedValue = curValue.subtract(lastValue); |
|
|
|
|
|
|
|
int ratio = lastEntity.getRatio(); |
|
|
|
|
|
|
|
curEntity.setRatio(ratio); |
|
|
|
|
|
|
|
BigDecimal calcValue = usedValue.multiply(BigDecimal.valueOf(ratio)); |
|
|
|
|
|
|
|
curEntity.setUsedValue(String.valueOf(usedValue)); |
|
|
|
|
|
|
|
curEntity.setCalcValue(String.valueOf(calcValue)); |
|
|
|
|
|
|
|
hourList.add(curEntity); |
|
|
|
|
|
|
|
tempList.add(curEntity); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
return hourList; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
|
|
|
* 遍历数据分组,按照表号分组,再根据时间戳分组 |
|
|
|
|
|
|
|
* 分组最大只能做到日分组,无法做到月和年,需要另外一个函数实现 |
|
|
|
|
|
|
|
* |
|
|
|
|
|
|
|
* @param dataList 需要处理的数据集 |
|
|
|
|
|
|
|
* @param interval 时间间隔 |
|
|
|
|
|
|
|
* @return |
|
|
|
|
|
|
|
*/ |
|
|
|
|
|
|
|
private Map<String, Map<LocalDateTime, List<DeviceReportEntity>>> getMeterNumMap(List<DeviceReportEntity> dataList, |
|
|
|
|
|
|
|
TemporalUnit interval) { |
|
|
|
|
|
|
|
// 获取 MinuteData 对象集合,其中的时间戳是 java.util.Date 类型
|
|
|
|
|
|
|
|
//1:数据格式为先按照表号分组,设备类型里的数据再按照小时分组
|
|
|
|
|
|
|
|
//2:小时分组的数据就是DeviceReportEntity实体类
|
|
|
|
|
|
|
|
return dataList.stream() |
|
|
|
|
|
|
|
.map(data -> { |
|
|
|
|
|
|
|
LocalDateTime dateTime = LocalDateTime.ofInstant(data.getCurTime().toInstant(), ZoneId.systemDefault()); |
|
|
|
|
|
|
|
data.setLocalDateTime(dateTime); |
|
|
|
|
|
|
|
return data; |
|
|
|
|
|
|
|
}) |
|
|
|
|
|
|
|
.sorted(Comparator.comparing(DeviceReportEntity::getLocalDateTime)) |
|
|
|
|
|
|
|
.collect( |
|
|
|
|
|
|
|
//根据表号分组
|
|
|
|
|
|
|
|
Collectors.groupingBy(DeviceReportEntity::getDeviceNum, |
|
|
|
|
|
|
|
//根据时间间隔分组,后续是否需要根据减少分组
|
|
|
|
|
|
|
|
Collectors.groupingBy(report -> report.getLocalDateTime().truncatedTo(interval))) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/*** |
|
|
|
|
|
|
|
* 处理生成日月年报表data_day等表 |
|
|
|
|
|
|
|
* 一日处理一次数据 凌晨3点计算 |
|
|
|
|
|
|
|
*/ |
|
|
|
|
|
|
|
// @Scheduled(cron = "0 30 23 * * ?")
|
|
|
|
|
|
|
|
//@Scheduled(cron = "0 0/1 * * * ?")
|
|
|
|
|
|
|
|
public void dealDay2Year() { |
|
|
|
|
|
|
|
Map<String, Object> untreatedList = dataProcessService.queryUntreatedData("hour"); |
|
|
|
|
|
|
|
//需要处理的数据
|
|
|
|
|
|
|
|
List<DeviceReportEntity> dataList = (List<DeviceReportEntity>) untreatedList.get("data"); |
|
|
|
|
|
|
|
//处理完成后需要将数据设置为已处理,grade = 1,
|
|
|
|
|
|
|
|
Map<String, List<Long>> idMap = (Map<String, List<Long>>) untreatedList.get("idMap"); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
//<表号:<时间:List<集合>>> 按照日区分
|
|
|
|
|
|
|
|
Map<String, Map<LocalDateTime, List<DeviceReportEntity>>> dayMap = getMeterNumMap(dataList, ChronoUnit.DAYS); |
|
|
|
|
|
|
|
List<DeviceReportEntity> dayList = new ArrayList<>(); |
|
|
|
|
|
|
|
List<DeviceReportEntity> monthList = new ArrayList<>(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 使用流对每个内部的Map进行排序
|
|
|
|
|
|
|
|
dayMap.forEach((key, value) -> { |
|
|
|
|
|
|
|
Map<LocalDateTime, List<DeviceReportEntity>> sortedMap = value.entrySet().stream() |
|
|
|
|
|
|
|
.sorted(Map.Entry.comparingByKey()) |
|
|
|
|
|
|
|
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (oldValue, newValue) -> oldValue, LinkedHashMap::new)); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 将排序后的Map更新回dayMap
|
|
|
|
|
|
|
|
dayMap.put(key, sortedMap); |
|
|
|
|
|
|
|
}); |
|
|
|
|
|
|
|
dayList = getDeviceReportEntities(dayMap, dayList, "day"); |
|
|
|
|
|
|
|
//存入日表
|
|
|
|
|
|
|
|
try { |
|
|
|
|
|
|
|
dataProcessService.batchInsertOrUpdate(dayList, "day"); |
|
|
|
|
|
|
|
} catch (Exception e) { |
|
|
|
|
|
|
|
log.error("日表数据表处理异常:" + e); |
|
|
|
|
|
|
|
e.printStackTrace(); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 按照月区分
|
|
|
|
|
|
|
|
//Map<String, Map<LocalDateTime, List<DeviceReportEntity>>> monthMap = getMeterNumMap(dataList, ChronoUnit.MONTHS);
|
|
|
|
|
|
|
|
Map<String, Map<YearMonth, List<DeviceReportEntity>>> monthMap = getMeterNumMapByMonth(dataList, ChronoUnit.MONTHS); |
|
|
|
|
|
|
|
List<DeviceReportEntity> monthInsert = getMonthListData(monthList, monthMap); |
|
|
|
|
|
|
|
dataProcessService.batchInsertOrUpdate(monthInsert, "month"); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 按照年区分
|
|
|
|
|
|
|
|
//Map<String, Map<YearMonth, List<DeviceReportEntity>>> yearMap = getMeterNumMapByMonth(dataList, ChronoUnit.YEARS);
|
|
|
|
|
|
|
|
//List<DeviceReportEntity> yearInsert = getMonthListData(monthList, yearMap);
|
|
|
|
|
|
|
|
dataProcessService.batchInsertOrUpdate(monthInsert, "year"); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
//TODO 修改分钟表的状态,grade=1
|
|
|
|
|
|
|
|
dataProcessService.batchUpdateGrade(idMap, "hour"); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
|
|
|
* 计算月和年报表 |
|
|
|
|
|
|
|
* |
|
|
|
|
|
|
|
* @param monthList |
|
|
|
|
|
|
|
* @param monthMap |
|
|
|
|
|
|
|
* @return |
|
|
|
|
|
|
|
*/ |
|
|
|
|
|
|
|
private List<DeviceReportEntity> getMonthListData(List<DeviceReportEntity> monthList, Map<String, Map<YearMonth, List<DeviceReportEntity>>> monthMap) { |
|
|
|
|
|
|
|
Set<Map.Entry<String, Map<YearMonth, List<DeviceReportEntity>>>> entries = monthMap.entrySet(); |
|
|
|
|
|
|
|
for (Map.Entry<String, Map<YearMonth, List<DeviceReportEntity>>> entry : entries) { |
|
|
|
|
|
|
|
String deviceNum = entry.getKey(); |
|
|
|
|
|
|
|
Map<YearMonth, List<DeviceReportEntity>> value = entry.getValue(); |
|
|
|
|
|
|
|
List<DeviceReportEntity> deviceList = new ArrayList<>(); |
|
|
|
|
|
|
|
List<DeviceReportEntity> monthInsert = new ArrayList<>(); |
|
|
|
|
|
|
|
List<DeviceReportEntity> tempList = new ArrayList<>(); |
|
|
|
|
|
|
|
Set<Map.Entry<YearMonth, List<DeviceReportEntity>>> monthEntries = value.entrySet(); |
|
|
|
|
|
|
|
for (Map.Entry<YearMonth, List<DeviceReportEntity>> monthEntry : monthEntries) { |
|
|
|
|
|
|
|
//月份
|
|
|
|
|
|
|
|
YearMonth key = monthEntry.getKey(); |
|
|
|
|
|
|
|
//集合
|
|
|
|
|
|
|
|
List<DeviceReportEntity> monthEntryValue = monthEntry.getValue(); |
|
|
|
|
|
|
|
//筛选出最大值和最小值
|
|
|
|
|
|
|
|
DeviceReportEntity max = Collections.max(monthEntryValue, Comparator.comparing(DeviceReportEntity::getLocalDateTime)); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
deviceList.add(max); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
deviceList.sort(Comparator.comparing(DeviceReportEntity::getLocalDateTime)); |
|
|
|
|
|
|
|
for (int i = 0; i < deviceList.size(); i++) { |
|
|
|
|
|
|
|
String lastValue; |
|
|
|
|
|
|
|
Date lastTime; |
|
|
|
|
|
|
|
if (i == 0) { |
|
|
|
|
|
|
|
DeviceReportEntity entity = deviceList.get(i); |
|
|
|
|
|
|
|
DeviceReportEntity lastEntity = dataProcessService.queryLastValue(deviceNum, "month"); |
|
|
|
|
|
|
|
if (ObjectUtils.isEmpty(lastEntity)) { |
|
|
|
|
|
|
|
lastValue = dataProcessService.queryInitValue(deviceNum); |
|
|
|
|
|
|
|
lastTime = entity.getCurTime(); |
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
|
|
lastValue = lastEntity.getLastValue(); |
|
|
|
|
|
|
|
lastTime = lastEntity.getLastTime(); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
entity.setLastTime(lastTime); |
|
|
|
|
|
|
|
entity.setLastValue(lastValue); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
BigDecimal curValue = new BigDecimal(entity.getCurValue()); |
|
|
|
|
|
|
|
BigDecimal bigDecimal = new BigDecimal(entity.getLastValue()); |
|
|
|
|
|
|
|
int ratio = entity.getRatio(); |
|
|
|
|
|
|
|
BigDecimal usedValue = curValue.subtract(bigDecimal); |
|
|
|
|
|
|
|
entity.setUsedValue(usedValue.toString()); |
|
|
|
|
|
|
|
BigDecimal calcValue = usedValue.multiply(BigDecimal.valueOf(ratio)); |
|
|
|
|
|
|
|
entity.setCalcValue(calcValue.toString()); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
monthInsert.add(entity); |
|
|
|
|
|
|
|
tempList.add(entity); |
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
|
|
//从上一条取当前读数和当前时间作为当前对象的上一次读数
|
|
|
|
|
|
|
|
int lastIndex = i - 1; |
|
|
|
|
|
|
|
DeviceReportEntity lastEntity = tempList.get(lastIndex); |
|
|
|
|
|
|
|
DeviceReportEntity curEntity = deviceList.get(i); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
curEntity.setLastTime(lastEntity.getCurTime()); |
|
|
|
|
|
|
|
curEntity.setLastValue(lastEntity.getCurValue()); |
|
|
|
|
|
|
|
BigDecimal curValue = new BigDecimal(curEntity.getCurValue()); |
|
|
|
|
|
|
|
BigDecimal bigDecimal = new BigDecimal(curEntity.getLastValue()); |
|
|
|
|
|
|
|
BigDecimal usedValue = curValue.subtract(bigDecimal); |
|
|
|
|
|
|
|
int ratio = lastEntity.getRatio(); |
|
|
|
|
|
|
|
curEntity.setRatio(ratio); |
|
|
|
|
|
|
|
BigDecimal calcValue = usedValue.multiply(BigDecimal.valueOf(ratio)); |
|
|
|
|
|
|
|
curEntity.setUsedValue(String.valueOf(usedValue)); |
|
|
|
|
|
|
|
curEntity.setCalcValue(String.valueOf(calcValue)); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
monthInsert.add(curEntity); |
|
|
|
|
|
|
|
tempList.add(curEntity); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
monthList.addAll(tempList); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return monthList; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
|
|
|
* 循环处理 |
|
|
|
|
|
|
|
* |
|
|
|
|
|
|
|
* @param dayMap |
|
|
|
|
|
|
|
* @param dayList |
|
|
|
|
|
|
|
* @param tableType |
|
|
|
|
|
|
|
* @return |
|
|
|
|
|
|
|
*/ |
|
|
|
|
|
|
|
private List<DeviceReportEntity> getDeviceReportEntities(Map<String, Map<LocalDateTime, List<DeviceReportEntity>>> dayMap, |
|
|
|
|
|
|
|
List<DeviceReportEntity> dayList, |
|
|
|
|
|
|
|
String tableType) { |
|
|
|
|
|
|
|
for (Map.Entry<String, Map<LocalDateTime, List<DeviceReportEntity>>> entry : dayMap.entrySet()) { |
|
|
|
|
|
|
|
//表号
|
|
|
|
|
|
|
|
String key = entry.getKey(); |
|
|
|
|
|
|
|
Map<LocalDateTime, List<DeviceReportEntity>> value = entry.getValue(); |
|
|
|
|
|
|
|
String deviceType = null; |
|
|
|
|
|
|
|
for (Map.Entry<LocalDateTime, List<DeviceReportEntity>> listEntry : value.entrySet()) { |
|
|
|
|
|
|
|
deviceType = listEntry.getValue().get(0).getDeviceType(); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
//取到的最大值
|
|
|
|
|
|
|
|
assert deviceType != null; |
|
|
|
|
|
|
|
List<Map<LocalDateTime, DeviceReportEntity>> dayMapList = |
|
|
|
|
|
|
|
ComputeEnum.get(Integer.parseInt(deviceType)).getDataList(entry); |
|
|
|
|
|
|
|
dayList = calcListData(dayList, key, tableType, dayMapList); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
return dayList; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
|
|
|
* 按照月、年分组 |
|
|
|
|
|
|
|
* |
|
|
|
|
|
|
|
* @param dataList |
|
|
|
|
|
|
|
* @param interval |
|
|
|
|
|
|
|
* @return |
|
|
|
|
|
|
|
*/ |
|
|
|
|
|
|
|
private Map<String, Map<YearMonth, List<DeviceReportEntity>>> getMeterNumMapByMonth(List<DeviceReportEntity> dataList, |
|
|
|
|
|
|
|
TemporalUnit interval) { |
|
|
|
|
|
|
|
return dataList.stream() |
|
|
|
|
|
|
|
.map(data -> { |
|
|
|
|
|
|
|
LocalDateTime dateTime = LocalDateTime.ofInstant(data.getCurTime().toInstant(), ZoneId.systemDefault()); |
|
|
|
|
|
|
|
data.setLocalDateTime(dateTime); |
|
|
|
|
|
|
|
return data; |
|
|
|
|
|
|
|
}) |
|
|
|
|
|
|
|
.collect(Collectors.groupingBy(DeviceReportEntity::getDeviceNum, |
|
|
|
|
|
|
|
Collectors.groupingBy( |
|
|
|
|
|
|
|
report -> YearMonth.from(report.getLocalDateTime()), |
|
|
|
|
|
|
|
LinkedHashMap::new, |
|
|
|
|
|
|
|
Collectors.toList() |
|
|
|
|
|
|
|
))); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
|
|
|
* 处理冷水机组数据获取只要实时数据的设备 |
|
|
|
|
|
|
|
*/ |
|
|
|
|
|
|
|
public void dealChillersData() { |
|
|
|
List<CollectionParamsManage> cacheList = redisCache.getCacheList("CHILLERS", CollectionParamsManage.class); |
|
|
|
List<CollectionParamsManage> cacheList = redisCache.getCacheList("CHILLERS", CollectionParamsManage.class); |
|
|
|
if (null == cacheList || cacheList.isEmpty()) { |
|
|
|
if (null == cacheList || cacheList.isEmpty()) { |
|
|
|
return; |
|
|
|
return; |
|
|
|