Browse Source

1、数据更新处理优化;

dev
25604 3 days ago
parent
commit
8c4fd75c7d
  1. 3
      user-service/src/main/java/com/mh/user/dto/HotWaterBackPumpControlVO.java
  2. 12
      user-service/src/main/java/com/mh/user/dto/HotWaterHotPumpControlVO.java
  3. 22
      user-service/src/main/java/com/mh/user/mapper/CollectionParamsManageMapper.java
  4. 3
      user-service/src/main/java/com/mh/user/service/CollectionParamsManageService.java
  5. 34
      user-service/src/main/java/com/mh/user/service/impl/CollectionParamsManageServiceImpl.java
  6. 69
      user-service/src/main/java/com/mh/user/service/mqtt/service/impl/EventsServiceImpl.java
  7. 2
      user-service/src/main/java/com/mh/user/strategy/HeatPumpStrategy.java
  8. 3
      user-service/src/main/java/com/mh/user/strategy/PressureTransStrategy.java
  9. 4
      user-service/src/main/java/com/mh/user/strategy/TempControlStrategy.java
  10. 3
      user-service/src/main/java/com/mh/user/strategy/WtMeterStrategy.java
  11. 16
      user-service/src/main/java/com/mh/user/utils/DateUtil.java

3
user-service/src/main/java/com/mh/user/dto/HotWaterBackPumpControlVO.java

