package com.mh.user.job; import com.mh.user.constants.Constant; import com.mh.user.entity.CollectionParamsManageEntity; import com.mh.user.entity.DeviceInstallEntity; import com.mh.user.entity.GatewayManageEntity; import com.mh.user.mapper.CollectionParamsManageMapper; import com.mh.user.mapper.DeviceInstallMapper; import com.mh.user.mapper.GatewayManageMapper; import com.mh.user.mapper.NowDataMapper; import com.mh.user.s7.S7ConnectorUtil; import com.mh.user.utils.DateUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.math.BigDecimal; import java.math.RoundingMode; import java.util.Date; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; /** * S7 PLC定时采集任务 * 支持M、VB、VW、VD等地址类型的读写操作 * * @author System * @date 2026-06-23 */ @Slf4j @Component public class S7PlcCollectionJob { private final GatewayManageMapper gatewayManageMapper; private final CollectionParamsManageMapper collectionParamsManageMapper; // 缓存S7连接器,避免频繁创建连接 private static final Map connectorCache = new ConcurrentHashMap<>(); private final DeviceInstallMapper deviceInstallMapper; private final NowDataMapper nowDataMapper; public S7PlcCollectionJob(GatewayManageMapper gatewayManageMapper, CollectionParamsManageMapper collectionParamsManageMapper, DeviceInstallMapper deviceInstallMapper, NowDataMapper nowDataMapper, NowDataMapper nowDataMapper1) { this.gatewayManageMapper = gatewayManageMapper; this.collectionParamsManageMapper = collectionParamsManageMapper; this.deviceInstallMapper = deviceInstallMapper; this.nowDataMapper = nowDataMapper1; } /** * 定时采集S7 PLC数据 * 每5分钟执行一次,可根据实际需求调整 * 优先处理手动操作:如果Constant.WEB_PLC_FLAG为true,则跳过本次采集 */ @Scheduled(cron = "0 0/5 * * * ?") public void collectS7Data() { log.info("------S7 PLC定时采集开始>>>>Constant.FLAG=={}------", Constant.PLC_FLAG); try { // 检查是否有手动操作正在进行 if (Constant.PLC_FLAG || Constant.WEB_PLC_FLAG) { log.info("存在手动操作,跳过本次S7 PLC采集"); return; } Constant.PLC_FLAG = true; // 查询所有在线的S7网关 List s7Gateways = gatewayManageMapper.queryS7Gateways(); if (s7Gateways == null || s7Gateways.isEmpty()) { log.info("未找到在线的S7网关"); return; } log.info("找到{}个在线S7网关", s7Gateways.size()); // 遍历每个S7网关 for (GatewayManageEntity gateway : s7Gateways) { try { processGateway(gateway); } catch (Exception e) { log.error("处理S7网关异常: gatewayName={}, dataCom={}", gateway.getGatewayName(), gateway.getDataCom(), e); } } } catch (Exception e) { log.error("S7 PLC定时采集异常", e); } finally { Constant.PLC_FLAG = false; log.info("------S7 PLC定时采集结束>>>>Constant.FLAG=={}------", Constant.PLC_FLAG); } } /** * 处理单个S7网关的数据采集 */ private void processGateway(GatewayManageEntity gateway) { String dataCom = gateway.getDataCom(); if (dataCom == null || dataCom.isEmpty()) { log.warn("网关dataCom为空: {}", gateway.getGatewayName()); return; } // 获取或创建S7连接器 S7ConnectorUtil connector = getOrCreateConnector(gateway); if (connector == null) { log.error("无法创建S7连接器: {}", gateway.getGatewayName()); return; } // 查询该网关对应的采集参数 List params = collectionParamsManageMapper.selectCPMByDataCom(dataCom); if (params == null || params.isEmpty()) { log.info("网关{}没有配置采集参数", gateway.getGatewayName()); return; } log.info("开始采集网关{}的{}个点位", gateway.getGatewayName(), params.size()); // 过滤掉重复的采集参数 params = params.stream().distinct().collect(Collectors.toList()); String dateStr = DateUtil.dateToString(new Date(), "yyyy-MM-dd HH:mm:ss"); // 遍历采集参数并读取数据 for (CollectionParamsManageEntity param : params) { try { // 再次检查是否有手动操作 if (Constant.WEB_PLC_FLAG) { log.info("检测到手动操作,中断采集"); break; } readAndSaveData(connector, param, dateStr); } catch (Exception e) { log.error("采集点位异常: registerAddr={}, otherName={}", param.getRegisterAddr(), param.getOtherName(), e); } } // 遍历完全之后更新回水状态,因为有多箱和单箱电磁阀 // params遍历得出多少个buildingId分组 List buildingIds = params.stream().map(CollectionParamsManageEntity::getBuildingId).distinct().collect(Collectors.toList()); for (Long buildingId : buildingIds) { List> backWaterStates = collectionParamsManageMapper.selectBackWaterState(buildingId); // map值有cur_value,cur_time,通过stream判断cur_time是否是当前时间,然后cur_value如果存在一天记录等于1的,back_water_state=运行,否则back_water_state=不运行 backWaterStates.forEach(backWaterState -> { if (backWaterState.get("cur_time").toString().substring(0, 10).equals(dateStr.substring(0, 10))) { if (backWaterState.get("cur_value").equals(1)) { nowDataMapper.updateBackWaterState(buildingId, "运行"); } else { nowDataMapper.updateBackWaterState(buildingId, "不运行"); } } }); } } /** * 读取并保存数据 */ private void readAndSaveData(S7ConnectorUtil connector, CollectionParamsManageEntity param, String dateStr) { String registerAddr = param.getRegisterAddr(); if (registerAddr == null || registerAddr.isEmpty()) { log.warn("点位寄存器地址为空: id={}", param.getId()); return; } // 读取数据 Object value = connector.readData(registerAddr); DeviceInstallEntity deviceInstallEntity = deviceInstallMapper.selectDeviceById(param.getDeviceInstallId()); if (value == null) { log.warn("读取数据为空: registerAddr={}", registerAddr); // 更新deviceInstall离线 deviceInstallMapper.updateNotOnlineById(param.getDeviceInstallId()); // 更新now_data离线 nowDataMapper.updateRunState(deviceInstallEntity.getBuildingId(), deviceInstallEntity.getDeviceAddr(), "2", deviceInstallEntity.getDeviceName()); // 在判断设备类型,如果是供水泵,up_water_state=运行 // 如果是补水电磁阀开,use_water_state=运行, // 如果是单箱电磁阀或者多箱电磁阀开,back_water_state=运行 if (deviceInstallEntity.getDeviceType().equals("供水泵")) { nowDataMapper.updateUpWaterState(deviceInstallEntity.getBuildingId(), deviceInstallEntity.getDeviceAddr(), "2", deviceInstallEntity.getDeviceName()); } else if (deviceInstallEntity.getDeviceType().equals("补水电磁阀")) { nowDataMapper.updateUseWaterState(deviceInstallEntity.getBuildingId(), deviceInstallEntity.getDeviceAddr(), "2", deviceInstallEntity.getDeviceName()); } return; } // 转换值为BigDecimal BigDecimal curValue; try { // BigDecimal 构造函数可以直接处理 String 和 Number.toString() curValue = new BigDecimal(value.toString()); } catch (NumberFormatException e) { log.warn("无法转换为数字: registerAddr={}, value={}", "", value); return; } // 应用倍率和初始值 if (param.getMtRatio() != null && param.getMtRatio() != 1) { curValue = curValue.multiply(new BigDecimal(param.getMtRatio())); } if (param.getMtInitValue() != null) { curValue = curValue.add(param.getMtInitValue()); } // 应用小数点位数 if (param.getDigits() != null && param.getDigits() > 0) { curValue = curValue.setScale(param.getDigits(), BigDecimal.ROUND_HALF_UP); } // 更新数据库 collectionParamsManageMapper.updateCollectionParamsManage( param.getDeviceInstallId().intValue(), registerAddr, curValue.toString(), dateStr, param.getBuildingId() != null ? param.getBuildingId().toString() : null ); // 在同步更新device_install表 deviceInstallMapper.updateLastValueByOtherParam(param.getDeviceInstallId(), curValue.toString()); if (deviceInstallEntity != null) { // 更新设备安装表中的now_date字段,根据param.getParamTypeId()的值进行判断 // 在对now_date进行更新 // 查询当前点位是否是运行状态、压力、液位、液位设置、回水温度、故障状态 log.error("进入nowData设置==>{}", param.toString()); switch(param.getParamTypeId()) { case 2: // 运行状态 nowDataMapper.updateRunState(deviceInstallEntity.getBuildingId(), deviceInstallEntity.getDeviceAddr(), String.valueOf(curValue.intValue()), deviceInstallEntity.getDeviceName()); // 在判断设备类型,如果是供水泵,up_water_state=运行 // 如果是补水电磁阀开,use_water_state=运行, // 如果是单箱电磁阀或者多箱电磁阀开,back_water_state=运行 if (deviceInstallEntity.getDeviceType().equals("供水泵")) { nowDataMapper.updateUpWaterState(deviceInstallEntity.getBuildingId(), deviceInstallEntity.getDeviceAddr(), String.valueOf(curValue.intValue()), deviceInstallEntity.getDeviceName()); } else if (deviceInstallEntity.getDeviceType().equals("补水电磁阀")) { nowDataMapper.updateUseWaterState(deviceInstallEntity.getBuildingId(), deviceInstallEntity.getDeviceAddr(), String.valueOf(curValue.intValue()), deviceInstallEntity.getDeviceName()); } break; case 5: // 压力 if (param.getOtherName().contains("供水压力")) { nowDataMapper.updatePressure(deviceInstallEntity.getBuildingId(), null, curValue.setScale(1, RoundingMode.HALF_UP).toString(), null); } break; case 31: // 液位 if (param.getOtherName().contains("单箱") && deviceInstallEntity.getIsSingleBox() == 1) { // 更新单箱液位 nowDataMapper.updateBoxLevel(deviceInstallEntity.getBuildingId(), null, curValue.setScale(1, RoundingMode.HALF_UP).toString(), null, 1); } else { // 获取多箱的液位 nowDataMapper.updateBoxLevel(deviceInstallEntity.getBuildingId(), null, curValue.setScale(1, RoundingMode.HALF_UP).toString(), null, 0); } break; case 26: // 液位设置 if (param.getOtherName().contains("单箱液位") && param.getOtherName().contains("上限") && deviceInstallEntity.getIsSingleBox() == 1) { // 更新单箱液位 nowDataMapper.updateBoxLevelSet(deviceInstallEntity.getBuildingId(), null, curValue.setScale(1, RoundingMode.HALF_UP).toString(), null, 1); } if (param.getOtherName().contains("多箱液位") && param.getOtherName().contains("上限") && deviceInstallEntity.getIsSingleBox() == 0) { // 获取多箱的液位 nowDataMapper.updateBoxLevelSet(deviceInstallEntity.getBuildingId(), null, curValue.setScale(1, RoundingMode.HALF_UP).toString(), null, 0); } break; case 32: // 压力设置 nowDataMapper.updatePressureSet(deviceInstallEntity.getBuildingId(), null, curValue.setScale(1, RoundingMode.HALF_UP).toString(), null); break; case 12: // 回水温度 nowDataMapper.updateBackWaterTemp(deviceInstallEntity.getBuildingId(), null, curValue.setScale(1, RoundingMode.HALF_UP).toString(), null); break; case 3: // 故障状态 nowDataMapper.updatePressureSet(deviceInstallEntity.getBuildingId(), deviceInstallEntity.getDeviceAddr(), curValue.intValue() == 0 ? "正常" : "故障", deviceInstallEntity.getDeviceName()); break; case 41: // 回水阀控制模式:0单箱,1多箱 nowDataMapper.updateBackControlModel(deviceInstallEntity.getBuildingId(), null, String.valueOf(curValue.intValue()), null); break; } // 根据deviceInstall查询对应的deviceInstall表数据 // 根据查询出来的deviceInstall表数据,根据device_addr和device_name值进行更新 } log.debug("采集成功: registerAddr={}, value={}, otherName={}", registerAddr, curValue, param.getOtherName()); } /** * 获取或创建S7连接器 */ private S7ConnectorUtil getOrCreateConnector(GatewayManageEntity gateway) { String cacheKey = gateway.getDataCom(); // 从缓存中获取 S7ConnectorUtil connector = connectorCache.get(cacheKey); if (connector != null) { return connector; } // 创建新连接器 try { String ipAddress = gateway.getGatewayIP(); if (ipAddress == null || ipAddress.isEmpty()) { log.error("网关IP地址为空: {}", gateway.getGatewayName()); return null; } // 解析rack和slot,默认为0和1 int rack = 0; int slot = 1; if (gateway.getGatewayPort() != null && !gateway.getGatewayPort().isEmpty()) { try { String[] parts = gateway.getGatewayPort().split(","); if (parts.length >= 2) { rack = Integer.parseInt(parts[0].trim()); slot = Integer.parseInt(parts[1].trim()); } } catch (NumberFormatException e) { log.warn("解析rack/slot失败,使用默认值: {}", gateway.getGatewayPort()); } } connector = new S7ConnectorUtil(ipAddress, rack, slot); connectorCache.put(cacheKey, connector); log.info("创建S7连接器成功: IP={}, rack={}, slot={}", ipAddress, rack, slot); return connector; } catch (Exception e) { log.error("创建S7连接器失败: {}", gateway.getGatewayName(), e); return null; } } /** * 手动写入数据到PLC(供Controller调用) * * @param cpmId 采集参数ID * @param value 要写入的值 * @return 是否成功 */ public boolean writeData(Long cpmId, Object value) { try { // 查询采集参数 CollectionParamsManageEntity param = collectionParamsManageMapper.selectById(cpmId.toString()); if (param == null) { log.error("采集参数不存在: cpmId={}", cpmId); return false; } // 查询网关信息 String dataCom = getDataComByDeviceId(param.getDeviceInstallId()); if (dataCom == null) { log.error("无法获取设备对应的dataCom: deviceInstallId={}", param.getDeviceInstallId()); return false; } GatewayManageEntity gateway = getGatewayByDataCom(dataCom); if (gateway == null) { log.error("无法获取网关信息: dataCom={}", dataCom); return false; } // 获取连接器 S7ConnectorUtil connector = getOrCreateConnector(gateway); if (connector == null) { log.error("无法获取S7连接器"); return false; } // 写入数据 connector.writeData(param.getRegisterAddr(), value); // 更新数据库 String dateStr = DateUtil.dateToString(new Date(), "yyyy-MM-dd HH:mm:ss"); // BigDecimal curValue; // if (value instanceof Number) { // curValue = new BigDecimal(value.toString()); // } else { // curValue = new BigDecimal(value.toString()); // } collectionParamsManageMapper.updateCollectionParamsManageById( cpmId, value.toString(), dateStr ); log.info("手动写入成功: cpmId={}, registerAddr={}, value={}", cpmId, param.getRegisterAddr(), value); return true; } catch (Exception e) { log.error("手动写入失败: cpmId={}", cpmId, e); return false; } } /** * 根据deviceInstallId获取dataCom */ private String getDataComByDeviceId(Long deviceInstallId) { return gatewayManageMapper.getDataComByDeviceInstallId(deviceInstallId); } /** * 根据dataCom获取网关信息 */ private GatewayManageEntity getGatewayByDataCom(String dataCom) { List gateways = gatewayManageMapper.queryS7Gateways(); if (gateways != null) { for (GatewayManageEntity gw : gateways) { if (dataCom.equals(gw.getDataCom())) { return gw; } } } return null; } /** * 清理连接器缓存(可选,用于重启或维护) */ public void clearConnectorCache() { for (Map.Entry entry : connectorCache.entrySet()) { try { entry.getValue().disconnect(); } catch (Exception e) { log.error("断开S7连接异常: {}", entry.getKey(), e); } } connectorCache.clear(); log.info("S7连接器缓存已清理"); } }