package com.mh.quartz.task; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; import com.fasterxml.jackson.databind.ObjectMapper; import com.mh.common.constant.Constants; import com.mh.common.core.domain.entity.CollectionParamsManage; import com.mh.common.model.request.AdvantechDatas; import com.mh.common.model.request.AdvantechReceiver; import com.mh.common.utils.DateUtils; import com.mh.common.utils.http.HttpUtils; import com.mh.framework.rabbitmq.producer.SendMsgByTopic; import com.mh.system.service.device.ICollectionParamsManageService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.math.BigDecimal; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * @author LJF * @version 1.0 * @project EEMCS * @description 获取第三方数据定时任务 * @date 2026-06-05 10:57:49 */ @Slf4j @Component("getOtherSysDataJob") public class GetOtherSysDataJob { @Autowired private ICollectionParamsManageService collectionParamsManageService; @Autowired private ObjectMapper mapper; @Autowired private SendMsgByTopic sendMsgByTopic; // 创建线程池用于并发请求 private static final ExecutorService executor = Executors.newFixedThreadPool(2, r -> { Thread thread = new Thread(r); thread.setName("BSD-Data-Fetcher"); thread.setDaemon(true); return thread; }); //TODO 机房总电表 private final String[] totalMeters = new String[]{ "Power", // 机房实时功率 "PowerTotal" // 机房累计读数 }; //TODO 主机总电表 private final String[] powerMeters = new String[]{ "ChPower", // 主机实时功率 "PowerTotal_CH" // 主机累计读数 }; //TODO 冷冻泵总电表 private final String[] freezePumpMeters = new String[]{ "PriChWPPower", // 冷冻泵实时功率 "PowerTotal_ChWPump" // 冷冻泵累计读数 }; //TODO 冷却泵总电表 private final String[] coolPumpMeters = new String[]{ "CWPPower", // 冷却泵实时功率 "PowerTotal_CWPump" // 冷却泵累计读数 }; //TODO 冷却塔总电表 private final String[] towerMeters = new String[]{ "CTPower", // 冷却塔实时功率 "PowerTotal_CT" // 冷却塔累计读数 }; public void getBSDData(String realUrl, String realValuesUrl) { log.info("获取BSD数据"); try { // 获取实时读数\累计读数的所有点位 HashMap params = new HashMap<>(); params.put("systemType", "0"); params.put("paramType", "16"); List collectionParamsManages = collectionParamsManageService.selectListByParams(params); // 分离grade=40(累计读数)和grade=140(实时功率)的点位 List cumulativeMtCodes = new ArrayList<>(); // grade=40 List powerMtCodes = new ArrayList<>(); // grade=140 HashMap mtCodeToOtherNameMap = new HashMap<>(); // mtCode到otherName的映射 for (CollectionParamsManage param : collectionParamsManages) { if (param.getGrade() != null && param.getMtCode() != null) { // 建立mtCode到otherName的映射关系 if (param.getOtherName() != null && !param.getOtherName().isEmpty()) { mtCodeToOtherNameMap.put(param.getMtCode(), param.getOtherName()); } // 判断如果等于totalMeters、powerMeters、freezePumpMeters、coolPumpMeters、towerMeters的点位,就下一个循环 if (totalMeters[0].equals(param.getMtCode()) || totalMeters[1].equals(param.getMtCode()) || powerMeters[0].equals(param.getMtCode()) || powerMeters[1].equals(param.getMtCode()) || freezePumpMeters[0].equals(param.getMtCode()) || freezePumpMeters[1].equals(param.getMtCode()) || coolPumpMeters[0].equals(param.getMtCode()) || coolPumpMeters[1].equals(param.getMtCode()) || towerMeters[0].equals(param.getMtCode()) || towerMeters[1].equals(param.getMtCode())) { continue; } if (param.getGrade() == 40) { cumulativeMtCodes.add(param.getMtCode()); } else if (param.getGrade() == 140) { powerMtCodes.add(param.getMtCode()); } } } List allAdvantechDatas = new ArrayList<>(); // 使用CompletableFuture并发请求累计读数和功率读数 CompletableFuture> cumulativeFuture = null; CompletableFuture> powerFuture = null; // 并发请求累计读数 (grade=40) if (!cumulativeMtCodes.isEmpty()) { final List finalCumulativeMtCodes = new ArrayList<>(cumulativeMtCodes); final HashMap finalMtCodeToOtherNameMap = new HashMap<>(mtCodeToOtherNameMap); cumulativeFuture = CompletableFuture.supplyAsync(() -> { log.info("[线程-{}] 请求累计读数,点位数量: {}", Thread.currentThread().getName(), finalCumulativeMtCodes.size()); // http://192.168.1.80:5002/data/get_real return fetchCumulativeData(finalCumulativeMtCodes, finalMtCodeToOtherNameMap, realUrl); }, executor); } // 并发请求功率读数 (grade=140) if (!powerMtCodes.isEmpty()) { final List finalPowerMtCodes = new ArrayList<>(powerMtCodes); final HashMap finalMtCodeToOtherNameMap = new HashMap<>(mtCodeToOtherNameMap); powerFuture = CompletableFuture.supplyAsync(() -> { log.info("[线程-{}] 请求功率读数,点位数量: {}", Thread.currentThread().getName(), finalPowerMtCodes.size()); // http://192.168.1.80:5002/data/get_real_values return fetchPowerData(finalPowerMtCodes, finalMtCodeToOtherNameMap, realValuesUrl); }, executor); } // 等待两个任务完成并合并结果 try { if (cumulativeFuture != null) { List cumulativeDatas = cumulativeFuture.get(30, TimeUnit.SECONDS); // 累计读数 // cumulativeDatas.遍历计算得出机房总累计读数、主机总累计读数、冷冻泵总累计读数、冷却泵总累计读数、冷却塔总累计读数 // 新建一个主机总电表的AdvantechDatas对象,tag为mtCodeToOtherNameMap,key值为"PowerTotal_CH"的value值,quality为0,value为cumulativeDatas中tag包含"主机*累计读数"的value和值 // 新建一个冷冻泵总电表的AdvantechDatas对象,tag为mtCodeToOtherNameMap,key值为"PowerTotal_ChWPump"的value值,quality为0,value为cumulativeDatas中tag包含"冷冻泵*累计读数"的value和值 // 新建一个冷却泵总电表的AdvantechDatas对象,tag为mtCodeToOtherNameMap,key值为"PowerTotal_CWPump"的value值,quality为0,value为cumulativeDatas中tag包含"冷却泵*累计读数"的value和值 // 新建一个冷却塔总电表的AdvantechDatas对象,tag为mtCodeToOtherNameMap,key值为"PowerTotal_CT"的value值,quality为0,value为cumulativeDatas中tag包含"冷却塔*累计读数"的value和值 // 新建一个机房总电表的AdvantechDatas对象,tag为mtCodeToOtherNameMap,key值为"PowerTotal"的value值,quality为0,value为cumulativeDatas中tag包含"累计读数"的value和值 // 瞬时功率 // cumulativeDatas.遍历计算得出机房总瞬时功率、主机总瞬时功率、冷冻泵总瞬时功率、冷却泵总瞬时功率、冷却塔总瞬时功率 // 新建一个主机总电表的AdvantechDatas对象,tag为mtCodeToOtherNameMap,key值为"ChPower"的value值,quality为0,value为cumulativeDatas中tag包含"主机*瞬时功率"的value和值 // 新建一个冷冻泵总电表的AdvantechDatas对象,tag为mtCodeToOtherNameMap,key值为"PriChWPPower"的value值,quality为0,value为cumulativeDatas中tag包含"冷冻泵*瞬时功率"的value和值 // 新建一个冷却泵总电表的AdvantechDatas对象,tag为mtCodeToOtherNameMap,key值为"CWPPower"的value值,quality为0,value为cumulativeDatas中tag包含"冷却泵*瞬时功率"的value和值 // 新建一个冷却塔总电表的AdvantechDatas对象,tag为mtCodeToOtherNameMap,key值为"CTPower"的value值,quality为0,value为cumulativeDatas中tag包含"冷却塔*瞬时功率"的value和值 // 新建一个机房总电表的AdvantechDatas对象,tag为mtCodeToOtherNameMap,key值为"Power"的value值,quality为0,value为cumulativeDatas中tag包含"瞬时功率"的value和值 // 再插入到cumulativeDatas中 try { // 计算汇总数据并添加到cumulativeDatas calculateAndAddSummaryData(cumulativeDatas, mtCodeToOtherNameMap); } catch (Exception e) { log.error("累计读数汇总计算异常,但不影响后续流程", e); } allAdvantechDatas.addAll(cumulativeDatas); log.info("累计读数获取完成,数据点数: {}", cumulativeDatas.size()); } if (powerFuture != null) { List powerDatas = powerFuture.get(30, TimeUnit.SECONDS); allAdvantechDatas.addAll(powerDatas); // 实时功率 log.info("功率读数获取完成,数据点数: {}", powerDatas.size()); } } catch (Exception e) { log.error("等待异步任务完成时异常", e); } // 封装成研华网关格式并发送到消息队列 if (!allAdvantechDatas.isEmpty()) { AdvantechReceiver advantechReceiver = new AdvantechReceiver(); advantechReceiver.setTs(DateUtils.dateToString(new Date(), Constants.DATE_FORMAT)); advantechReceiver.setD(allAdvantechDatas); String message = JSONObject.toJSONString(advantechReceiver); log.info("发送BSD数据到消息队列,数据点数: {}", allAdvantechDatas.size()); sendMsgByTopic.sendToDeviceMQ(message); log.info("BSD数据发送成功"); } else { log.warn("没有获取到任何BSD数据"); } } catch (Exception e) { log.error("获取BSD数据异常", e); } } /** * 计算并添加汇总数据到cumulativeDatas * @param cumulativeDatas 累计读数数据列表 * @param mtCodeToOtherNameMap mtCode到otherName的映射关系 */ private void calculateAndAddSummaryData(List cumulativeDatas, HashMap mtCodeToOtherNameMap) { // 定义汇总类型枚举 enum SummaryType { CH_CUMULATIVE("主机", "累计读数", "PowerTotal_CH", "主机总累计读数"), CHWP_CUMULATIVE("冷冻泵", "累计读数", "PowerTotal_ChWPump", "冷冻泵总累计读数"), CWP_CUMULATIVE("冷却泵", "累计读数", "PowerTotal_CWPump", "冷却泵总累计读数"), CT_CUMULATIVE("冷却塔", "累计读数", "PowerTotal_CT", "冷却塔总累计读数"), ALL_CUMULATIVE(null, "累计读数", "PowerTotal", "机房总累计读数"), CH_POWER("主机", "瞬时功率", "ChPower", "主机总瞬时功率"), CHWP_POWER("冷冻泵", "瞬时功率", "PriChWPPower", "冷冻泵总瞬时功率"), CWP_POWER("冷却泵", "瞬时功率", "CWPPower", "冷却泵总瞬时功率"), CT_POWER("冷却塔", "瞬时功率", "CTPower", "冷却塔总瞬时功率"), ALL_POWER(null, "瞬时功率", "Power", "机房总瞬时功率"); private final String deviceKeyword; private final String dataKeyword; private final String mtCode; private final String description; SummaryType(String deviceKeyword, String dataKeyword, String mtCode, String description) { this.deviceKeyword = deviceKeyword; this.dataKeyword = dataKeyword; this.mtCode = mtCode; this.description = description; } } // 一次性遍历cumulativeDatas,分类累加各类型数据 Map summaryMap = new HashMap<>(); // 初始化所有汇总类型 for (SummaryType type : SummaryType.values()) { summaryMap.put(type, BigDecimal.ZERO); } for (AdvantechDatas data : cumulativeDatas) { if (data.getTag() == null || data.getValue() == null) { continue; } String tag = data.getTag(); BigDecimal value = new BigDecimal(data.getValue().toString()); // 遍历所有汇总类型进行匹配 for (SummaryType type : SummaryType.values()) { boolean matchesDataKeyword = tag.contains(type.dataKeyword); boolean matchesDeviceKeyword = type.deviceKeyword == null || tag.contains(type.deviceKeyword); if (matchesDataKeyword && matchesDeviceKeyword) { summaryMap.merge(type, value, BigDecimal::add); } } } // 创建并添加汇总数据点 int successCount = 0; for (SummaryType type : SummaryType.values()) { try { String targetTag = mtCodeToOtherNameMap.get(type.mtCode); if (targetTag != null && !targetTag.isEmpty()) { AdvantechDatas summaryData = new AdvantechDatas(); summaryData.setTag(targetTag); summaryData.setValue(summaryMap.get(type)); summaryData.setQuality(0); cumulativeDatas.add(summaryData); log.debug("添加{}: {} = {}", type.description, targetTag, summaryMap.get(type)); successCount++; } } catch (Exception e) { log.error("添加{}失败", type.description, e); } } log.info("累计读数汇总完成,成功添加{}/{}个汇总数据点", successCount, SummaryType.values().length); } /** * 获取累计读数数据 * @param mtCodes 点位编码列表 * @param mtCodeToOtherNameMap mtCode到otherName的映射关系 * @param realUrl 请求URL * @return 解析后的数据列表 */ private List fetchCumulativeData(List mtCodes, HashMap mtCodeToOtherNameMap, String realUrl) { List dataList = new ArrayList<>(); try { JSONObject cumulativeRequest = new JSONObject(); cumulativeRequest.put("names", mtCodes); JSONArray pointProperty = new JSONArray(); pointProperty.add("point_value"); cumulativeRequest.put("point_property", pointProperty); cumulativeRequest.put("proj_id", "1"); String cumulativeResponse = HttpUtils.sendPost(realUrl, cumulativeRequest.toJSONString()); log.info("[线程-{}] 累计读数响应长度: {}", Thread.currentThread().getName(), cumulativeResponse != null ? cumulativeResponse.length() : 0); if (cumulativeResponse != null && !cumulativeResponse.isEmpty()) { JSONObject cumulativeJson = JSON.parseObject(cumulativeResponse); if (cumulativeJson.getBooleanValue("success") && cumulativeJson.containsKey("data")) { JSONObject dataObj = cumulativeJson.getJSONObject("data"); for (String mtCode : mtCodes) { if (dataObj.containsKey(mtCode)) { JSONArray valueArray = dataObj.getJSONArray(mtCode); if (valueArray != null && !valueArray.isEmpty()) { String valueStr = valueArray.getString(0); try { BigDecimal value = new BigDecimal(valueStr); AdvantechDatas datas = new AdvantechDatas(); // 使用otherName作为tag,如果不存在则使用mtCode String tag = mtCodeToOtherNameMap.getOrDefault(mtCode, mtCode); datas.setTag(tag); datas.setValue(value); datas.setQuality(0); dataList.add(datas); } catch (NumberFormatException e) { log.error("解析累计读数失败,mtCode: {}, value: {}", mtCode, valueStr, e); } } } } } } } catch (Exception e) { log.error("获取累计读数异常", e); } return dataList; } /** * 获取功率读数数据 * * @param mtCodes 点位编码列表 * @param mtCodeToOtherNameMap mtCode到otherName的映射关系 * @param realValuesUrl 请求URL * @return 解析后的数据列表 */ private List fetchPowerData(List mtCodes, HashMap mtCodeToOtherNameMap, String realValuesUrl) { List dataList = new ArrayList<>(); try { JSONObject powerRequest = new JSONObject(); powerRequest.put("expresss", mtCodes); powerRequest.put("proj_id", "1"); String powerResponse = HttpUtils.sendPost(realValuesUrl, powerRequest.toJSONString()); log.info("[线程-{}] 功率读数响应长度: {}", Thread.currentThread().getName(), powerResponse != null ? powerResponse.length() : 0); if (powerResponse != null && !powerResponse.isEmpty()) { JSONObject powerJson = JSON.parseObject(powerResponse); if (powerJson.getBooleanValue("success") && powerJson.containsKey("data")) { JSONObject dataObj = powerJson.getJSONObject("data"); for (String mtCode : mtCodes) { if (dataObj.containsKey(mtCode)) { String valueStr = dataObj.getString(mtCode); try { BigDecimal value = new BigDecimal(valueStr); AdvantechDatas datas = new AdvantechDatas(); // 使用otherName作为tag,如果不存在则使用mtCode String tag = mtCodeToOtherNameMap.getOrDefault(mtCode, mtCode); datas.setTag(tag); datas.setValue(value); datas.setQuality(0); dataList.add(datas); } catch (NumberFormatException e) { log.error("解析功率读数失败,mtCode: {}, value: {}", mtCode, valueStr, e); } } } } } } catch (Exception e) { log.error("获取功率读数异常", e); } return dataList; } }