package com.mh.user.job; import com.mh.user.constants.Constant; import com.mh.user.entity.DeviceCodeParamEntity; import com.mh.user.model.BuildingModel; import com.mh.user.serialport.SerialPortThread; import com.mh.user.service.BuildingService; import com.mh.user.service.DealDataService; import com.mh.user.service.HistoryDataPreService; import com.mh.user.utils.CacheUtil; import com.mh.user.utils.ComThreadPoolService; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.util.StopWatch; import java.text.SimpleDateFormat; import java.util.Date; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadPoolExecutor; import java.util.stream.Collectors; /** * @author ljf * @title :定时处理采集回来的历史数据 * @description : * @updateTime 2020-07-28 * @throws : */ @Slf4j @Component public class DealDataJob { private final DealDataService dealDataService; private final BuildingService buildingService; private final HistoryDataPreService historyDataPreService; public DealDataJob(DealDataService dealDataService, BuildingService buildingService, HistoryDataPreService historyDataPreService) { this.dealDataService = dealDataService; this.buildingService = buildingService; this.historyDataPreService = historyDataPreService; } ThreadPoolExecutor comThreadPool = ComThreadPoolService.getInstance(); /** * 定时处理汇总数据:每15分钟处理一次,十分钟(0 0/10 * * * ?) */ // @Scheduled(cron = "0 0/15 * * * ?") public void ProEnergy() { try { SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:00:00"); Date date = new Date(); String curDate = sdf1.format(date); String name = dealDataService.customName(); if (name != null && (name.contains(Constant.CUSTOM_NAME_HUAXIA))) { dealDataService.proEnergy2(curDate); } else { dealDataService.proEnergy(curDate); //yyyy-MM-dd HH:00:00 } dealDataService.proGatewayState(); //判断网关在线状态:在线或离线 log.info("进入定时调试数据库过程汇总数据!yyyy-MM-dd HH:00:00"); } catch (Exception e) { log.error("定时处理数据汇总异常==>", e); } } /** * 采集 */ @Scheduled(cron = "35 0/2 * * * ?") // @Scheduled(cron = "0/10 * * * * ?") 0 0/5 * * * ? // @Scheduled(cron = "0 0/5 * * * ?") //5分钟 public void collect() { try { log.info("------定时采集开始>>>>Constant.FLAG==" + Constant.FLAG + "------"); if (!Constant.FLAG) { if (!Constant.WEB_FLAG) { Constant.FLAG = true; log.info("------Constant.WEB_FLAG==" + false + "------"); CacheUtil cacheUtil = CacheUtil.getInstance(); for (int i = 1; i <= 4; i++) { if (Constant.WEB_FLAG) { break; } String threadName; if (i == 1 || i == 3) { threadName = "1"; log.info("------采集水位、水温!" + i + "------"); } else if (i == 2) { threadName = "2"; log.info("------采集水、电、运行状态!" + i + "------"); } else { threadName = "3"; log.info("------采集设定温度、设定水位、故障状态!" + i + "------"); } // 从缓存中获取到对应的采集内容 List deviceParamsByType = cacheUtil.getDeviceParamsByType(threadName); // 分组data_com Map> dataComMap = deviceParamsByType.stream().collect(Collectors.groupingBy(DeviceCodeParamEntity::getDataCom)); int batchSize = 10; // 定义一个批次大小 int index = 0; for (int k = 0; k < dataComMap.size(); ) { if (Constant.WEB_FLAG) { break; } CountDownLatch countDownLatch = new CountDownLatch(Math.min(batchSize, dataComMap.size() - k)); index = k; for (int j = 0; j < Math.min(batchSize, dataComMap.size() - k); j++) { if (Constant.WEB_FLAG) { break; } // 获取data_com口 String dataCom = dataComMap.keySet().toArray(new String[0])[index]; SerialPortThread myThread = new SerialPortThread(); myThread.setName(threadName, dataCom.toLowerCase().replace("com", ""), countDownLatch); Thread thread = new Thread(myThread); comThreadPool.execute(thread); index++; } // 等待执行完成 countDownLatch.await(); // 释放资源 countDownLatch = null; // 检查是否需要继续创建新的线程(未达到数据总数量且WEB_FLAG为false) if (k + batchSize < dataComMap.size() && !Constant.WEB_FLAG) { k += batchSize; } else { break; } } } } } } catch (Exception e) { log.error("定时采集异常==>", e); } finally { Constant.FLAG = false; log.info("------定时采集结束>>>>Constant.FLAG==" + Constant.FLAG + "------"); } } /** * 定时处理数据:每十五分钟处理一次 */ // @Scheduled(cron = "0 0/15 * * * ?") public void dealData() { try { StopWatch stopWatch = new StopWatch(); stopWatch.start(); SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd"); Date date = new Date(); String curDate = sdf1.format(date); String name = dealDataService.customName(); if (name != null && (name.contains(Constant.CUSTOM_NAME_HUAXIA))) { dealDataService.proEnergySum2(curDate); } else { dealDataService.proEnergySum(curDate); } dealDataService.proMaintainSum(curDate); //汇总维修数 dealDataService.proAlarmManage(curDate); //报警信息 dealDataService.proAlarmInfoSum(curDate); //汇总相关警报数 dealDataService.proAnalysisMonth(curDate); dealDataService.proAnalysisYear(curDate); dealDataService.proDeviceState(curDate); //汇总设备状态 dealDataService.proTotalPumpMinutes(curDate); //统计周\月热泵运行时长 log.info("进入定时调试数据库过程汇总数据!yyyy-MM-dd"); // 开始预测数据 List buildingModels = buildingService.selectBuildingName(); for (BuildingModel buildingModel : buildingModels) { String buildingId = String.valueOf(buildingModel.getBuildingId()); try { // 训练数据 historyDataPreService.startTrainData(buildingId); // 预测数据 historyDataPreService.startPredictData(buildingId, curDate); } catch (Exception e) { log.error("定时处理数据以及预测数据异常==>", e); } } stopWatch.stop(); log.info("定时处理数据以及预测数据结束!耗时:" + stopWatch.getTotalTimeSeconds() + "秒"); } catch (Exception e) { log.error("定时处理数据异常==>", e); } } /** * 定时删除历史流水记录(删除前三个月的记录) */ // @Scheduled(cron = "0 0 0 1 1/1 ? ") // public void deleteDataHistory() { // try { // dealDataService.deleteChillersDataHistory(); // } catch (Exception e) { // e.printStackTrace(); // } // } }