diff --git a/user-service/src/main/java/com/mh/user/dto/HotWaterBackPumpControlVO.java b/user-service/src/main/java/com/mh/user/dto/HotWaterBackPumpControlVO.java index 4472602..e2ec933 100644 --- a/user-service/src/main/java/com/mh/user/dto/HotWaterBackPumpControlVO.java +++ b/user-service/src/main/java/com/mh/user/dto/HotWaterBackPumpControlVO.java @@ -122,6 +122,9 @@ public class HotWaterBackPumpControlVO { private int manualAutoSwitch; private String manualAutoSwitchId; + // 选择两台回水泵启动 + private int twoPumpStart; + private String twoPumpStartId; @Override public String toString() { diff --git a/user-service/src/main/java/com/mh/user/dto/HotWaterHotPumpControlVO.java b/user-service/src/main/java/com/mh/user/dto/HotWaterHotPumpControlVO.java index 08dca63..03a3ec2 100644 --- a/user-service/src/main/java/com/mh/user/dto/HotWaterHotPumpControlVO.java +++ b/user-service/src/main/java/com/mh/user/dto/HotWaterHotPumpControlVO.java @@ -172,6 +172,10 @@ public class HotWaterHotPumpControlVO { // 去掉pump后的类名 private int startStopControl; private String startStopControlId; + // 热泵_手自动切换 -> 去掉pump前缀 + private int manualAutoSwitch; + private String manualAutoSwitchId; + // 热泵_12启停控制 private int startStopControlOne; private String startStopControlOneId; @@ -179,6 +183,14 @@ public class HotWaterHotPumpControlVO { // 去掉pump后的类名 // 热泵_34启停控制 private int startStopControlTwo; private String startStopControlTwoId; + // 热泵_12手自动切换 + private int manualAutoSwitchOne; + private String manualAutoSwitchOneId; + + // 热泵_34手自动切换 + private int manualAutoSwitchTwo; + private String manualAutoSwitchTwoId; + // 热泵_设定温度 -> 去掉pump前缀 @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "0") diff --git a/user-service/src/main/java/com/mh/user/mapper/CollectionParamsManageMapper.java b/user-service/src/main/java/com/mh/user/mapper/CollectionParamsManageMapper.java index 1adffa0..ba561db 100644 --- a/user-service/src/main/java/com/mh/user/mapper/CollectionParamsManageMapper.java +++ b/user-service/src/main/java/com/mh/user/mapper/CollectionParamsManageMapper.java @@ -3,6 +3,7 @@ package com.mh.user.mapper; import com.mh.user.dto.HotWaterControlListVO; import com.mh.user.entity.CollectionParamsManageEntity; import com.mh.user.entity.DeviceInstallEntity; +import com.mh.user.model.SanShiFengDatas; import org.apache.ibatis.annotations.*; import tk.mybatis.mapper.common.BaseMapper; @@ -82,6 +83,27 @@ public interface CollectionParamsManageMapper extends BaseMapper" + + "WITH BatchData AS (" + + "" + + "SELECT #{item.name} AS other_name, #{item.value} AS cur_value" + + "" + + ") " + + "MERGE collection_params_manage AS target " + + "USING BatchData AS source " + + "ON (target.other_name = source.other_name AND target.building_id = #{buildingId}) " + + "WHEN MATCHED THEN " + + " UPDATE SET " + + " cur_value = source.cur_value, " + + " cur_time = #{time}, " + + " quality = #{quality};" + + "") + void updateBatchCPMByOtherName(@Param("batch") List batch, + @Param("time") String time, + @Param("quality") String quality, + @Param("buildingId") String buildingId); + + @Select("select top 1 * from collection_params_manage where other_name = #{name} and building_id = #{buildingId} ") @Results({ @Result(column = "id", property = "id"), diff --git a/user-service/src/main/java/com/mh/user/service/CollectionParamsManageService.java b/user-service/src/main/java/com/mh/user/service/CollectionParamsManageService.java index cbc40e9..add5e05 100644 --- a/user-service/src/main/java/com/mh/user/service/CollectionParamsManageService.java +++ b/user-service/src/main/java/com/mh/user/service/CollectionParamsManageService.java @@ -4,6 +4,7 @@ import com.mh.user.dto.HotWaterControlDTO; import com.mh.user.dto.HotWaterNowDataDTO; import com.mh.user.entity.CollectionParamsManageEntity; import com.mh.user.entity.DeviceInstallEntity; +import com.mh.user.model.SanShiFengDatas; import java.math.BigDecimal; import java.util.List; @@ -36,4 +37,6 @@ public interface CollectionParamsManageService { List monitorList(String buildingId); List operateList(String floorId); + + void getBatchUpdateCollectionParams(List batch, String sn, String plcName, String projectName, String time, String buildingId); } diff --git a/user-service/src/main/java/com/mh/user/service/impl/CollectionParamsManageServiceImpl.java b/user-service/src/main/java/com/mh/user/service/impl/CollectionParamsManageServiceImpl.java index 7b756f2..049bfc1 100644 --- a/user-service/src/main/java/com/mh/user/service/impl/CollectionParamsManageServiceImpl.java +++ b/user-service/src/main/java/com/mh/user/service/impl/CollectionParamsManageServiceImpl.java @@ -6,7 +6,9 @@ import com.mh.user.dto.*; import com.mh.user.entity.CollectionParamsManageEntity; import com.mh.user.entity.DeviceInstallEntity; import com.mh.user.mapper.CollectionParamsManageMapper; +import com.mh.user.model.SanShiFengDatas; import com.mh.user.service.CollectionParamsManageService; +import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import java.math.BigDecimal; @@ -21,6 +23,7 @@ import java.util.stream.Collectors; * @description 采集参数设备实现类 * @date 2025-12-10 11:30:54 */ +@Slf4j @Service public class CollectionParamsManageServiceImpl implements CollectionParamsManageService { @@ -248,10 +251,10 @@ public class CollectionParamsManageServiceImpl implements CollectionParamsManage break; case "1": // 启停控制 - if (item.getOtherName().equals("12")) { + if (item.getOtherName().contains("12") || item.getOtherName().contains("13")) { hotPumpVo.setStartStopControlOne(item.getCurValue().intValue()); hotPumpVo.setStartStopControlOneId(item.getCpmId()); - } else if (item.getOtherName().equals("34")) { + } else if (item.getOtherName().contains("34") || item.getOtherName().contains("24")) { hotPumpVo.setStartStopControlTwo(item.getCurValue().intValue()); hotPumpVo.setStartStopControlTwoId(item.getCpmId()); } else { @@ -298,6 +301,19 @@ public class CollectionParamsManageServiceImpl implements CollectionParamsManage // 温度设定 hotPumpVo.setSetTemp(item.getCurValue()); hotPumpVo.setSetTempId(item.getCpmId()); + break; + case "22": + // 手自动切换 + if (item.getOtherName().contains("12") || item.getOtherName().contains("13")) { + hotPumpVo.setManualAutoSwitchOne(item.getCurValue().intValue()); + hotPumpVo.setManualAutoSwitchOneId(item.getCpmId()); + } else if (item.getOtherName().contains("34") || item.getOtherName().contains("24")) { + hotPumpVo.setManualAutoSwitchTwo(item.getCurValue().intValue()); + hotPumpVo.setManualAutoSwitchTwoId(item.getCpmId()); + } else { + hotPumpVo.setManualAutoSwitch(item.getCurValue().intValue()); + hotPumpVo.setManualAutoSwitchId(item.getCpmId()); + } default: break; } @@ -565,6 +581,11 @@ public class CollectionParamsManageServiceImpl implements CollectionParamsManage backPumpVo.setManualAutoSwitch(item.getCurValue().intValue()); backPumpVo.setManualAutoSwitchId(item.getCpmId()); break; + case "25": + // 两台回水泵启动 + backPumpVo.setTwoPumpStart(item.getCurValue().intValue()); + backPumpVo.setTwoPumpStartId(item.getCpmId()); + break; default: break; } @@ -755,6 +776,15 @@ public class CollectionParamsManageServiceImpl implements CollectionParamsManage } } + @Override + public void getBatchUpdateCollectionParams(List batch, String sn, String plcName, String projectName, String time, String buildingId) { + try { + collectionParamsManageMapper.updateBatchCPMByOtherName(batch, time, "0", buildingId); + } catch (Exception e) { + log.error("批量更新参数失败", e); + } + } + @Override public List selectCPMList(String buildingId, String deviceInstallId, String otherName, Integer pageNum, Integer pageSize) { return collectionParamsManageMapper.selectCPMList(buildingId, deviceInstallId, otherName, pageNum, pageSize); diff --git a/user-service/src/main/java/com/mh/user/service/mqtt/service/impl/EventsServiceImpl.java b/user-service/src/main/java/com/mh/user/service/mqtt/service/impl/EventsServiceImpl.java index 6427f33..8be4548 100644 --- a/user-service/src/main/java/com/mh/user/service/mqtt/service/impl/EventsServiceImpl.java +++ b/user-service/src/main/java/com/mh/user/service/mqtt/service/impl/EventsServiceImpl.java @@ -3,6 +3,7 @@ package com.mh.user.service.mqtt.service.impl; import com.alibaba.fastjson2.JSONObject; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import com.github.benmanes.caffeine.cache.Cache; import com.mh.common.utils.StringUtils; import com.mh.user.constants.ChannelName; import com.mh.user.constants.Constant; @@ -18,10 +19,14 @@ import com.mh.user.service.GatewayManageService; import com.mh.user.service.mqtt.service.IEventsService; import com.mh.user.strategy.DeviceStrategy; import com.mh.user.strategy.DeviceStrategyFactory; +import com.mh.user.utils.CacheUtil; +import com.mh.user.utils.DateUtil; +import com.mh.user.utils.SpringBeanUtil; import io.netty.util.CharsetUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.ApplicationContext; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.messaging.MessageHeaders; import org.springframework.stereotype.Service; @@ -56,6 +61,9 @@ public class EventsServiceImpl implements IEventsService { @Autowired private DeviceInstallService deviceInstallService; + @Autowired + private Cache caffeineCache; + @ServiceActivator(inputChannel = ChannelName.EVENTS_UPLOAD_INBOUND) @Override public void handleInboundUpload(byte[] receiver, MessageHeaders headers) { @@ -107,35 +115,55 @@ public class EventsServiceImpl implements IEventsService { return; } // 修复类型转换问题 - List rawDataList = datas.getDatas(); + List rawDataList = datas.getDatas(); if (rawDataList == null || rawDataList.isEmpty()) { log.warn("数据列表为空,SN: {}", sn); return; } - // 先批量更新collectionParam - rawDataList.parallelStream().forEach(rawData -> { - try { - processDataUpdateCpmItem(rawData, sn, plcName, projectName, time, buildingId); - } catch (Exception e) { - log.error("处理单个数据项失败: {}", rawData, e); - } - }); - // 并行处理数据列表,主线程等待处理完 - rawDataList.parallelStream().forEach(rawData -> { - try { - processDataItem(rawData, sn, plcName, projectName, time, buildingId); - } catch (Exception e) { - log.error("处理单个数据项失败: {}", rawData, e); + // rawDataList进行批量更新,100条数据进行批量处理 + int batchSize = 100; + for (int i = 0; i < rawDataList.size(); i += batchSize) { + int endIndex = Math.min(i + batchSize, rawDataList.size()); + List batch = rawDataList.subList(i, endIndex); + collectionParamManageService.getBatchUpdateCollectionParams(batch, sn, plcName, projectName, time, buildingId); + } + +// // 先批量更新collectionParam +// rawDataList.parallelStream().forEach(rawData -> { +// try { +// processDataUpdateCpmItem(rawData, sn, plcName, projectName, time, buildingId); +// } catch (Exception e) { +// log.error("处理单个数据项失败: {}", rawData, e); +// } +// }); + // 通过判断当前time跟上一个time相差30s才存储进入队列 + if (caffeineCache.getIfPresent(sn+"_time") != null) { + String lastTime = (String)caffeineCache.getIfPresent(sn+"_time"); + // yyyy-MM-dd HH:mm:ss格式转为秒的时间戳 + long lastTimeStamp = DateUtil.getTimeStamp(lastTime); + long currentTimeStamp = DateUtil.getTimeStamp(time); + // 判断时间间隔 + if (!StringUtils.isBlank(lastTime) && Math.abs(currentTimeStamp -lastTimeStamp) >= 60000) { + // 并行处理数据列表,主线程不阻塞 + rawDataList.parallelStream().forEach(rawData -> { + try { + processDataItem(rawData, sn, plcName, projectName, time, buildingId); + } catch (Exception e) { + log.error("处理单个数据项失败: {}", rawData, e); + } + }); } - }); + } else { + caffeineCache.put(sn+"_time", time); + } } catch (IOException e) { log.error("处理数据时发生错误: ", e); } } - private void processDataUpdateCpmItem(Object rawData, String sn, String plcName, String projectName, String time, String buildingId) { + private void processDataUpdateCpmItem(SanShiFengDatas data, String sn, String plcName, String projectName, String time, String buildingId) { // 安全地转换对象 - SanShiFengDatas data = convertDataItem(rawData); +// SanShiFengDatas data = convertDataItem(rawData); if (data == null) { log.warn("数据转换失败,跳过处理"); return; @@ -145,9 +173,9 @@ public class EventsServiceImpl implements IEventsService { // 获取点位参数名称 String name = data.getName(); // 获取点位值 - BigDecimal value = new BigDecimal(0); + BigDecimal value; try { - value = new BigDecimal(String.valueOf(data.getValue())); + value = data.getValue(); } catch (Exception e) { value = BigDecimal.ZERO; } @@ -177,7 +205,20 @@ public class EventsServiceImpl implements IEventsService { //collectionParamManageService.updateCPMByOtherName(name, value, time, buildingId); // 查询device_install表,走之前的逻辑 CollectionParamsManageEntity collectionParamsManageEntity = collectionParamManageService.selectDeviceInstallByOtherName(name, buildingId); - if (null != collectionParamsManageEntity && collectionParamsManageEntity.getDeviceInstallId() != null) { + if (null != collectionParamsManageEntity + && collectionParamsManageEntity.getDeviceInstallId() != null + && collectionParamsManageEntity.getParamTypeId() != 0 + && collectionParamsManageEntity.getParamTypeId() != 4 + && collectionParamsManageEntity.getParamTypeId() != 15 + && collectionParamsManageEntity.getParamTypeId() != 16 + && collectionParamsManageEntity.getParamTypeId() != 17 + && collectionParamsManageEntity.getParamTypeId() != 18 + && collectionParamsManageEntity.getParamTypeId() != 19 + && collectionParamsManageEntity.getParamTypeId() != 21 + && collectionParamsManageEntity.getParamTypeId() != 22 + && collectionParamsManageEntity.getParamTypeId() != 23 + && collectionParamsManageEntity.getParamTypeId() != 24 + ) { DeviceInstallEntity deviceInstallEntity = deviceInstallService.selectDeviceById(collectionParamsManageEntity.getDeviceInstallId()); if (deviceInstallEntity != null) { // 开始走策略判断 diff --git a/user-service/src/main/java/com/mh/user/strategy/HeatPumpStrategy.java b/user-service/src/main/java/com/mh/user/strategy/HeatPumpStrategy.java index 9942242..31a97e7 100644 --- a/user-service/src/main/java/com/mh/user/strategy/HeatPumpStrategy.java +++ b/user-service/src/main/java/com/mh/user/strategy/HeatPumpStrategy.java @@ -735,6 +735,8 @@ public class HeatPumpStrategy implements DeviceStrategy { } else { sValue = "有故障"; } + } else { + return null; } if (!StringUtils.isBlank(sValue)) { nowDataService.saveNowHistoryData(deviceInstallEntity.getDeviceAddr(), "热泵", sValue, dataType.toString(), deviceInstallEntity.getBuildingId()); diff --git a/user-service/src/main/java/com/mh/user/strategy/PressureTransStrategy.java b/user-service/src/main/java/com/mh/user/strategy/PressureTransStrategy.java index 8ab87bb..0a78ca9 100644 --- a/user-service/src/main/java/com/mh/user/strategy/PressureTransStrategy.java +++ b/user-service/src/main/java/com/mh/user/strategy/PressureTransStrategy.java @@ -11,6 +11,7 @@ import com.mh.user.utils.SpringBeanUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.context.ApplicationContext; +import java.math.BigDecimal; import java.text.DecimalFormat; /** @@ -134,7 +135,7 @@ public class PressureTransStrategy implements DeviceStrategy { public String analysisMQTTReceiveData(String dateStr, String registerAddr, String dataStr, String operateType, DeviceInstallEntity deviceInstallEntity, CollectionParamsManageEntity collectionParamsManageEntity) { String result = "fail"; - if (Integer.parseInt(dataStr) < 0) { + if (new BigDecimal(dataStr).compareTo(BigDecimal.ZERO) < 0) { return result; } if (operateType.equalsIgnoreCase(Constant.READ)) {// 读 diff --git a/user-service/src/main/java/com/mh/user/strategy/TempControlStrategy.java b/user-service/src/main/java/com/mh/user/strategy/TempControlStrategy.java index 23517b7..8fa8257 100644 --- a/user-service/src/main/java/com/mh/user/strategy/TempControlStrategy.java +++ b/user-service/src/main/java/com/mh/user/strategy/TempControlStrategy.java @@ -13,6 +13,8 @@ import com.mh.user.utils.SpringBeanUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.context.ApplicationContext; +import java.math.BigDecimal; + /** * @author LJF * @version 1.0 @@ -120,7 +122,7 @@ public class TempControlStrategy implements DeviceStrategy { public String analysisMQTTReceiveData(String dateStr, String registerAddr, String dataStr, String operateType, DeviceInstallEntity deviceInstallEntity, CollectionParamsManageEntity collectionParamsManageEntity) { String result = Constant.FAIL; - if (Integer.parseInt(dataStr) < 0) { + if (new BigDecimal(dataStr).compareTo(BigDecimal.ZERO) < 0) { log.info("温控报文检验失败: " + dataStr); return result; } diff --git a/user-service/src/main/java/com/mh/user/strategy/WtMeterStrategy.java b/user-service/src/main/java/com/mh/user/strategy/WtMeterStrategy.java index 018a779..e264bbd 100644 --- a/user-service/src/main/java/com/mh/user/strategy/WtMeterStrategy.java +++ b/user-service/src/main/java/com/mh/user/strategy/WtMeterStrategy.java @@ -13,6 +13,7 @@ import com.mh.user.utils.SpringBeanUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.context.ApplicationContext; +import java.math.BigDecimal; import java.util.Date; /** @@ -140,7 +141,7 @@ public class WtMeterStrategy implements DeviceStrategy { public String analysisMQTTReceiveData(String dateStr, String registerAddr, String dataStr, String operateType, DeviceInstallEntity deviceInstallEntity, CollectionParamsManageEntity collectionParamsManageEntity) { String data = Constant.FAIL; - if (Integer.parseInt(dataStr) < 0) { + if (new BigDecimal(dataStr).compareTo(BigDecimal.ZERO) < 0) { return data; } log.info("水表表号: " + deviceInstallEntity.getDeviceAddr() + ",水表读数:" + dataStr); diff --git a/user-service/src/main/java/com/mh/user/utils/DateUtil.java b/user-service/src/main/java/com/mh/user/utils/DateUtil.java index 41ef900..c530f2f 100644 --- a/user-service/src/main/java/com/mh/user/utils/DateUtil.java +++ b/user-service/src/main/java/com/mh/user/utils/DateUtil.java @@ -308,4 +308,20 @@ public class DateUtil { } return null; } + + /** + * 获取时间戳 + * + * @param lastTime + * @return + */ + public static long getTimeStamp(String lastTime) { + SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + try { + return format.parse(lastTime).getTime(); + } catch (ParseException e) { + e.printStackTrace(); + } + return 0; + } }