@ -122,6 +122,9 @@ public class HotWaterBackPumpControlVO {
private int manualAutoSwitch;
private String manualAutoSwitchId;
// 选择两台回水泵启动
private int twoPumpStart;
private String twoPumpStartId;
@Override
public String toString() {

12
user-service/src/main/java/com/mh/user/dto/HotWaterHotPumpControlVO.java

@ -172,6 +172,10 @@ public class HotWaterHotPumpControlVO { // 去掉pump后的类名
private int startStopControl;
private String startStopControlId;
// 热泵_手自动切换 -> 去掉pump前缀
private int manualAutoSwitch;
private String manualAutoSwitchId;
// 热泵_12启停控制
private int startStopControlOne;
private String startStopControlOneId;
@ -179,6 +183,14 @@ public class HotWaterHotPumpControlVO { // 去掉pump后的类名
// 热泵_34启停控制
private int startStopControlTwo;
private String startStopControlTwoId;
// 热泵_12手自动切换
private int manualAutoSwitchOne;
private String manualAutoSwitchOneId;
// 热泵_34手自动切换
private int manualAutoSwitchTwo;
private String manualAutoSwitchTwoId;
// 热泵_设定温度 -> 去掉pump前缀
@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "0")

22
user-service/src/main/java/com/mh/user/mapper/CollectionParamsManageMapper.java

@ -3,6 +3,7 @@ package com.mh.user.mapper;
import com.mh.user.dto.HotWaterControlListVO;
import com.mh.user.entity.CollectionParamsManageEntity;
import com.mh.user.entity.DeviceInstallEntity;
import com.mh.user.model.SanShiFengDatas;
import org.apache.ibatis.annotations.*;
import tk.mybatis.mapper.common.BaseMapper;
@ -82,6 +83,27 @@ public interface CollectionParamsManageMapper extends BaseMapper<CollectionParam
@Update("update collection_params_manage set cur_value = #{value}, cur_time = #{time}, quality = #{quality} where other_name = #{name} and building_id = #{buildingId}")
void updateCPMByOtherName(String name, BigDecimal value, String time, String quality, String buildingId);
@Update("<script>" +
"WITH BatchData AS (" +
"<foreach collection='batch' item='item' separator='UNION ALL'>" +
"SELECT #{item.name} AS other_name, #{item.value} AS cur_value" +
"</foreach>" +
") " +
"MERGE collection_params_manage AS target " +
"USING BatchData AS source " +
"ON (target.other_name = source.other_name AND target.building_id = #{buildingId}) " +
"WHEN MATCHED THEN " +
" UPDATE SET " +
" cur_value = source.cur_value, " +
" cur_time = #{time}, " +
" quality = #{quality};" +
"</script>")
void updateBatchCPMByOtherName(@Param("batch") List<SanShiFengDatas> batch,
@Param("time") String time,
@Param("quality") String quality,
@Param("buildingId") String buildingId);
@Select("select top 1 * from collection_params_manage where other_name = #{name} and building_id = #{buildingId} ")
@Results({
@Result(column = "id", property = "id"),

3
user-service/src/main/java/com/mh/user/service/CollectionParamsManageService.java

@ -4,6 +4,7 @@ import com.mh.user.dto.HotWaterControlDTO;
import com.mh.user.dto.HotWaterNowDataDTO;
import com.mh.user.entity.CollectionParamsManageEntity;
import com.mh.user.entity.DeviceInstallEntity;
import com.mh.user.model.SanShiFengDatas;
import java.math.BigDecimal;
import java.util.List;
@ -36,4 +37,6 @@ public interface CollectionParamsManageService {
List<HotWaterNowDataDTO> monitorList(String buildingId);
List<HotWaterControlDTO> operateList(String floorId);
void getBatchUpdateCollectionParams(List<SanShiFengDatas> batch, String sn, String plcName, String projectName, String time, String buildingId);
}

34
user-service/src/main/java/com/mh/user/service/impl/CollectionParamsManageServiceImpl.java

@ -6,7 +6,9 @@ import com.mh.user.dto.*;
import com.mh.user.entity.CollectionParamsManageEntity;
import com.mh.user.entity.DeviceInstallEntity;
import com.mh.user.mapper.CollectionParamsManageMapper;
import com.mh.user.model.SanShiFengDatas;
import com.mh.user.service.CollectionParamsManageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.math.BigDecimal;
@ -21,6 +23,7 @@ import java.util.stream.Collectors;
* @description 采集参数设备实现类
* @date 2025-12-10 11:30:54
*/
@Slf4j
@Service
public class CollectionParamsManageServiceImpl implements CollectionParamsManageService {
@ -248,10 +251,10 @@ public class CollectionParamsManageServiceImpl implements CollectionParamsManage
break;
case "1":
// 启停控制
if (item.getOtherName().equals("12")) {
if (item.getOtherName().contains("12") || item.getOtherName().contains("13")) {
hotPumpVo.setStartStopControlOne(item.getCurValue().intValue());
hotPumpVo.setStartStopControlOneId(item.getCpmId());
} else if (item.getOtherName().equals("34")) {
} else if (item.getOtherName().contains("34") || item.getOtherName().contains("24")) {
hotPumpVo.setStartStopControlTwo(item.getCurValue().intValue());
hotPumpVo.setStartStopControlTwoId(item.getCpmId());
} else {
@ -298,6 +301,19 @@ public class CollectionParamsManageServiceImpl implements CollectionParamsManage
// 温度设定
hotPumpVo.setSetTemp(item.getCurValue());
hotPumpVo.setSetTempId(item.getCpmId());
break;
case "22":
// 手自动切换
if (item.getOtherName().contains("12") || item.getOtherName().contains("13")) {
hotPumpVo.setManualAutoSwitchOne(item.getCurValue().intValue());
hotPumpVo.setManualAutoSwitchOneId(item.getCpmId());
} else if (item.getOtherName().contains("34") || item.getOtherName().contains("24")) {
hotPumpVo.setManualAutoSwitchTwo(item.getCurValue().intValue());
hotPumpVo.setManualAutoSwitchTwoId(item.getCpmId());
} else {
hotPumpVo.setManualAutoSwitch(item.getCurValue().intValue());
hotPumpVo.setManualAutoSwitchId(item.getCpmId());
}
default:
break;
}
@ -565,6 +581,11 @@ public class CollectionParamsManageServiceImpl implements CollectionParamsManage
backPumpVo.setManualAutoSwitch(item.getCurValue().intValue());
backPumpVo.setManualAutoSwitchId(item.getCpmId());
break;
case "25":
// 两台回水泵启动
backPumpVo.setTwoPumpStart(item.getCurValue().intValue());
backPumpVo.setTwoPumpStartId(item.getCpmId());
break;
default:
break;
}
@ -755,6 +776,15 @@ public class CollectionParamsManageServiceImpl implements CollectionParamsManage
}
}
@Override
public void getBatchUpdateCollectionParams(List<SanShiFengDatas> batch, String sn, String plcName, String projectName, String time, String buildingId) {
try {
collectionParamsManageMapper.updateBatchCPMByOtherName(batch, time, "0", buildingId);
} catch (Exception e) {
log.error("批量更新参数失败", e);
}
}
@Override
public List<CollectionParamsManageEntity> selectCPMList(String buildingId, String deviceInstallId, String otherName, Integer pageNum, Integer pageSize) {
return collectionParamsManageMapper.selectCPMList(buildingId, deviceInstallId, otherName, pageNum, pageSize);

69
user-service/src/main/java/com/mh/user/service/mqtt/service/impl/EventsServiceImpl.java

@ -3,6 +3,7 @@ package com.mh.user.service.mqtt.service.impl;
import com.alibaba.fastjson2.JSONObject;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.benmanes.caffeine.cache.Cache;
import com.mh.common.utils.StringUtils;
import com.mh.user.constants.ChannelName;
import com.mh.user.constants.Constant;
@ -18,10 +19,14 @@ import com.mh.user.service.GatewayManageService;
import com.mh.user.service.mqtt.service.IEventsService;
import com.mh.user.strategy.DeviceStrategy;
import com.mh.user.strategy.DeviceStrategyFactory;
import com.mh.user.utils.CacheUtil;
import com.mh.user.utils.DateUtil;
import com.mh.user.utils.SpringBeanUtil;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationContext;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Service;
@ -56,6 +61,9 @@ public class EventsServiceImpl implements IEventsService {
@Autowired
private DeviceInstallService deviceInstallService;
@Autowired
private Cache caffeineCache;
@ServiceActivator(inputChannel = ChannelName.EVENTS_UPLOAD_INBOUND)
@Override
public void handleInboundUpload(byte[] receiver, MessageHeaders headers) {
@ -107,20 +115,36 @@ public class EventsServiceImpl implements IEventsService {
return;
}
// 修复类型转换问题
List<?> rawDataList = datas.getDatas();
List<SanShiFengDatas> rawDataList = datas.getDatas();
if (rawDataList == null || rawDataList.isEmpty()) {
log.warn("数据列表为空,SN: {}", sn);
return;
}
// 先批量更新collectionParam
rawDataList.parallelStream().forEach(rawData -> {
try {
processDataUpdateCpmItem(rawData, sn, plcName, projectName, time, buildingId);
} catch (Exception e) {
log.error("处理单个数据项失败: {}", rawData, e);
// rawDataList进行批量更新,100条数据进行批量处理
int batchSize = 100;
for (int i = 0; i < rawDataList.size(); i += batchSize) {
int endIndex = Math.min(i + batchSize, rawDataList.size());
List<SanShiFengDatas> batch = rawDataList.subList(i, endIndex);
collectionParamManageService.getBatchUpdateCollectionParams(batch, sn, plcName, projectName, time, buildingId);
}
});
// 并行处理数据列表,主线程等待处理完
// // 先批量更新collectionParam
// rawDataList.parallelStream().forEach(rawData -> {
// try {
// processDataUpdateCpmItem(rawData, sn, plcName, projectName, time, buildingId);
// } catch (Exception e) {
// log.error("处理单个数据项失败: {}", rawData, e);
// }
// });
// 通过判断当前time跟上一个time相差30s才存储进入队列
if (caffeineCache.getIfPresent(sn+"_time") != null) {
String lastTime = (String)caffeineCache.getIfPresent(sn+"_time");
// yyyy-MM-dd HH:mm:ss格式转为秒的时间戳
long lastTimeStamp = DateUtil.getTimeStamp(lastTime);
long currentTimeStamp = DateUtil.getTimeStamp(time);
// 判断时间间隔
if (!StringUtils.isBlank(lastTime) && Math.abs(currentTimeStamp -lastTimeStamp) >= 60000) {
// 并行处理数据列表,主线程不阻塞
rawDataList.parallelStream().forEach(rawData -> {
try {
processDataItem(rawData, sn, plcName, projectName, time, buildingId);
@ -128,14 +152,18 @@ public class EventsServiceImpl implements IEventsService {
log.error("处理单个数据项失败: {}", rawData, e);
}
});
}
} else {
caffeineCache.put(sn+"_time", time);
}
} catch (IOException e) {
log.error("处理数据时发生错误: ", e);
}
}
private void processDataUpdateCpmItem(Object rawData, String sn, String plcName, String projectName, String time, String buildingId) {
private void processDataUpdateCpmItem(SanShiFengDatas data, String sn, String plcName, String projectName, String time, String buildingId) {
// 安全地转换对象
SanShiFengDatas data = convertDataItem(rawData);
// SanShiFengDatas data = convertDataItem(rawData);
if (data == null) {
log.warn("数据转换失败,跳过处理");
return;
@ -145,9 +173,9 @@ public class EventsServiceImpl implements IEventsService {
// 获取点位参数名称
String name = data.getName();
// 获取点位值
BigDecimal value = new BigDecimal(0);
BigDecimal value;
try {
value = new BigDecimal(String.valueOf(data.getValue()));
value = data.getValue();
} catch (Exception e) {
value = BigDecimal.ZERO;
}
@ -177,7 +205,20 @@ public class EventsServiceImpl implements IEventsService {
//collectionParamManageService.updateCPMByOtherName(name, value, time, buildingId);
// 查询device_install表,走之前的逻辑
CollectionParamsManageEntity collectionParamsManageEntity = collectionParamManageService.selectDeviceInstallByOtherName(name, buildingId);
if (null != collectionParamsManageEntity && collectionParamsManageEntity.getDeviceInstallId() != null) {
if (null != collectionParamsManageEntity
&& collectionParamsManageEntity.getDeviceInstallId() != null
&& collectionParamsManageEntity.getParamTypeId() != 0
&& collectionParamsManageEntity.getParamTypeId() != 4
&& collectionParamsManageEntity.getParamTypeId() != 15
&& collectionParamsManageEntity.getParamTypeId() != 16
&& collectionParamsManageEntity.getParamTypeId() != 17
&& collectionParamsManageEntity.getParamTypeId() != 18
&& collectionParamsManageEntity.getParamTypeId() != 19
&& collectionParamsManageEntity.getParamTypeId() != 21
&& collectionParamsManageEntity.getParamTypeId() != 22
&& collectionParamsManageEntity.getParamTypeId() != 23
&& collectionParamsManageEntity.getParamTypeId() != 24
) {
DeviceInstallEntity deviceInstallEntity = deviceInstallService.selectDeviceById(collectionParamsManageEntity.getDeviceInstallId());
if (deviceInstallEntity != null) {
// 开始走策略判断

2
user-service/src/main/java/com/mh/user/strategy/HeatPumpStrategy.java

@ -735,6 +735,8 @@ public class HeatPumpStrategy implements DeviceStrategy {
} else {
sValue = "有故障";
}
} else {
return null;
}
if (!StringUtils.isBlank(sValue)) {
nowDataService.saveNowHistoryData(deviceInstallEntity.getDeviceAddr(), "热泵", sValue, dataType.toString(), deviceInstallEntity.getBuildingId());

3
user-service/src/main/java/com/mh/user/strategy/PressureTransStrategy.java

@ -11,6 +11,7 @@ import com.mh.user.utils.SpringBeanUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import java.math.BigDecimal;
import java.text.DecimalFormat;
/**
@ -134,7 +135,7 @@ public class PressureTransStrategy implements DeviceStrategy {
public String analysisMQTTReceiveData(String dateStr, String registerAddr, String dataStr, String operateType, DeviceInstallEntity deviceInstallEntity,
CollectionParamsManageEntity collectionParamsManageEntity) {
String result = "fail";
if (Integer.parseInt(dataStr) < 0) {
if (new BigDecimal(dataStr).compareTo(BigDecimal.ZERO) < 0) {
return result;
}
if (operateType.equalsIgnoreCase(Constant.READ)) {// 读

4
user-service/src/main/java/com/mh/user/strategy/TempControlStrategy.java

@ -13,6 +13,8 @@ import com.mh.user.utils.SpringBeanUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import java.math.BigDecimal;
/**
* @author LJF
* @version 1.0
@ -120,7 +122,7 @@ public class TempControlStrategy implements DeviceStrategy {
public String analysisMQTTReceiveData(String dateStr, String registerAddr, String dataStr, String operateType, DeviceInstallEntity deviceInstallEntity,
CollectionParamsManageEntity collectionParamsManageEntity) {
String result = Constant.FAIL;
if (Integer.parseInt(dataStr) < 0) {
if (new BigDecimal(dataStr).compareTo(BigDecimal.ZERO) < 0) {
log.info("温控报文检验失败: " + dataStr);
return result;
}

3
user-service/src/main/java/com/mh/user/strategy/WtMeterStrategy.java

@ -13,6 +13,7 @@ import com.mh.user.utils.SpringBeanUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import java.math.BigDecimal;
import java.util.Date;
/**
@ -140,7 +141,7 @@ public class WtMeterStrategy implements DeviceStrategy {
public String analysisMQTTReceiveData(String dateStr, String registerAddr, String dataStr, String operateType, DeviceInstallEntity deviceInstallEntity,
CollectionParamsManageEntity collectionParamsManageEntity) {
String data = Constant.FAIL;
if (Integer.parseInt(dataStr) < 0) {
if (new BigDecimal(dataStr).compareTo(BigDecimal.ZERO) < 0) {
return data;
}
log.info("水表表号: " + deviceInstallEntity.getDeviceAddr() + ",水表读数:" + dataStr);

16
user-service/src/main/java/com/mh/user/utils/DateUtil.java

@ -308,4 +308,20 @@ public class DateUtil {
}
return null;
}
/**
* 获取时间戳
*
* @param lastTime
* @return
*/
public static long getTimeStamp(String lastTime) {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
try {
return format.parse(lastTime).getTime();
} catch (ParseException e) {
e.printStackTrace();
}
return 0;
}
}

Loading…
Cancel
Save