中央热水项目
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

208 lines
8.8 KiB

package com.mh.user.job;
import com.mh.user.constants.Constant;
import com.mh.user.entity.DeviceCodeParamEntity;
import com.mh.user.model.BuildingModel;
import com.mh.user.serialport.SerialPortThread;
import com.mh.user.service.BuildingService;
import com.mh.user.service.DealDataService;
import com.mh.user.service.HistoryDataPreService;
import com.mh.user.utils.CacheUtil;
import com.mh.user.utils.ComThreadPoolService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.StopWatch;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;
/**
* @author ljf
* @title :定时处理采集回来的历史数据
* @description :
* @updateTime 2020-07-28
* @throws :
*/
@Slf4j
@Component
public class DealDataJob {
private final DealDataService dealDataService;
private final BuildingService buildingService;
private final HistoryDataPreService historyDataPreService;
public DealDataJob(DealDataService dealDataService, BuildingService buildingService, HistoryDataPreService historyDataPreService) {
this.dealDataService = dealDataService;
this.buildingService = buildingService;
this.historyDataPreService = historyDataPreService;
}
ThreadPoolExecutor comThreadPool = ComThreadPoolService.getInstance();
/**
* 定时处理汇总数据:每15分钟处理一次,十分钟(0 0/10 * * * ?)
*/
@Scheduled(cron = "0 0/15 * * * ?")
public void ProEnergy() {
try {
SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:00:00");
Date date = new Date();
String curDate = sdf1.format(date);
String name = dealDataService.customName();
if (name != null
&& (name.contains(Constant.CUSTOM_NAME_HUAXIA))) {
dealDataService.proEnergy2(curDate);
} else {
dealDataService.proEnergy(curDate); //yyyy-MM-dd HH:00:00
}
dealDataService.proGatewayState(); //判断网关在线状态:在线或离线
log.info("进入定时调试数据库过程汇总数据!yyyy-MM-dd HH:00:00");
} catch (Exception e) {
log.error("定时处理数据汇总异常==>", e);
}
}
/**
* 采集
*/
@Scheduled(cron = "35 0/2 * * * ?")
// @Scheduled(cron = "0/10 * * * * ?") 0 0/5 * * * ?
// @Scheduled(cron = "0 0/5 * * * ?") //5分钟
public void collect() {
try {
log.info("------定时采集开始>>>>Constant.FLAG==" + Constant.FLAG + "------");
if (!Constant.FLAG) {
if (!Constant.WEB_FLAG) {
Constant.FLAG = true;
log.info("------Constant.WEB_FLAG==" + false + "------");
CacheUtil cacheUtil = CacheUtil.getInstance();
for (int i = 1; i <= 4; i++) {
if (Constant.WEB_FLAG) {
break;
}
String threadName;
if (i == 1 || i == 3) {
threadName = "1";
log.info("------采集水位、水温!" + i + "------");
} else if (i == 2) {
threadName = "2";
log.info("------采集水、电、运行状态!" + i + "------");
} else {
threadName = "3";
log.info("------采集设定温度、设定水位、故障状态!" + i + "------");
}
// 从缓存中获取到对应的采集内容
List<DeviceCodeParamEntity> deviceParamsByType = cacheUtil.getDeviceParamsByType(threadName);
// 分组data_com
Map<String, List<DeviceCodeParamEntity>> dataComMap = deviceParamsByType.stream().collect(Collectors.groupingBy(DeviceCodeParamEntity::getDataCom));
int batchSize = 10; // 定义一个批次大小
int index = 0;
for (int k = 0; k < dataComMap.size(); ) {
if (Constant.WEB_FLAG) {
break;
}
CountDownLatch countDownLatch = new CountDownLatch(Math.min(batchSize, dataComMap.size() - k));
index = k;
for (int j = 0; j < Math.min(batchSize, dataComMap.size() - k); j++) {
if (Constant.WEB_FLAG) {
break;
}
// 获取data_com口
String dataCom = dataComMap.keySet().toArray(new String[0])[index];
SerialPortThread myThread = new SerialPortThread();
myThread.setName(threadName, dataCom.toLowerCase().replace("com", ""), countDownLatch);
Thread thread = new Thread(myThread);
comThreadPool.execute(thread);
index++;
}
// 等待执行完成
countDownLatch.await();
// 释放资源
countDownLatch = null;
// 检查是否需要继续创建新的线程(未达到数据总数量且WEB_FLAG为false)
if (k + batchSize < dataComMap.size() && !Constant.WEB_FLAG) {
k += batchSize;
} else {
break;
}
}
}
}
}
} catch (Exception e) {
log.error("定时采集异常==>", e);
} finally {
Constant.FLAG = false;
log.info("------定时采集结束>>>>Constant.FLAG==" + Constant.FLAG + "------");
}
}
/**
* 定时处理数据:每十五分钟处理一次
*/
@Scheduled(cron = "0 0/15 * * * ?")
public void dealData() {
try {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd");
Date date = new Date();
String curDate = sdf1.format(date);
String name = dealDataService.customName();
if (name != null
&& (name.contains(Constant.CUSTOM_NAME_HUAXIA))) {
dealDataService.proEnergySum2(curDate);
} else {
dealDataService.proEnergySum(curDate);
}
dealDataService.proMaintainSum(curDate); //汇总维修数
dealDataService.proAlarmManage(curDate); //报警信息
dealDataService.proAlarmInfoSum(curDate); //汇总相关警报数
dealDataService.proAnalysisMonth(curDate);
dealDataService.proAnalysisYear(curDate);
dealDataService.proDeviceState(curDate); //汇总设备状态
dealDataService.proTotalPumpMinutes(curDate); //统计周\月热泵运行时长
log.info("进入定时调试数据库过程汇总数据!yyyy-MM-dd");
// 开始预测数据
List<BuildingModel> buildingModels = buildingService.selectBuildingName();
for (BuildingModel buildingModel : buildingModels) {
String buildingId = String.valueOf(buildingModel.getBuildingId());
try {
// 训练数据
historyDataPreService.startTrainData(buildingId);
// 预测数据
historyDataPreService.startPredictData(buildingId, curDate);
} catch (Exception e) {
log.error("定时处理数据以及预测数据异常==>", e);
}
}
stopWatch.stop();
log.info("定时处理数据以及预测数据结束!耗时:" + stopWatch.getTotalTimeSeconds() + "秒");
} catch (Exception e) {
log.error("定时处理数据异常==>", e);
}
}
/**
* 定时删除历史流水记录(删除前三个月的记录)
*/
// @Scheduled(cron = "0 0 0 1 1/1 ? ")
// public void deleteDataHistory() {
// try {
// dealDataService.deleteChillersDataHistory();
// } catch (Exception e) {
// e.printStackTrace();
// }
// }
}