diff --git a/user-service/src/main/java/com/mh/user/controller/DeviceOperateController.java b/user-service/src/main/java/com/mh/user/controller/DeviceOperateController.java index cabba57..b0ffc7b 100644 --- a/user-service/src/main/java/com/mh/user/controller/DeviceOperateController.java +++ b/user-service/src/main/java/com/mh/user/controller/DeviceOperateController.java @@ -76,7 +76,7 @@ public class DeviceOperateController { // 获取mqtt操作队列(后期通过mqtt队列配置发送主题) String sendTopic = name + "/" + controlTopic + "/" + sn; log.info("发送主题:{},消息:{}", sendTopic, sendOrder); - iMqttGatewayService.publish(sendTopic, sendOrder, 1); + iMqttGatewayService.publish(sendTopic, sendOrder, 0); // 判断当前cpmId是否是 11:固定是 启用写入时间戳 if (serialPortModel.getCpmId().equals("11")) { // 是的话,重新写入,启用时间写入值变成 0 @@ -89,7 +89,7 @@ public class DeviceOperateController { // 获取mqtt操作队列(后期通过mqtt队列配置发送主题) sendTopic = name + "/" + controlTopic + "/" + sn; log.info("发送主题:{},消息:{}", sendTopic, sendOrder); - iMqttGatewayService.publish(sendTopic, sendOrder, 1); + iMqttGatewayService.publish(sendTopic, sendOrder, 0); } } } else { diff --git a/user-service/src/main/java/com/mh/user/dto/HotWaterBackPumpControlVO.java b/user-service/src/main/java/com/mh/user/dto/HotWaterBackPumpControlVO.java index e2ec933..e77590e 100644 --- a/user-service/src/main/java/com/mh/user/dto/HotWaterBackPumpControlVO.java +++ b/user-service/src/main/java/com/mh/user/dto/HotWaterBackPumpControlVO.java @@ -126,6 +126,14 @@ public class HotWaterBackPumpControlVO { private int twoPumpStart; private String twoPumpStartId; + // 温度设置上限 + private BigDecimal tempSetUpperLimit; + private String tempSetUpperLimitId; + + // 温度设置下限 + private BigDecimal tempSetLowerLimit; + private String tempSetLowerLimitId; + @Override public String toString() { return new StringJoiner(", ", HotWaterBackPumpControlVO.class.getSimpleName() + "[", "]") diff --git a/user-service/src/main/java/com/mh/user/dto/HotWaterDeviceControlVO.java b/user-service/src/main/java/com/mh/user/dto/HotWaterDeviceControlVO.java index d448e5d..9e91a39 100644 --- a/user-service/src/main/java/com/mh/user/dto/HotWaterDeviceControlVO.java +++ b/user-service/src/main/java/com/mh/user/dto/HotWaterDeviceControlVO.java @@ -35,6 +35,25 @@ public class HotWaterDeviceControlVO { private Date currentTime; private String currentTimeId; + /** + * 通讯失败 + */ + private int communicationFailure; + private String communicationFailureId; + + /** + * 通讯失败次数 + * @return + */ + private int communicationFailureCount; + private String communicationFailureCountId; + + /** + * modbus 复位 + */ + private int reset; + private String resetId; + @Override public String toString() { return new StringJoiner(", ", HotWaterDeviceControlVO.class.getSimpleName() + "[", "]") diff --git a/user-service/src/main/java/com/mh/user/dto/HotWaterHotPumpControlVO.java b/user-service/src/main/java/com/mh/user/dto/HotWaterHotPumpControlVO.java index 03a3ec2..84f50b3 100644 --- a/user-service/src/main/java/com/mh/user/dto/HotWaterHotPumpControlVO.java +++ b/user-service/src/main/java/com/mh/user/dto/HotWaterHotPumpControlVO.java @@ -220,6 +220,25 @@ public class HotWaterHotPumpControlVO { // 去掉pump后的类名 // 热泵_故障 -> 去掉pump前缀 private int fault; + /** + * 通讯失败 + */ + private int communicationFailure; + private String communicationFailureId; + + /** + * 通讯失败次数 + * @return + */ + private int communicationFailureCount; + private String communicationFailureCountId; + + /** + * modbus 复位 + */ + private int reset; + private String resetId; + @Override public String toString() { return new StringJoiner(", ", HotWaterHotPumpControlVO.class.getSimpleName() + "[", "]") // 更新类名引用 diff --git a/user-service/src/main/java/com/mh/user/dto/HotWaterSystemControlVO.java b/user-service/src/main/java/com/mh/user/dto/HotWaterSystemControlVO.java index d72fa0f..4a74df6 100644 --- a/user-service/src/main/java/com/mh/user/dto/HotWaterSystemControlVO.java +++ b/user-service/src/main/java/com/mh/user/dto/HotWaterSystemControlVO.java @@ -99,6 +99,12 @@ public class HotWaterSystemControlVO { private int orderNum; + /** + * modbus 复位 + */ + private int reset; + private String resetId; + @Override public String toString() { return new StringJoiner(", ", HotWaterSystemControlVO.class.getSimpleName() + "[", "]") diff --git a/user-service/src/main/java/com/mh/user/job/GetWeatherInfoJob.java b/user-service/src/main/java/com/mh/user/job/GetWeatherInfoJob.java index 6921041..2723d62 100644 --- a/user-service/src/main/java/com/mh/user/job/GetWeatherInfoJob.java +++ b/user-service/src/main/java/com/mh/user/job/GetWeatherInfoJob.java @@ -47,7 +47,7 @@ public class GetWeatherInfoJob { // 从系统参数中获取对应的项目区域 SysParamEntity sysParam = sysParamService.selectSysParam(); if (null != sysParam) { - String url = "https://restapi.amap.com/v3/weather/weatherInfo?extensions=all&key="+amapKey+"&city="+sysParam.getProArea(); + String url = "https://restapi.amap.com/v3/weather/weatherInfo?extensions=all&key=" + amapKey + "&city=" + sysParam.getProArea(); String returnResult = restTemplate.getForObject(url, String.class); if (!StringUtils.isBlank(returnResult)) { JSONObject jsonObject = JSON.parseObject(returnResult); @@ -61,4 +61,15 @@ public class GetWeatherInfoJob { } } } + + /** + * 定时清除collectionParam + */ + @Scheduled(cron = "0 0 0 1/1 * ?") + public void deleteCache() { + Object wetTemp = caffeineCache.getIfPresent("collectionParams"); + if (wetTemp != null) { + caffeineCache.invalidate("collectionParams"); + } + } } diff --git a/user-service/src/main/java/com/mh/user/mapper/CollectionParamsManageMapper.java b/user-service/src/main/java/com/mh/user/mapper/CollectionParamsManageMapper.java index ba561db..078cacc 100644 --- a/user-service/src/main/java/com/mh/user/mapper/CollectionParamsManageMapper.java +++ b/user-service/src/main/java/com/mh/user/mapper/CollectionParamsManageMapper.java @@ -297,4 +297,33 @@ public interface CollectionParamsManageMapper extends BaseMapper selectAllCPMList(); } diff --git a/user-service/src/main/java/com/mh/user/service/CollectionParamsManageService.java b/user-service/src/main/java/com/mh/user/service/CollectionParamsManageService.java index add5e05..3b9423d 100644 --- a/user-service/src/main/java/com/mh/user/service/CollectionParamsManageService.java +++ b/user-service/src/main/java/com/mh/user/service/CollectionParamsManageService.java @@ -39,4 +39,6 @@ public interface CollectionParamsManageService { List operateList(String floorId); void getBatchUpdateCollectionParams(List batch, String sn, String plcName, String projectName, String time, String buildingId); + + List selectAllCPMList(); } diff --git a/user-service/src/main/java/com/mh/user/service/impl/CollectionParamsManageServiceImpl.java b/user-service/src/main/java/com/mh/user/service/impl/CollectionParamsManageServiceImpl.java index 703e43f..1cd6e8f 100644 --- a/user-service/src/main/java/com/mh/user/service/impl/CollectionParamsManageServiceImpl.java +++ b/user-service/src/main/java/com/mh/user/service/impl/CollectionParamsManageServiceImpl.java @@ -155,6 +155,11 @@ public class CollectionParamsManageServiceImpl implements CollectionParamsManage // 时间 handleSystemTimeParameters(vo, item); break; + case "28": + // modbus 重置复位 + vo.setReset(item.getCurValue().intValue()); + vo.setResetId(item.getCpmId()); + break; default: break; } @@ -315,6 +320,21 @@ public class CollectionParamsManageServiceImpl implements CollectionParamsManage hotPumpVo.setManualAutoSwitchId(item.getCpmId()); } break; + case "28": + // 通讯失败复位 + hotPumpVo.setReset(item.getCurValue().intValue()); + hotPumpVo.setResetId(item.getCpmId()); + break; + case "29": + // 通讯失败 + hotPumpVo.setCommunicationFailure(item.getCurValue().intValue()); + hotPumpVo.setCommunicationFailureId(item.getCpmId()); + break; + case "30": + // 通讯失败次数 + hotPumpVo.setCommunicationFailureCount(item.getCurValue().intValue()); + hotPumpVo.setCommunicationFailureCountId(item.getCpmId()); + break; default: break; } @@ -587,6 +607,16 @@ public class CollectionParamsManageServiceImpl implements CollectionParamsManage backPumpVo.setTwoPumpStart(item.getCurValue().intValue()); backPumpVo.setTwoPumpStartId(item.getCpmId()); break; + case "26": + // 温度上限 + backPumpVo.setTempSetUpperLimit(item.getCurValue()); + backPumpVo.setTempSetUpperLimitId(item.getCpmId()); + break; + case "27": + // 温度下限 + backPumpVo.setTempSetLowerLimit(item.getCurValue()); + backPumpVo.setTempSetLowerLimitId(item.getCpmId()); + break; default: break; } @@ -739,6 +769,21 @@ public class CollectionParamsManageServiceImpl implements CollectionParamsManage meterVo.setCurrentTime(item.getCurTime()); meterVo.setCurrentTimeId(item.getCpmId()); break; + case "29": + // 水电表通讯故障 + meterVo.setCommunicationFailure(item.getCurValue().intValue()); + meterVo.setCommunicationFailureId(item.getCpmId()); + break; + case "30": + // 水电表通讯故障计数 + meterVo.setCommunicationFailureCount(item.getCurValue().intValue()); + meterVo.setCommunicationFailureCountId(item.getCpmId()); + break; + case "28": + // 水电表通信故障复位 + meterVo.setReset(item.getCurValue().intValue()); + meterVo.setResetId(item.getCpmId()); + break; default: break; } @@ -834,4 +879,9 @@ public class CollectionParamsManageServiceImpl implements CollectionParamsManage } return 0; } + + @Override + public List selectAllCPMList() { + return collectionParamsManageMapper.selectAllCPMList(); + } } diff --git a/user-service/src/main/java/com/mh/user/service/impl/DataResultServiceImpl.java b/user-service/src/main/java/com/mh/user/service/impl/DataResultServiceImpl.java index 5f9175a..bcc2ddc 100644 --- a/user-service/src/main/java/com/mh/user/service/impl/DataResultServiceImpl.java +++ b/user-service/src/main/java/com/mh/user/service/impl/DataResultServiceImpl.java @@ -7,12 +7,15 @@ import com.mh.user.mapper.DeviceInstallMapper; import com.mh.user.service.DataResultService; import com.mh.user.service.DeviceInstallService; import com.mh.user.utils.ExchangeStringUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.text.DecimalFormat; -import java.text.ParseException; -import java.text.SimpleDateFormat; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; import java.util.Calendar; import java.util.Date; import java.util.List; @@ -22,6 +25,13 @@ import static java.util.Calendar.*; @Service public class DataResultServiceImpl implements DataResultService { + private static final Logger logger = LoggerFactory.getLogger(DataResultServiceImpl.class); + + private static final DecimalFormat DECIMAL_FORMAT = new DecimalFormat("0.000"); + + // 使用线程安全的DateTimeFormatter替代SimpleDateFormat + private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + @Autowired DataResultMapper dataResultMapper; @@ -31,151 +41,237 @@ public class DataResultServiceImpl implements DataResultService { @Autowired DeviceInstallService deviceInstallService; - private static final DecimalFormat DECIMAL_FORMAT = new DecimalFormat("0.000"); - - private static final SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + private static final double ELEC_MAX_INCREMENT = 2000; + private static final double OTHER_MAX_INCREMENT = 300; + private static final double ELEC_DAY_VALUE = 1000; + private static final double OTHER_DAY_VALUE = 100; @Override public void saveDataResult(DataResultEntity dataResultEntity) { + if (dataResultEntity == null) { + logger.warn("saveDataResult: dataResultEntity is null"); + return; + } - double lastValue=0; - double calcValue=0; - double initValue=0; - double curValue=0; - double dayValue=0; - double newCurValue = 0; - int days=0; - try{ - Date date=new Date(); //获取系统日期 - if (dataResultEntity.getCurDate()==null){ - dataResultEntity.setCurDate(date); + try { + Date currentDate = new Date(); + if (dataResultEntity.getCurDate() == null) { + dataResultEntity.setCurDate(currentDate); } + // 格式化时间,按五分钟保存一次 Date formattedDate = formatToFiveMinuteInterval(dataResultEntity.getCurDate()); dataResultEntity.setCurDate(formattedDate); - //从安装表获取设备信息 - DeviceInstallEntity deviceInstallEntity=deviceInstallMapper.selectDevice( + // 从安装表获取设备信息 + DeviceInstallEntity deviceInstallEntity = deviceInstallMapper.selectDevice( dataResultEntity.getDeviceAddr(), dataResultEntity.getDeviceType(), dataResultEntity.getBuildingId()); - double ratio=deviceInstallEntity.getRatio(); //倍率 - initValue=deviceInstallEntity.getInitValue(); - dayValue=deviceInstallEntity.getDayValue(); - newCurValue = deviceInstallEntity.getLastValue(); - if(dataResultEntity.getDeviceType().equals("电表")){ - if (dataResultEntity.getCurValue() - newCurValue > 2000) { - return; - } - dayValue=1000; - }else{ - if (dataResultEntity.getCurValue() - newCurValue > 300) { - return; - } - dayValue=100; + + if (deviceInstallEntity == null) { + logger.warn("saveDataResult: deviceInstallEntity is null for addr={}, type={}, building={}", + dataResultEntity.getDeviceAddr(), + dataResultEntity.getDeviceType(), + dataResultEntity.getBuildingId()); + return; + } + + double ratio = deviceInstallEntity.getRatio(); + double initValue = deviceInstallEntity.getInitValue(); + double dayValue = deviceInstallEntity.getDayValue(); + double lastValueFromInstall = deviceInstallEntity.getLastValue(); + + // 判断设备类型并设置阈值 + boolean isElectricMeter = "电表".equals(dataResultEntity.getDeviceType()); + double maxIncrement = isElectricMeter ? ELEC_MAX_INCREMENT : OTHER_MAX_INCREMENT; + dayValue = isElectricMeter ? ELEC_DAY_VALUE : OTHER_DAY_VALUE; + + // 检查增量是否异常 + if (dataResultEntity.getCurValue() - lastValueFromInstall > maxIncrement) { + logger.info("saveDataResult: increment too large, skip. curValue={}, lastValue={}, deviceAddr={}", + dataResultEntity.getCurValue(), lastValueFromInstall, dataResultEntity.getDeviceAddr()); + return; } - int r = dataResultMapper.selectDataResultCount( - sdf1.format(dataResultEntity.getCurDate()), + + String curDateStr = formatDateToString(formattedDate); + int recordCount = dataResultMapper.selectDataResultCount( + curDateStr, dataResultEntity.getDeviceAddr(), dataResultEntity.getDeviceType(), dataResultEntity.getBuildingId()); - if (r==0){//插入记录 - // 获取上一个抄表记录curValue,curDate - // dataResultEntity.getCurDate()减去5分钟, - Calendar calendar = Calendar.getInstance(); - calendar.setTime(dataResultEntity.getCurDate()); - calendar.add(Calendar.MINUTE, -5); - DataResultEntity lastData = dataResultMapper.selectDataResult(sdf1.format(calendar.getTime()), - dataResultEntity.getDeviceAddr(), - dataResultEntity.getDeviceType(), - dataResultEntity.getBuildingId()); - DataResultEntity data=new DataResultEntity(); - data.setDeviceAddr(dataResultEntity.getDeviceAddr()); //通讯地址 - data.setDeviceType(dataResultEntity.getDeviceType()); //设备类型 - String curDate=sdf1.format(dataResultEntity.getCurDate()); - data.setCurDate(sdf1.parse(curDate)); //当前日期 - data.setCurValue(dataResultEntity.getCurValue()); //当前读数 - curValue=dataResultEntity.getCurValue(); - data.setBuildingId(deviceInstallEntity.getBuildingId()); //楼栋编号 - data.setRatio(ratio); //倍率 - if (lastData!=null){ - data.setLastDate(lastData.getCurDate()); //上次抄表日期 - data.setLastValue(lastData.getCurValue()); //上次读数 - } else { - lastValue = deviceInstallEntity.getLastValue(); //上次读数 - if (deviceInstallEntity.getLastDate()!=null){ //上次抄表日期 - String lastDate=sdf1.format(deviceInstallEntity.getLastDate()); - data.setLastDate(sdf1.parse(lastDate)); - }else{ - String lastDate=sdf1.format(date); - data.setLastDate(sdf1.parse(lastDate)); - } - data.setLastValue(lastValue); - } - calcValue=(dataResultEntity.getCurValue()-lastValue)*ratio; //计算用量 - // 使用时 - calcValue = Double.parseDouble(DECIMAL_FORMAT.format(calcValue)); - data.setCalcValue(calcValue); //用量 + double calcValue; + boolean shouldSave = false; + + if (recordCount == 0) { + // 插入新记录 + DataResultEntity newData = createNewDataResult(dataResultEntity, deviceInstallEntity, formattedDate, ratio); + calcValue = newData.getCalcValue(); - //判断读数,并保存数据 + // 计算相差天数 + Date lastDate = newData.getLastDate(); if (deviceInstallEntity.getLastDate() == null) { - deviceInstallEntity.setLastDate(date); - } - days=(int)ExchangeStringUtil.daysBetween(dataResultEntity.getCurDate(),deviceInstallEntity.getLastDate()); //计算相差天数 - if (calcValue>=0 && calcValue<=dayValue){ - dataResultMapper.saveDataResult(data); //插入新的记录 - }else if(calcValue>dayValue){ - if(days>0){ - if(calcValue/days<=dayValue){ - dataResultMapper.saveDataResult(data); - } - } + lastDate = currentDate; } + int days = (int) ExchangeStringUtil.daysBetween(formattedDate, lastDate); + + shouldSave = validateAndSaveData(newData, calcValue, dayValue, days, true); - }else {//修改记录的curValue、calcValue - DataResultEntity data2=dataResultMapper.selectDataResult(sdf1.format(dataResultEntity.getCurDate()), +// if (shouldSave) { +// lastValue = newData.getLastValue(); +// } + } else { + // 修改现有记录 + DataResultEntity existingData = dataResultMapper.selectDataResult( + curDateStr, dataResultEntity.getDeviceAddr(), dataResultEntity.getDeviceType(), dataResultEntity.getBuildingId()); - lastValue=data2.getLastValue(); //安装基表上次读数 - calcValue=dataResultEntity.getCurValue()-lastValue; //计算用量 - String curDate=sdf1.format(dataResultEntity.getCurDate()); - data2.setCurDate(sdf1.parse(curDate));//当前日期 - data2.setCurValue(dataResultEntity.getCurValue());//当前读数 - data2.setCalcValue(calcValue);//用量 - - //判断读数,并保存数据 - days=(int)ExchangeStringUtil.daysBetween(dataResultEntity.getCurDate(),data2.getLastDate()); //计算相差天数 - if (calcValue>=0 && calcValue<=dayValue){ - dataResultMapper.updateDataResult(data2); //更新记录 - }else if(calcValue>dayValue){ - if(days>0){ - if(calcValue/days<=dayValue){ - dataResultMapper.saveDataResult(data2); - } - } + + if (existingData != null) { + double lastValue = existingData.getLastValue(); + calcValue = (dataResultEntity.getCurValue() - lastValue) * ratio; + calcValue = formatDouble(calcValue); + + existingData.setCurDate(formattedDate); + existingData.setCurValue(dataResultEntity.getCurValue()); + existingData.setCalcValue(calcValue); + + Date lastDate = existingData.getLastDate(); + int days = (int) ExchangeStringUtil.daysBetween(formattedDate, lastDate); + + shouldSave = validateAndSaveData(existingData, calcValue, dayValue, days, false); } } - //修改安装表中lastValue,lastDate,ini_value的值 - if (calcValue>=0 && calcValue<=dayValue){ - deviceInstallMapper.updateLastValue(deviceInstallEntity.getId(),String.valueOf(dataResultEntity.getCurValue()),date); - }else if(calcValue>dayValue){ - if(days>0){ - if(calcValue/days<=dayValue){ - deviceInstallMapper.updateLastValue(deviceInstallEntity.getId(),String.valueOf(dataResultEntity.getCurValue()),date); - } - } + // 更新安装表中的lastValue和lastDate + if (shouldSave) { + deviceInstallMapper.updateLastValue( + deviceInstallEntity.getId(), + String.valueOf(dataResultEntity.getCurValue()), + currentDate); } - if(initValue==0){//第一次采集的时候 - deviceInstallMapper.updateLastValue(deviceInstallEntity.getId(),String.valueOf(dataResultEntity.getCurValue()),date); - deviceInstallMapper.updateInitValue(dataResultEntity.getDeviceAddr(), + + // 第一次采集时初始化initValue + if (initValue == 0) { + deviceInstallMapper.updateLastValue( + deviceInstallEntity.getId(), + String.valueOf(dataResultEntity.getCurValue()), + currentDate); + deviceInstallMapper.updateInitValue( + dataResultEntity.getDeviceAddr(), dataResultEntity.getDeviceType(), - dataResultEntity.getBuildingId(),String.valueOf(dataResultEntity.getCurValue()), deviceInstallEntity.getId()); + dataResultEntity.getBuildingId(), + String.valueOf(dataResultEntity.getCurValue()), + deviceInstallEntity.getId()); + } + + } catch (Exception e) { + logger.error("saveDataResult error for deviceAddr={}, deviceType={}", + dataResultEntity.getDeviceAddr(), dataResultEntity.getDeviceType(), e); + } + } + + /** + * 创建新的数据结果实体 + */ + private DataResultEntity createNewDataResult(DataResultEntity dataResultEntity, + DeviceInstallEntity deviceInstallEntity, + Date curDate, + double ratio) throws Exception { + DataResultEntity data = new DataResultEntity(); + data.setDeviceAddr(dataResultEntity.getDeviceAddr()); + data.setDeviceType(dataResultEntity.getDeviceType()); + data.setCurDate(curDate); + data.setCurValue(dataResultEntity.getCurValue()); + data.setBuildingId(deviceInstallEntity.getBuildingId()); + data.setRatio(ratio); + + // 获取上一个抄表记录 + Calendar calendar = Calendar.getInstance(); + calendar.setTime(curDate); + calendar.add(MINUTE, -5); + String lastDateStr = formatDateToString(calendar.getTime()); + + DataResultEntity lastData = dataResultMapper.selectDataResult( + lastDateStr, + dataResultEntity.getDeviceAddr(), + dataResultEntity.getDeviceType(), + dataResultEntity.getBuildingId()); + + double lastValue; + if (lastData != null) { + data.setLastDate(lastData.getCurDate()); + data.setLastValue(lastData.getCurValue()); + lastValue = lastData.getCurValue(); + } else { + lastValue = deviceInstallEntity.getLastValue(); + Date lastDate = deviceInstallEntity.getLastDate(); + if (lastDate == null) { + lastDate = new Date(); + } + data.setLastDate(lastDate); + data.setLastValue(lastValue); + } + + double calcValue = (dataResultEntity.getCurValue() - lastValue) * ratio; + calcValue = formatDouble(calcValue); + data.setCalcValue(calcValue); + + return data; + } + + /** + * 验证数据并保存 + * @return 是否保存成功 + */ + private boolean validateAndSaveData(DataResultEntity dataResultEntity, + double calcValue, + double dayValue, + int days, + boolean isNew) { + if (calcValue < 0) { + logger.warn("calcValue is negative: {}", calcValue); + return false; + } + + if (calcValue <= dayValue) { + if (isNew) { + dataResultMapper.saveDataResult(dataResultEntity); + } else { + dataResultMapper.updateDataResult(dataResultEntity); } - }catch (Exception e){ - //e.printStackTrace(); + return true; + } else if (days > 0 && calcValue / days <= dayValue) { + dataResultMapper.saveDataResult(dataResultEntity); + return true; + } + + logger.warn("calcValue exceeds dayValue: calcValue={}, dayValue={}, days={}", calcValue, dayValue, days); + return false; + } + + /** + * 格式化日期为字符串 + */ + private String formatDateToString(Date date) { + if (date == null) { + return null; + } + LocalDateTime localDateTime = date.toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime(); + return localDateTime.format(DATE_TIME_FORMATTER); + } + + /** + * 格式化双精度数值 + */ + private double formatDouble(double value) { + try { + return Double.parseDouble(DECIMAL_FORMAT.format(value)); + } catch (NumberFormatException e) { + logger.error("Failed to format double value: {}", value, e); + return value; } } @@ -190,25 +286,15 @@ public class DataResultServiceImpl implements DataResultService { return new Date(); } - Calendar calendar = getInstance(); + Calendar calendar = Calendar.getInstance(); calendar.setTime(originalDate); int minutes = calendar.get(MINUTE); - int seconds = calendar.get(SECOND); - int milliseconds = calendar.get(MILLISECOND); - - // 计算下一个五分钟节点 - int nextFiveMinute = ((minutes / 5) + 1) * 5; - // 如果超过55分钟,则进位到下一小时 - if (nextFiveMinute >= 60) { - calendar.add(HOUR_OF_DAY, 1); - calendar.set(MINUTE, 0); - } else { - calendar.set(MINUTE, nextFiveMinute); - } + // 向下取整到最近的五分钟节点 + int roundedMinutes = (minutes / 5) * 5; - // 秒和毫秒设为0 + calendar.set(MINUTE, roundedMinutes); calendar.set(SECOND, 0); calendar.set(MILLISECOND, 0); @@ -221,7 +307,7 @@ public class DataResultServiceImpl implements DataResultService { } @Override - public List queryDataResult(String buildingId, String startDate, String endDate,String deviceType, int page, int limit) { + public List queryDataResult(String buildingId, String startDate, String endDate, String deviceType, int page, int limit) { return dataResultMapper.queryDataResult(buildingId,startDate,endDate,deviceType,page,limit); } diff --git a/user-service/src/main/java/com/mh/user/service/mqtt/service/impl/EventsServiceImpl.java b/user-service/src/main/java/com/mh/user/service/mqtt/service/impl/EventsServiceImpl.java index 953f280..0ca729c 100644 --- a/user-service/src/main/java/com/mh/user/service/mqtt/service/impl/EventsServiceImpl.java +++ b/user-service/src/main/java/com/mh/user/service/mqtt/service/impl/EventsServiceImpl.java @@ -19,23 +19,27 @@ import com.mh.user.service.GatewayManageService; import com.mh.user.service.mqtt.service.IEventsService; import com.mh.user.strategy.DeviceStrategy; import com.mh.user.strategy.DeviceStrategyFactory; -import com.mh.user.utils.CacheUtil; import com.mh.user.utils.DateUtil; -import com.mh.user.utils.SpringBeanUtil; import io.netty.util.CharsetUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.context.ApplicationContext; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.messaging.MessageHeaders; import org.springframework.stereotype.Service; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; import java.io.IOException; import java.math.BigDecimal; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; /** * @author LJF @@ -62,8 +66,129 @@ public class EventsServiceImpl implements IEventsService { private DeviceInstallService deviceInstallService; @Autowired + @Qualifier("caffeineCache") private Cache caffeineCache; + // 常量定义 + private static final int BATCH_SIZE = 100; + private static final long TIME_INTERVAL_THRESHOLD_MS = 150000; // 150秒 + + // 线程池配置 + private static final int CORE_POOL_SIZE = 3; + private static final int MAX_POOL_SIZE = 5; + private static final int QUEUE_CAPACITY = 100; + private static final long KEEP_ALIVE_TIME = 30L; + + // 缓存配置 + private static final int TIME_CACHE_MAX_SIZE = 1000; + private static final int TIME_CACHE_EXPIRE_MINUTES = 30; + private static final int DEVICE_CACHE_MAX_SIZE = 500; + private static final int DEVICE_CACHE_EXPIRE_MINUTES = 60; + + // 线程池 + private ExecutorService deviceAnalysisExecutor; + + // 本地缓存,使用带过期时间的缓存 + private final ConcurrentHashMap timeCache = new ConcurrentHashMap<>(); + + // 设备缓存,避免频繁查询数据库 + private final ConcurrentHashMap deviceCache = new ConcurrentHashMap<>(); + + // Device和Strategy实例缓存 + private final ConcurrentHashMap deviceInstanceCache = new ConcurrentHashMap<>(); + private final ConcurrentHashMap strategyInstanceCache = new ConcurrentHashMap<>(); + + // 缓存的设备信息 + private static class CachedDeviceInfo { + DeviceInstallEntity device; + long timestamp; + + CachedDeviceInfo(DeviceInstallEntity device) { + this.device = device; + this.timestamp = System.currentTimeMillis(); + } + + boolean isExpired() { + return System.currentTimeMillis() - timestamp > TimeUnit.MINUTES.toMillis(DEVICE_CACHE_EXPIRE_MINUTES); + } + } + + @PostConstruct + public void init() { + // 初始化线程池 + deviceAnalysisExecutor = new ThreadPoolExecutor( + CORE_POOL_SIZE, + MAX_POOL_SIZE, + KEEP_ALIVE_TIME, + TimeUnit.SECONDS, + new ArrayBlockingQueue<>(QUEUE_CAPACITY), + new ThreadFactory() { + private final AtomicInteger threadNumber = new AtomicInteger(1); + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "device-analysis-thread-" + threadNumber.getAndIncrement()); + } + }, + new ThreadPoolExecutor.CallerRunsPolicy() // 改为CallerRuns,防止任务丢失 + ); + + // 启动缓存清理定时任务 + ScheduledExecutorService cleanupExecutor = Executors.newSingleThreadScheduledExecutor(r -> + new Thread(r, "cache-cleanup-thread")); + cleanupExecutor.scheduleAtFixedRate(this::cleanupCaches, 5, 5, TimeUnit.MINUTES); + + log.info("EventsServiceImpl initialized, thread pool: core={}, max={}, queue={}", + CORE_POOL_SIZE, MAX_POOL_SIZE, QUEUE_CAPACITY); + } + + @PreDestroy + public void destroy() { + if (deviceAnalysisExecutor != null && !deviceAnalysisExecutor.isShutdown()) { + deviceAnalysisExecutor.shutdown(); + try { + if (!deviceAnalysisExecutor.awaitTermination(30, TimeUnit.SECONDS)) { + deviceAnalysisExecutor.shutdownNow(); + } + } catch (InterruptedException e) { + deviceAnalysisExecutor.shutdownNow(); + Thread.currentThread().interrupt(); + } + log.info("EventsServiceImpl destroyed, thread pool shutdown"); + } + clearAllCaches(); + } + + /** + * 清理过期缓存 + */ + private void cleanupCaches() { + try { + long now = System.currentTimeMillis(); + long expireTime = TimeUnit.MINUTES.toMillis(TIME_CACHE_EXPIRE_MINUTES); + + // 清理timeCache过期条目 + timeCache.entrySet().removeIf(entry -> now - entry.getValue() > expireTime); + + // 清理deviceCache过期条目 + deviceCache.entrySet().removeIf(entry -> entry.getValue().isExpired()); + + log.debug("缓存清理完成, timeCache.size={}, deviceCache.size={}", + timeCache.size(), deviceCache.size()); + } catch (Exception e) { + log.error("清理缓存失败", e); + } + } + + /** + * 清空所有缓存 + */ + private void clearAllCaches() { + timeCache.clear(); + deviceCache.clear(); + deviceInstanceCache.clear(); + strategyInstanceCache.clear(); + } + @ServiceActivator(inputChannel = ChannelName.EVENTS_UPLOAD_INBOUND) @Override public void handleInboundUpload(byte[] receiver, MessageHeaders headers) { @@ -95,143 +220,431 @@ public class EventsServiceImpl implements IEventsService { log.info("接收到控制指令下发=>{}", sendStr); } - private void handleInboundData(byte[] receiver,String topic, String logMessage) { + private void handleInboundData(byte[] receiver, String topic, String logMessage) { try { // 使用 TypeReference 确保泛型信息被保留 SanShiFengReceiver datas = mapper.readValue(receiver, new TypeReference>() {}); - log.info("主题:{},类型:{}: ,数据:{}", topic, logMessage, datas.toString()); + // log.info("主题:{},类型:{}: ,数据:{}", topic, logMessage, datas.toString()); + // 开始遍历 数据 String sn = datas.getSn(); String plcName = datas.getPlcName(); String projectName = datas.getProjectName(); String time = datas.getTime(); + // 更新网关设备在线状态 gatewayManageService.updateGatewayManageOnlineBySn(sn, 0); + // 获取网关对应的buildingId String buildingId = gatewayManageService.queryBuildingIdBySn(sn); if (StringUtils.isBlank(buildingId)) { - log.error("未找到对应的buildingId"); + log.error("未找到对应的buildingId, SN: {}", sn); return; } - // 修复类型转换问题 + + // 获取数据列表 List rawDataList = datas.getDatas(); if (rawDataList == null || rawDataList.isEmpty()) { log.warn("数据列表为空,SN: {}", sn); return; } - // rawDataList进行批量更新,100条数据进行批量处理 - int batchSize = 100; - for (int i = 0; i < rawDataList.size(); i += batchSize) { - int endIndex = Math.min(i + batchSize, rawDataList.size()); + + // 批量更新collectionParams + processBatchUpdate(rawDataList, sn, plcName, projectName, time, buildingId); + + // 检查时间间隔并处理数据 + if (shouldProcessData(sn, time)) { + processDataList(rawDataList, sn, plcName, projectName, time, buildingId); + } + + } catch (IOException e) { + log.error("处理数据时发生错误: ", e); + } + } + + /** + * 批量更新collectionParams + */ + private void processBatchUpdate(List rawDataList, + String sn, + String plcName, + String projectName, + String time, + String buildingId) { + try { + int size = rawDataList.size(); + for (int i = 0; i < size; i += BATCH_SIZE) { + int endIndex = Math.min(i + BATCH_SIZE, size); List batch = rawDataList.subList(i, endIndex); collectionParamManageService.getBatchUpdateCollectionParams(batch, sn, plcName, projectName, time, buildingId); } + } catch (Exception e) { + log.error("批量更新collectionParams失败: SN={}", sn, e); + } + } + + /** + * 判断是否应该处理数据(基于时间间隔) + */ + private boolean shouldProcessData(String sn, String time) { + if (StringUtils.isBlank(sn) || StringUtils.isBlank(time)) { + return false; + } + + String cacheKey = sn + "_time"; + Long lastTimestamp = timeCache.get(cacheKey); + + if (lastTimestamp == null) { + timeCache.put(cacheKey, DateUtil.getTimeStamp(time)); + return false; + } + + try { + long currentTimeStamp = DateUtil.getTimeStamp(time); + long timeDiff = Math.abs(currentTimeStamp - lastTimestamp); + + if (timeDiff >= TIME_INTERVAL_THRESHOLD_MS) { + timeCache.put(cacheKey, currentTimeStamp); + return true; + } + } catch (Exception e) { + log.error("计算时间间隔失败: SN={}, time={}", sn, time, e); + } + + return false; + } + + /** + * 处理数据列表(修复CountDownLatch泄露问题) + */ + private void processDataList(List rawDataList, + String sn, + String plcName, + String projectName, + String time, + String buildingId) { + if (rawDataList == null || rawDataList.isEmpty()) { + return; + } + + // 限制并发数,避免一次性创建过多任务 + int size = rawDataList.size(); + int maxConcurrent = Math.min(size, MAX_POOL_SIZE * 2); + CountDownLatch latch = new CountDownLatch(size); + AtomicInteger successCount = new AtomicInteger(0); + AtomicInteger failCount = new AtomicInteger(0); + + // 使用信号量控制并发数 + Semaphore semaphore = new Semaphore(maxConcurrent); + + for (int i = 0; i < size; i++) { + final int index = i; + final SanShiFengDatas data = rawDataList.get(i); -// // 先批量更新collectionParam -// rawDataList.parallelStream().forEach(rawData -> { -// try { -// processDataUpdateCpmItem(rawData, sn, plcName, projectName, time, buildingId); -// } catch (Exception e) { -// log.error("处理单个数据项失败: {}", rawData, e); -// } -// }); - // 通过判断当前time跟上一个time相差30s才存储进入队列 - if (caffeineCache.getIfPresent(sn+"_time") != null) { - String lastTime = (String)caffeineCache.getIfPresent(sn+"_time"); - // yyyy-MM-dd HH:mm:ss格式转为秒的时间戳 - long lastTimeStamp = DateUtil.getTimeStamp(lastTime); - long currentTimeStamp = DateUtil.getTimeStamp(time); - // 判断时间间隔 - if (!StringUtils.isBlank(lastTime) && Math.abs(currentTimeStamp -lastTimeStamp) >= 60000) { - // 并行处理数据列表,主线程不阻塞 - rawDataList.parallelStream().forEach(rawData -> { - try { - processDataItem(rawData, sn, plcName, projectName, time, buildingId); - } catch (Exception e) { - log.error("处理单个数据项失败: {}", rawData, e); - } - }); + // 提交任务 + submitTaskWithFallback(() -> { + try { + semaphore.acquire(); + try { + processDataItem(data, sn, plcName, projectName, time, buildingId); + successCount.incrementAndGet(); + } catch (Exception e) { + log.error("处理数据项失败: index={}", index, e); + failCount.incrementAndGet(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.error("任务被中断: index={}", index, e); + } finally { + semaphore.release(); + latch.countDown(); // 确保countDown一定会执行 } - } else { - caffeineCache.put(sn+"_time", time); + }); + } + + // 等待所有任务完成 + try { + latch.await(30, TimeUnit.SECONDS); + if (latch.getCount() > 0) { + log.warn("部分任务未在指定时间内完成, remaining={}, success={}, fail={}", + latch.getCount(), successCount.get(), failCount.get()); } - } catch (IOException e) { - log.error("处理数据时发生错误: ", e); + } catch (InterruptedException e) { + log.warn("等待数据处理完成被中断"); + Thread.currentThread().interrupt(); + } + } + + /** + * 提交任务,失败时直接在当前线程执行 + */ + private void submitTaskWithFallback(Runnable task) { + try { + // 尝试提交到线程池 + if (!deviceAnalysisExecutor.isShutdown()) { + deviceAnalysisExecutor.submit(task); + return; + } + } catch (RejectedExecutionException | IllegalStateException e) { + log.warn("任务提交失败,降级到当前线程执行"); } + // 降级:在当前线程执行 + task.run(); } private void processDataUpdateCpmItem(SanShiFengDatas data, String sn, String plcName, String projectName, String time, String buildingId) { - // 安全地转换对象 -// SanShiFengDatas data = convertDataItem(rawData); if (data == null) { - log.warn("数据转换失败,跳过处理"); + log.warn("数据为null,跳过处理"); return; } - // 不使用消息队列,查询属于哪种设备类型,然后通过策略进行数据解析 - // log.info("设备SN:{},PLC名称:{},项目名称:{},时间:{},数据:{}", sn, plcName, projectName, time, data.toString()); - // 获取点位参数名称 + String name = data.getName(); - // 获取点位值 - BigDecimal value; + if (StringUtils.isBlank(name)) { + log.warn("点位名称为空,跳过处理"); + return; + } + + BigDecimal value = data.getValue(); + if (value == null) { + value = BigDecimal.ZERO; + } + try { - value = data.getValue(); + collectionParamManageService.updateCPMByOtherName(name, value, time, buildingId); } catch (Exception e) { - value = BigDecimal.ZERO; + log.error("更新collectionParamManage失败: name={}, value={}", name, value, e); } - // 直接更新collectionParamManage参数值 - collectionParamManageService.updateCPMByOtherName(name, value, time, buildingId); } - private void processDataItem(Object rawData, String sn, String plcName, String projectName, String time, String buildingId) { - // 安全地转换对象 - SanShiFengDatas data = convertDataItem(rawData); + private void processDataItem(SanShiFengDatas data, String sn, String plcName, String projectName, String time, String buildingId) { if (data == null) { - log.warn("数据转换失败,跳过处理"); + log.warn("数据为null,跳过处理"); return; } - // 不使用消息队列,查询属于哪种设备类型,然后通过策略进行数据解析 - // log.info("设备SN:{},PLC名称:{},项目名称:{},时间:{},数据:{}", sn, plcName, projectName, time, data.toString()); - // 获取点位参数名称 + + // 获取点位参数名称和值 String name = data.getName(); - // 获取点位值 - BigDecimal value = new BigDecimal(0); + if (StringUtils.isBlank(name)) { + log.warn("点位名称为空,跳过处理"); + return; + } + + BigDecimal value = data.getValue(); + if (value == null) { + value = BigDecimal.ZERO; + } + try { - value = new BigDecimal(String.valueOf(data.getValue())); + // 获取collectionParams缓存,使用带过期时间的本地缓存 + List collectionParams = getCollectionParamsCache(); + + if (collectionParams == null || collectionParams.isEmpty()) { + log.debug("collectionParams缓存为空"); + return; + } + + // 查找匹配的参数实体 + CollectionParamsManageEntity collectionParamsManageEntity = findMatchingCollectionParams(collectionParams, name, buildingId); + + if (collectionParamsManageEntity == null) { + return; + } + + // 检查参数类型,过滤不需要处理的类型 + if (!shouldProcessParamType(collectionParamsManageEntity.getParamTypeId())) { + return; + } + + // 查询设备信息并处理 + processDeviceData(collectionParamsManageEntity, time, value); + } catch (Exception e) { - value = BigDecimal.ZERO; + log.error("处理数据项失败: name={}, value={}", name, value, e); + } + } + + /** + * 获取collectionParams缓存 + */ + private List getCollectionParamsCache() { + try { + // 优先使用caffeine缓存 + List cachedParams = (List) caffeineCache.getIfPresent("collectionParams"); + + if (cachedParams != null) { + return cachedParams; + } + + // 从数据库加载 + List collectionParams = collectionParamManageService.selectAllCPMList(); + + if (collectionParams != null) { + // 放入caffeine缓存 + caffeineCache.put("collectionParams", collectionParams); + log.info("collectionParams已加载到缓存,共{}条记录", collectionParams.size()); + } + + return collectionParams; + + } catch (Exception e) { + log.error("获取collectionParams缓存失败", e); + return null; } - // 直接更新collectionParamManage参数值 - //collectionParamManageService.updateCPMByOtherName(name, value, time, buildingId); - // 查询device_install表,走之前的逻辑 - CollectionParamsManageEntity collectionParamsManageEntity = collectionParamManageService.selectDeviceInstallByOtherName(name, buildingId); - if (null != collectionParamsManageEntity - && collectionParamsManageEntity.getDeviceInstallId() != null - && collectionParamsManageEntity.getParamTypeId() != 0 - && collectionParamsManageEntity.getParamTypeId() != 4 - && collectionParamsManageEntity.getParamTypeId() != 15 - && collectionParamsManageEntity.getParamTypeId() != 16 - && collectionParamsManageEntity.getParamTypeId() != 17 - && collectionParamsManageEntity.getParamTypeId() != 18 - && collectionParamsManageEntity.getParamTypeId() != 19 - && collectionParamsManageEntity.getParamTypeId() != 21 - && collectionParamsManageEntity.getParamTypeId() != 22 - && collectionParamsManageEntity.getParamTypeId() != 23 - && collectionParamsManageEntity.getParamTypeId() != 24 - && collectionParamsManageEntity.getParamTypeId() != 3 // 通过运行状态点判断故障点 - ) { - DeviceInstallEntity deviceInstallEntity = deviceInstallService.selectDeviceById(collectionParamsManageEntity.getDeviceInstallId()); - if (deviceInstallEntity != null) { - // 开始走策略判断 - String deviceType = deviceInstallEntity.getDeviceType(); - Device device = DeviceFactory.createDevice(deviceType); - DeviceStrategy strategy = DeviceStrategyFactory.createStrategy(deviceType); - if (strategy != null) { - device.setStrategy(strategy); - device.analysisMQTTReceiveData(time, deviceInstallEntity.getDeviceAddr(), value.toPlainString(), Constant.READ, deviceInstallEntity, collectionParamsManageEntity); + } + + /** + * 查找匹配的collectionParams + */ + private CollectionParamsManageEntity findMatchingCollectionParams(List collectionParams, + String name, + String buildingId) { + if (collectionParams == null || StringUtils.isBlank(name) || StringUtils.isBlank(buildingId)) { + return null; + } + + try { + for (CollectionParamsManageEntity val : collectionParams) { + if (val == null) { + continue; + } + + String otherName = val.getOtherName(); + if (otherName != null && otherName.trim().equals(name.trim())) { + Object bidObj = val.getBuildingId(); + if (bidObj != null && bidObj.toString().equals(buildingId)) { + return val; + } } } + } catch (Exception e) { + log.error("查找collectionParams失败: name={}, buildingId={}", name, buildingId, e); } + + return null; + } + + /** + * 判断参数类型是否需要处理 + */ + private boolean shouldProcessParamType(int paramTypeId) { + // 不需要处理的参数类型 + int[] excludedTypes = {0, 3, 4, 15, 16, 17, 18, 19, 21, 22, 23, 24}; + + for (int excluded : excludedTypes) { + if (paramTypeId == excluded) { + return false; + } + } + return true; + } + + /** + * 处理设备数据(优化:使用缓存减少数据库查询和对象创建) + */ + private void processDeviceData(CollectionParamsManageEntity collectionParamsManageEntity, + String time, + BigDecimal value) { + if (collectionParamsManageEntity == null || + collectionParamsManageEntity.getDeviceInstallId() == null) { + return; + } + + Long deviceInstallId = collectionParamsManageEntity.getDeviceInstallId(); + + try { + // 从缓存获取设备信息 + DeviceInstallEntity deviceInstallEntity = getDeviceFromCache(deviceInstallId); + + if (deviceInstallEntity == null) { + log.warn("设备信息不存在: deviceInstallId={}", deviceInstallId); + return; + } + + String deviceType = deviceInstallEntity.getDeviceType(); + if (StringUtils.isBlank(deviceType)) { + log.warn("设备类型为空: deviceInstallId={}", deviceInstallEntity.getId()); + return; + } + + // 从缓存获取Device和Strategy实例 + Device device = getDeviceInstance(deviceType); + DeviceStrategy strategy = getStrategyInstance(deviceType); + + if (device == null || strategy == null) { + log.warn("创建设备或策略失败: deviceType={}", deviceType); + return; + } + + // 执行分析 + device.setStrategy(strategy); + device.analysisMQTTReceiveData( + time, + deviceInstallEntity.getDeviceAddr(), + value.toPlainString(), + Constant.READ, + deviceInstallEntity, + collectionParamsManageEntity); + + } catch (Exception e) { + log.error("处理设备数据失败: time={}, value={}", time, value, e); + } + } + + /** + * 从缓存获取设备信息 + */ + private DeviceInstallEntity getDeviceFromCache(Long deviceInstallId) { + if (deviceInstallId == null) { + return null; + } + + // 先从缓存获取 + CachedDeviceInfo cachedInfo = deviceCache.get(deviceInstallId); + + if (cachedInfo != null && !cachedInfo.isExpired()) { + return cachedInfo.device; + } + + // 缓存未命中或已过期,从数据库加载 + try { + DeviceInstallEntity device = deviceInstallService.selectDeviceById(deviceInstallId); + if (device != null) { + deviceCache.put(deviceInstallId, new CachedDeviceInfo(device)); + } + return device; + } catch (Exception e) { + log.error("查询设备信息失败: deviceInstallId={}", deviceInstallId, e); + return null; + } + } + + /** + * 获取Device实例(使用缓存) + */ + private Device getDeviceInstance(String deviceType) { + return deviceInstanceCache.computeIfAbsent(deviceType, type -> { + try { + return DeviceFactory.createDevice(type); + } catch (Exception e) { + log.error("创建Device实例失败: deviceType={}", type, e); + return null; + } + }); + } + + /** + * 获取DeviceStrategy实例(使用缓存) + */ + private DeviceStrategy getStrategyInstance(String deviceType) { + return strategyInstanceCache.computeIfAbsent(deviceType, type -> { + try { + return DeviceStrategyFactory.createStrategy(type); + } catch (Exception e) { + log.error("创建DeviceStrategy实例失败: deviceType={}", type, e); + return null; + } + }); } private SanShiFengDatas convertDataItem(Object rawData) { @@ -239,23 +652,28 @@ public class EventsServiceImpl implements IEventsService { return null; } - SanShiFengDatas data = new SanShiFengDatas(); try { if (rawData instanceof SanShiFengDatas) { - data = (SanShiFengDatas) rawData; + return (SanShiFengDatas) rawData; } else if (rawData instanceof HashMap) { JSONObject jsonObject = new JSONObject((HashMap) rawData); - data = jsonObject.to(SanShiFengDatas.class); + return jsonObject.to(SanShiFengDatas.class); } else { log.warn("不支持的数据类型: {}", rawData.getClass().getName()); return null; } } catch (Exception e) { log.error("数据转换异常", e); - data.setName(getJsonValueAsString(rawData, "name")); - data.setValue(new BigDecimal("-1")); + // 尝试恢复至少name字段 + String name = getJsonValueAsString(rawData, "name"); + if (!StringUtils.isBlank(name)) { + SanShiFengDatas data = new SanShiFengDatas(); + data.setName(name); + data.setValue(BigDecimal.ZERO); + return data; + } + return null; } - return data; } /** @@ -279,7 +697,6 @@ public class EventsServiceImpl implements IEventsService { JSONObject jsonObject = (JSONObject) rawData; return jsonObject.getString(key); } else { - // 如果是其他类型,尝试使用反射或通用方式获取 log.warn("不支持的数据类型: {}", rawData.getClass().getName()); return null; } @@ -289,5 +706,4 @@ public class EventsServiceImpl implements IEventsService { } } - } diff --git a/user-service/src/main/java/com/mh/user/utils/CacheUtil.java b/user-service/src/main/java/com/mh/user/utils/CacheUtil.java index 158b75a..731f18c 100644 --- a/user-service/src/main/java/com/mh/user/utils/CacheUtil.java +++ b/user-service/src/main/java/com/mh/user/utils/CacheUtil.java @@ -3,6 +3,7 @@ package com.mh.user.utils; import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; import com.github.benmanes.caffeine.cache.Cache; +import com.mh.user.entity.CollectionParamsManageEntity; import com.mh.user.entity.DeviceCodeParamEntity; import com.mh.user.entity.GatewayManageEntity; import com.mh.user.service.DeviceCodeParamService; @@ -95,6 +96,17 @@ public class CacheUtil { return JSONArray.parseArray(JSONObject.toJSONString(cacheObject), GatewayManageEntity.class); } + public List getCollectionParams() { + Object cacheObject = caffeineCache.getIfPresent("collectionParams"); + // 如果为空,重新添加 + if (cacheObject == null) { + createDeviceParams(); + // 在重新获取数据 + cacheObject = caffeineCache.getIfPresent("collectionParams"); + } + return JSONArray.parseArray(JSONObject.toJSONString(cacheObject), CollectionParamsManageEntity.class); + } + /** * 删除缓存 */