Browse Source

1、数据更新处理增加缓存、线程池处理,防止内存泄漏;

dev
25604 2 weeks ago
parent
commit
b0e3cfe07d
  1. 4
      user-service/src/main/java/com/mh/user/controller/DeviceOperateController.java
  2. 8
      user-service/src/main/java/com/mh/user/dto/HotWaterBackPumpControlVO.java
  3. 19
      user-service/src/main/java/com/mh/user/dto/HotWaterDeviceControlVO.java
  4. 19
      user-service/src/main/java/com/mh/user/dto/HotWaterHotPumpControlVO.java
  5. 6
      user-service/src/main/java/com/mh/user/dto/HotWaterSystemControlVO.java
  6. 13
      user-service/src/main/java/com/mh/user/job/GetWeatherInfoJob.java
  7. 29
      user-service/src/main/java/com/mh/user/mapper/CollectionParamsManageMapper.java
  8. 2
      user-service/src/main/java/com/mh/user/service/CollectionParamsManageService.java
  9. 50
      user-service/src/main/java/com/mh/user/service/impl/CollectionParamsManageServiceImpl.java
  10. 350
      user-service/src/main/java/com/mh/user/service/impl/DataResultServiceImpl.java
  11. 612
      user-service/src/main/java/com/mh/user/service/mqtt/service/impl/EventsServiceImpl.java
  12. 12
      user-service/src/main/java/com/mh/user/utils/CacheUtil.java

4
user-service/src/main/java/com/mh/user/controller/DeviceOperateController.java

@ -76,7 +76,7 @@ public class DeviceOperateController {
// 获取mqtt操作队列(后期通过mqtt队列配置发送主题) // 获取mqtt操作队列(后期通过mqtt队列配置发送主题)
String sendTopic = name + "/" + controlTopic + "/" + sn; String sendTopic = name + "/" + controlTopic + "/" + sn;
log.info("发送主题:{},消息:{}", sendTopic, sendOrder); log.info("发送主题:{},消息:{}", sendTopic, sendOrder);
iMqttGatewayService.publish(sendTopic, sendOrder, 1); iMqttGatewayService.publish(sendTopic, sendOrder, 0);
// 判断当前cpmId是否是 11:固定是 启用写入时间戳 // 判断当前cpmId是否是 11:固定是 启用写入时间戳
if (serialPortModel.getCpmId().equals("11")) { if (serialPortModel.getCpmId().equals("11")) {
// 是的话,重新写入,启用时间写入值变成 0 // 是的话,重新写入,启用时间写入值变成 0
@ -89,7 +89,7 @@ public class DeviceOperateController {
// 获取mqtt操作队列(后期通过mqtt队列配置发送主题) // 获取mqtt操作队列(后期通过mqtt队列配置发送主题)
sendTopic = name + "/" + controlTopic + "/" + sn; sendTopic = name + "/" + controlTopic + "/" + sn;
log.info("发送主题:{},消息:{}", sendTopic, sendOrder); log.info("发送主题:{},消息:{}", sendTopic, sendOrder);
iMqttGatewayService.publish(sendTopic, sendOrder, 1); iMqttGatewayService.publish(sendTopic, sendOrder, 0);
} }
} }
} else { } else {

8
user-service/src/main/java/com/mh/user/dto/HotWaterBackPumpControlVO.java

@ -126,6 +126,14 @@ public class HotWaterBackPumpControlVO {
private int twoPumpStart; private int twoPumpStart;
private String twoPumpStartId; private String twoPumpStartId;
// 温度设置上限
private BigDecimal tempSetUpperLimit;
private String tempSetUpperLimitId;
// 温度设置下限
private BigDecimal tempSetLowerLimit;
private String tempSetLowerLimitId;
@Override @Override
public String toString() { public String toString() {
return new StringJoiner(", ", HotWaterBackPumpControlVO.class.getSimpleName() + "[", "]") return new StringJoiner(", ", HotWaterBackPumpControlVO.class.getSimpleName() + "[", "]")

19
user-service/src/main/java/com/mh/user/dto/HotWaterDeviceControlVO.java

@ -35,6 +35,25 @@ public class HotWaterDeviceControlVO {
private Date currentTime; private Date currentTime;
private String currentTimeId; private String currentTimeId;
/**
* 通讯失败
*/
private int communicationFailure;
private String communicationFailureId;
/**
* 通讯失败次数
* @return
*/
private int communicationFailureCount;
private String communicationFailureCountId;
/**
* modbus 复位
*/
private int reset;
private String resetId;
@Override @Override
public String toString() { public String toString() {
return new StringJoiner(", ", HotWaterDeviceControlVO.class.getSimpleName() + "[", "]") return new StringJoiner(", ", HotWaterDeviceControlVO.class.getSimpleName() + "[", "]")

19
user-service/src/main/java/com/mh/user/dto/HotWaterHotPumpControlVO.java

@ -220,6 +220,25 @@ public class HotWaterHotPumpControlVO { // 去掉pump后的类名
// 热泵_故障 -> 去掉pump前缀 // 热泵_故障 -> 去掉pump前缀
private int fault; private int fault;
/**
* 通讯失败
*/
private int communicationFailure;
private String communicationFailureId;
/**
* 通讯失败次数
* @return
*/
private int communicationFailureCount;
private String communicationFailureCountId;
/**
* modbus 复位
*/
private int reset;
private String resetId;
@Override @Override
public String toString() { public String toString() {
return new StringJoiner(", ", HotWaterHotPumpControlVO.class.getSimpleName() + "[", "]") // 更新类名引用 return new StringJoiner(", ", HotWaterHotPumpControlVO.class.getSimpleName() + "[", "]") // 更新类名引用

6
user-service/src/main/java/com/mh/user/dto/HotWaterSystemControlVO.java

@ -99,6 +99,12 @@ public class HotWaterSystemControlVO {
private int orderNum; private int orderNum;
/**
* modbus 复位
*/
private int reset;
private String resetId;
@Override @Override
public String toString() { public String toString() {
return new StringJoiner(", ", HotWaterSystemControlVO.class.getSimpleName() + "[", "]") return new StringJoiner(", ", HotWaterSystemControlVO.class.getSimpleName() + "[", "]")

13
user-service/src/main/java/com/mh/user/job/GetWeatherInfoJob.java

@ -47,7 +47,7 @@ public class GetWeatherInfoJob {
// 从系统参数中获取对应的项目区域 // 从系统参数中获取对应的项目区域
SysParamEntity sysParam = sysParamService.selectSysParam(); SysParamEntity sysParam = sysParamService.selectSysParam();
if (null != sysParam) { if (null != sysParam) {
String url = "https://restapi.amap.com/v3/weather/weatherInfo?extensions=all&key="+amapKey+"&city="+sysParam.getProArea(); String url = "https://restapi.amap.com/v3/weather/weatherInfo?extensions=all&key=" + amapKey + "&city=" + sysParam.getProArea();
String returnResult = restTemplate.getForObject(url, String.class); String returnResult = restTemplate.getForObject(url, String.class);
if (!StringUtils.isBlank(returnResult)) { if (!StringUtils.isBlank(returnResult)) {
JSONObject jsonObject = JSON.parseObject(returnResult); JSONObject jsonObject = JSON.parseObject(returnResult);
@ -61,4 +61,15 @@ public class GetWeatherInfoJob {
} }
} }
} }
/**
* 定时清除collectionParam
*/
@Scheduled(cron = "0 0 0 1/1 * ?")
public void deleteCache() {
Object wetTemp = caffeineCache.getIfPresent("collectionParams");
if (wetTemp != null) {
caffeineCache.invalidate("collectionParams");
}
}
} }

29
user-service/src/main/java/com/mh/user/mapper/CollectionParamsManageMapper.java

@ -297,4 +297,33 @@ public interface CollectionParamsManageMapper extends BaseMapper<CollectionParam
@Result(column = "quality", property = "quality") @Result(column = "quality", property = "quality")
}) })
CollectionParamsManageEntity selectByDeviceIdAndParamTypeId(Long deviceInstallId, String paramTypeId); CollectionParamsManageEntity selectByDeviceIdAndParamTypeId(Long deviceInstallId, String paramTypeId);
@Select("select * from collection_params_manage order by create_time desc")
@Results({
@Result(column = "id", property = "id"),
@Result(column = "create_time", property = "createTime"),
@Result(column = "update_time", property = "updateTime"),
@Result(column = "device_install_id", property = "deviceInstallId"),
@Result(column = "register_addr", property = "registerAddr"),
@Result(column = "func_code", property = "funcCode"),
@Result(column = "mt_ratio", property = "mtRatio"),
@Result(column = "mt_init_value", property = "mtInitValue"),
@Result(column = "digits", property = "digits"),
@Result(column = "data_type", property = "dataType"),
@Result(column = "cur_value", property = "curValue"),
@Result(column = "cur_time", property = "curTime"),
@Result(column = "mt_is_sum", property = "mtIsSum"),
@Result(column = "unit", property = "unit"),
@Result(column = "order_num", property = "orderNum"),
@Result(column = "remark", property = "remark"),
@Result(column = "register_size", property = "registerSize"),
@Result(column = "is_use", property = "isUse"),
@Result(column = "other_name", property = "otherName"),
@Result(column = "grade", property = "grade"),
@Result(column = "param_type_id", property = "paramTypeId"),
@Result(column = "collection_type", property = "collectionType"),
@Result(column = "quality", property = "quality"),
@Result(column = "building_id", property = "buildingId")
})
List<CollectionParamsManageEntity> selectAllCPMList();
} }

2
user-service/src/main/java/com/mh/user/service/CollectionParamsManageService.java

@ -39,4 +39,6 @@ public interface CollectionParamsManageService {
List<HotWaterControlDTO> operateList(String floorId); List<HotWaterControlDTO> operateList(String floorId);
void getBatchUpdateCollectionParams(List<SanShiFengDatas> batch, String sn, String plcName, String projectName, String time, String buildingId); void getBatchUpdateCollectionParams(List<SanShiFengDatas> batch, String sn, String plcName, String projectName, String time, String buildingId);
List<CollectionParamsManageEntity> selectAllCPMList();
} }

50
user-service/src/main/java/com/mh/user/service/impl/CollectionParamsManageServiceImpl.java

@ -155,6 +155,11 @@ public class CollectionParamsManageServiceImpl implements CollectionParamsManage
// 时间 // 时间
handleSystemTimeParameters(vo, item); handleSystemTimeParameters(vo, item);
break; break;
case "28":
// modbus 重置复位
vo.setReset(item.getCurValue().intValue());
vo.setResetId(item.getCpmId());
break;
default: default:
break; break;
} }
@ -315,6 +320,21 @@ public class CollectionParamsManageServiceImpl implements CollectionParamsManage
hotPumpVo.setManualAutoSwitchId(item.getCpmId()); hotPumpVo.setManualAutoSwitchId(item.getCpmId());
} }
break; break;
case "28":
// 通讯失败复位
hotPumpVo.setReset(item.getCurValue().intValue());
hotPumpVo.setResetId(item.getCpmId());
break;
case "29":
// 通讯失败
hotPumpVo.setCommunicationFailure(item.getCurValue().intValue());
hotPumpVo.setCommunicationFailureId(item.getCpmId());
break;
case "30":
// 通讯失败次数
hotPumpVo.setCommunicationFailureCount(item.getCurValue().intValue());
hotPumpVo.setCommunicationFailureCountId(item.getCpmId());
break;
default: default:
break; break;
} }
@ -587,6 +607,16 @@ public class CollectionParamsManageServiceImpl implements CollectionParamsManage
backPumpVo.setTwoPumpStart(item.getCurValue().intValue()); backPumpVo.setTwoPumpStart(item.getCurValue().intValue());
backPumpVo.setTwoPumpStartId(item.getCpmId()); backPumpVo.setTwoPumpStartId(item.getCpmId());
break; break;
case "26":
// 温度上限
backPumpVo.setTempSetUpperLimit(item.getCurValue());
backPumpVo.setTempSetUpperLimitId(item.getCpmId());
break;
case "27":
// 温度下限
backPumpVo.setTempSetLowerLimit(item.getCurValue());
backPumpVo.setTempSetLowerLimitId(item.getCpmId());
break;
default: default:
break; break;
} }
@ -739,6 +769,21 @@ public class CollectionParamsManageServiceImpl implements CollectionParamsManage
meterVo.setCurrentTime(item.getCurTime()); meterVo.setCurrentTime(item.getCurTime());
meterVo.setCurrentTimeId(item.getCpmId()); meterVo.setCurrentTimeId(item.getCpmId());
break; break;
case "29":
// 水电表通讯故障
meterVo.setCommunicationFailure(item.getCurValue().intValue());
meterVo.setCommunicationFailureId(item.getCpmId());
break;
case "30":
// 水电表通讯故障计数
meterVo.setCommunicationFailureCount(item.getCurValue().intValue());
meterVo.setCommunicationFailureCountId(item.getCpmId());
break;
case "28":
// 水电表通信故障复位
meterVo.setReset(item.getCurValue().intValue());
meterVo.setResetId(item.getCpmId());
break;
default: default:
break; break;
} }
@ -834,4 +879,9 @@ public class CollectionParamsManageServiceImpl implements CollectionParamsManage
} }
return 0; return 0;
} }
@Override
public List<CollectionParamsManageEntity> selectAllCPMList() {
return collectionParamsManageMapper.selectAllCPMList();
}
} }

350
user-service/src/main/java/com/mh/user/service/impl/DataResultServiceImpl.java

@ -7,12 +7,15 @@ import com.mh.user.mapper.DeviceInstallMapper;
import com.mh.user.service.DataResultService; import com.mh.user.service.DataResultService;
import com.mh.user.service.DeviceInstallService; import com.mh.user.service.DeviceInstallService;
import com.mh.user.utils.ExchangeStringUtil; import com.mh.user.utils.ExchangeStringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.text.DecimalFormat; import java.text.DecimalFormat;
import java.text.ParseException; import java.time.LocalDateTime;
import java.text.SimpleDateFormat; import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Calendar; import java.util.Calendar;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
@ -22,6 +25,13 @@ import static java.util.Calendar.*;
@Service @Service
public class DataResultServiceImpl implements DataResultService { public class DataResultServiceImpl implements DataResultService {
private static final Logger logger = LoggerFactory.getLogger(DataResultServiceImpl.class);
private static final DecimalFormat DECIMAL_FORMAT = new DecimalFormat("0.000");
// 使用线程安全的DateTimeFormatter替代SimpleDateFormat
private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
@Autowired @Autowired
DataResultMapper dataResultMapper; DataResultMapper dataResultMapper;
@ -31,151 +41,237 @@ public class DataResultServiceImpl implements DataResultService {
@Autowired @Autowired
DeviceInstallService deviceInstallService; DeviceInstallService deviceInstallService;
private static final DecimalFormat DECIMAL_FORMAT = new DecimalFormat("0.000"); private static final double ELEC_MAX_INCREMENT = 2000;
private static final double OTHER_MAX_INCREMENT = 300;
private static final SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); private static final double ELEC_DAY_VALUE = 1000;
private static final double OTHER_DAY_VALUE = 100;
@Override @Override
public void saveDataResult(DataResultEntity dataResultEntity) { public void saveDataResult(DataResultEntity dataResultEntity) {
if (dataResultEntity == null) {
logger.warn("saveDataResult: dataResultEntity is null");
return;
}
double lastValue=0; try {
double calcValue=0; Date currentDate = new Date();
double initValue=0; if (dataResultEntity.getCurDate() == null) {
double curValue=0; dataResultEntity.setCurDate(currentDate);
double dayValue=0;
double newCurValue = 0;
int days=0;
try{
Date date=new Date(); //获取系统日期
if (dataResultEntity.getCurDate()==null){
dataResultEntity.setCurDate(date);
} }
// 格式化时间,按五分钟保存一次 // 格式化时间,按五分钟保存一次
Date formattedDate = formatToFiveMinuteInterval(dataResultEntity.getCurDate()); Date formattedDate = formatToFiveMinuteInterval(dataResultEntity.getCurDate());
dataResultEntity.setCurDate(formattedDate); dataResultEntity.setCurDate(formattedDate);
//从安装表获取设备信息 // 从安装表获取设备信息
DeviceInstallEntity deviceInstallEntity=deviceInstallMapper.selectDevice( DeviceInstallEntity deviceInstallEntity = deviceInstallMapper.selectDevice(
dataResultEntity.getDeviceAddr(), dataResultEntity.getDeviceAddr(),
dataResultEntity.getDeviceType(), dataResultEntity.getDeviceType(),
dataResultEntity.getBuildingId()); dataResultEntity.getBuildingId());
double ratio=deviceInstallEntity.getRatio(); //倍率
initValue=deviceInstallEntity.getInitValue(); if (deviceInstallEntity == null) {
dayValue=deviceInstallEntity.getDayValue(); logger.warn("saveDataResult: deviceInstallEntity is null for addr={}, type={}, building={}",
newCurValue = deviceInstallEntity.getLastValue(); dataResultEntity.getDeviceAddr(),
if(dataResultEntity.getDeviceType().equals("电表")){ dataResultEntity.getDeviceType(),
if (dataResultEntity.getCurValue() - newCurValue > 2000) { dataResultEntity.getBuildingId());
return; return;
} }
dayValue=1000;
}else{ double ratio = deviceInstallEntity.getRatio();
if (dataResultEntity.getCurValue() - newCurValue > 300) { double initValue = deviceInstallEntity.getInitValue();
return; double dayValue = deviceInstallEntity.getDayValue();
} double lastValueFromInstall = deviceInstallEntity.getLastValue();
dayValue=100;
// 判断设备类型并设置阈值
boolean isElectricMeter = "电表".equals(dataResultEntity.getDeviceType());
double maxIncrement = isElectricMeter ? ELEC_MAX_INCREMENT : OTHER_MAX_INCREMENT;
dayValue = isElectricMeter ? ELEC_DAY_VALUE : OTHER_DAY_VALUE;
// 检查增量是否异常
if (dataResultEntity.getCurValue() - lastValueFromInstall > maxIncrement) {
logger.info("saveDataResult: increment too large, skip. curValue={}, lastValue={}, deviceAddr={}",
dataResultEntity.getCurValue(), lastValueFromInstall, dataResultEntity.getDeviceAddr());
return;
} }
int r = dataResultMapper.selectDataResultCount(
sdf1.format(dataResultEntity.getCurDate()), String curDateStr = formatDateToString(formattedDate);
int recordCount = dataResultMapper.selectDataResultCount(
curDateStr,
dataResultEntity.getDeviceAddr(), dataResultEntity.getDeviceAddr(),
dataResultEntity.getDeviceType(), dataResultEntity.getDeviceType(),
dataResultEntity.getBuildingId()); dataResultEntity.getBuildingId());
if (r==0){//插入记录
// 获取上一个抄表记录curValue,curDate
// dataResultEntity.getCurDate()减去5分钟,
Calendar calendar = Calendar.getInstance();
calendar.setTime(dataResultEntity.getCurDate());
calendar.add(Calendar.MINUTE, -5);
DataResultEntity lastData = dataResultMapper.selectDataResult(sdf1.format(calendar.getTime()),
dataResultEntity.getDeviceAddr(),
dataResultEntity.getDeviceType(),
dataResultEntity.getBuildingId());
DataResultEntity data=new DataResultEntity();
data.setDeviceAddr(dataResultEntity.getDeviceAddr()); //通讯地址
data.setDeviceType(dataResultEntity.getDeviceType()); //设备类型
String curDate=sdf1.format(dataResultEntity.getCurDate());
data.setCurDate(sdf1.parse(curDate)); //当前日期
data.setCurValue(dataResultEntity.getCurValue()); //当前读数
curValue=dataResultEntity.getCurValue();
data.setBuildingId(deviceInstallEntity.getBuildingId()); //楼栋编号
data.setRatio(ratio); //倍率
if (lastData!=null){
data.setLastDate(lastData.getCurDate()); //上次抄表日期
data.setLastValue(lastData.getCurValue()); //上次读数
} else {
lastValue = deviceInstallEntity.getLastValue(); //上次读数
if (deviceInstallEntity.getLastDate()!=null){ //上次抄表日期
String lastDate=sdf1.format(deviceInstallEntity.getLastDate());
data.setLastDate(sdf1.parse(lastDate));
}else{
String lastDate=sdf1.format(date);
data.setLastDate(sdf1.parse(lastDate));
}
data.setLastValue(lastValue);
}
calcValue=(dataResultEntity.getCurValue()-lastValue)*ratio; //计算用量
// 使用时
calcValue = Double.parseDouble(DECIMAL_FORMAT.format(calcValue));
data.setCalcValue(calcValue); //用量 double calcValue;
boolean shouldSave = false;
if (recordCount == 0) {
// 插入新记录
DataResultEntity newData = createNewDataResult(dataResultEntity, deviceInstallEntity, formattedDate, ratio);
calcValue = newData.getCalcValue();
//判断读数,并保存数据 // 计算相差天数
Date lastDate = newData.getLastDate();
if (deviceInstallEntity.getLastDate() == null) { if (deviceInstallEntity.getLastDate() == null) {
deviceInstallEntity.setLastDate(date); lastDate = currentDate;
}
days=(int)ExchangeStringUtil.daysBetween(dataResultEntity.getCurDate(),deviceInstallEntity.getLastDate()); //计算相差天数
if (calcValue>=0 && calcValue<=dayValue){
dataResultMapper.saveDataResult(data); //插入新的记录
}else if(calcValue>dayValue){
if(days>0){
if(calcValue/days<=dayValue){
dataResultMapper.saveDataResult(data);
}
}
} }
int days = (int) ExchangeStringUtil.daysBetween(formattedDate, lastDate);
shouldSave = validateAndSaveData(newData, calcValue, dayValue, days, true);
}else {//修改记录的curValue、calcValue // if (shouldSave) {
DataResultEntity data2=dataResultMapper.selectDataResult(sdf1.format(dataResultEntity.getCurDate()), // lastValue = newData.getLastValue();
// }
} else {
// 修改现有记录
DataResultEntity existingData = dataResultMapper.selectDataResult(
curDateStr,
dataResultEntity.getDeviceAddr(), dataResultEntity.getDeviceAddr(),
dataResultEntity.getDeviceType(), dataResultEntity.getDeviceType(),
dataResultEntity.getBuildingId()); dataResultEntity.getBuildingId());
lastValue=data2.getLastValue(); //安装基表上次读数
calcValue=dataResultEntity.getCurValue()-lastValue; //计算用量 if (existingData != null) {
String curDate=sdf1.format(dataResultEntity.getCurDate()); double lastValue = existingData.getLastValue();
data2.setCurDate(sdf1.parse(curDate));//当前日期 calcValue = (dataResultEntity.getCurValue() - lastValue) * ratio;
data2.setCurValue(dataResultEntity.getCurValue());//当前读数 calcValue = formatDouble(calcValue);
data2.setCalcValue(calcValue);//用量
existingData.setCurDate(formattedDate);
//判断读数,并保存数据 existingData.setCurValue(dataResultEntity.getCurValue());
days=(int)ExchangeStringUtil.daysBetween(dataResultEntity.getCurDate(),data2.getLastDate()); //计算相差天数 existingData.setCalcValue(calcValue);
if (calcValue>=0 && calcValue<=dayValue){
dataResultMapper.updateDataResult(data2); //更新记录 Date lastDate = existingData.getLastDate();
}else if(calcValue>dayValue){ int days = (int) ExchangeStringUtil.daysBetween(formattedDate, lastDate);
if(days>0){
if(calcValue/days<=dayValue){ shouldSave = validateAndSaveData(existingData, calcValue, dayValue, days, false);
dataResultMapper.saveDataResult(data2);
}
}
} }
} }
//修改安装表中lastValue,lastDate,ini_value的值 // 更新安装表中的lastValue和lastDate
if (calcValue>=0 && calcValue<=dayValue){ if (shouldSave) {
deviceInstallMapper.updateLastValue(deviceInstallEntity.getId(),String.valueOf(dataResultEntity.getCurValue()),date); deviceInstallMapper.updateLastValue(
}else if(calcValue>dayValue){ deviceInstallEntity.getId(),
if(days>0){ String.valueOf(dataResultEntity.getCurValue()),
if(calcValue/days<=dayValue){ currentDate);
deviceInstallMapper.updateLastValue(deviceInstallEntity.getId(),String.valueOf(dataResultEntity.getCurValue()),date);
}
}
} }
if(initValue==0){//第一次采集的时候
deviceInstallMapper.updateLastValue(deviceInstallEntity.getId(),String.valueOf(dataResultEntity.getCurValue()),date); // 第一次采集时初始化initValue
deviceInstallMapper.updateInitValue(dataResultEntity.getDeviceAddr(), if (initValue == 0) {
deviceInstallMapper.updateLastValue(
deviceInstallEntity.getId(),
String.valueOf(dataResultEntity.getCurValue()),
currentDate);
deviceInstallMapper.updateInitValue(
dataResultEntity.getDeviceAddr(),
dataResultEntity.getDeviceType(), dataResultEntity.getDeviceType(),
dataResultEntity.getBuildingId(),String.valueOf(dataResultEntity.getCurValue()), deviceInstallEntity.getId()); dataResultEntity.getBuildingId(),
String.valueOf(dataResultEntity.getCurValue()),
deviceInstallEntity.getId());
}
} catch (Exception e) {
logger.error("saveDataResult error for deviceAddr={}, deviceType={}",
dataResultEntity.getDeviceAddr(), dataResultEntity.getDeviceType(), e);
}
}
/**
* 创建新的数据结果实体
*/
private DataResultEntity createNewDataResult(DataResultEntity dataResultEntity,
DeviceInstallEntity deviceInstallEntity,
Date curDate,
double ratio) throws Exception {
DataResultEntity data = new DataResultEntity();
data.setDeviceAddr(dataResultEntity.getDeviceAddr());
data.setDeviceType(dataResultEntity.getDeviceType());
data.setCurDate(curDate);
data.setCurValue(dataResultEntity.getCurValue());
data.setBuildingId(deviceInstallEntity.getBuildingId());
data.setRatio(ratio);
// 获取上一个抄表记录
Calendar calendar = Calendar.getInstance();
calendar.setTime(curDate);
calendar.add(MINUTE, -5);
String lastDateStr = formatDateToString(calendar.getTime());
DataResultEntity lastData = dataResultMapper.selectDataResult(
lastDateStr,
dataResultEntity.getDeviceAddr(),
dataResultEntity.getDeviceType(),
dataResultEntity.getBuildingId());
double lastValue;
if (lastData != null) {
data.setLastDate(lastData.getCurDate());
data.setLastValue(lastData.getCurValue());
lastValue = lastData.getCurValue();
} else {
lastValue = deviceInstallEntity.getLastValue();
Date lastDate = deviceInstallEntity.getLastDate();
if (lastDate == null) {
lastDate = new Date();
}
data.setLastDate(lastDate);
data.setLastValue(lastValue);
}
double calcValue = (dataResultEntity.getCurValue() - lastValue) * ratio;
calcValue = formatDouble(calcValue);
data.setCalcValue(calcValue);
return data;
}
/**
* 验证数据并保存
* @return 是否保存成功
*/
private boolean validateAndSaveData(DataResultEntity dataResultEntity,
double calcValue,
double dayValue,
int days,
boolean isNew) {
if (calcValue < 0) {
logger.warn("calcValue is negative: {}", calcValue);
return false;
}
if (calcValue <= dayValue) {
if (isNew) {
dataResultMapper.saveDataResult(dataResultEntity);
} else {
dataResultMapper.updateDataResult(dataResultEntity);
} }
}catch (Exception e){ return true;
//e.printStackTrace(); } else if (days > 0 && calcValue / days <= dayValue) {
dataResultMapper.saveDataResult(dataResultEntity);
return true;
}
logger.warn("calcValue exceeds dayValue: calcValue={}, dayValue={}, days={}", calcValue, dayValue, days);
return false;
}
/**
* 格式化日期为字符串
*/
private String formatDateToString(Date date) {
if (date == null) {
return null;
}
LocalDateTime localDateTime = date.toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime();
return localDateTime.format(DATE_TIME_FORMATTER);
}
/**
* 格式化双精度数值
*/
private double formatDouble(double value) {
try {
return Double.parseDouble(DECIMAL_FORMAT.format(value));
} catch (NumberFormatException e) {
logger.error("Failed to format double value: {}", value, e);
return value;
} }
} }
@ -190,25 +286,15 @@ public class DataResultServiceImpl implements DataResultService {
return new Date(); return new Date();
} }
Calendar calendar = getInstance(); Calendar calendar = Calendar.getInstance();
calendar.setTime(originalDate); calendar.setTime(originalDate);
int minutes = calendar.get(MINUTE); int minutes = calendar.get(MINUTE);
int seconds = calendar.get(SECOND);
int milliseconds = calendar.get(MILLISECOND);
// 计算下一个五分钟节点
int nextFiveMinute = ((minutes / 5) + 1) * 5;
// 如果超过55分钟,则进位到下一小时 // 向下取整到最近的五分钟节点
if (nextFiveMinute >= 60) { int roundedMinutes = (minutes / 5) * 5;
calendar.add(HOUR_OF_DAY, 1);
calendar.set(MINUTE, 0);
} else {
calendar.set(MINUTE, nextFiveMinute);
}
// 秒和毫秒设为0 calendar.set(MINUTE, roundedMinutes);
calendar.set(SECOND, 0); calendar.set(SECOND, 0);
calendar.set(MILLISECOND, 0); calendar.set(MILLISECOND, 0);
@ -221,7 +307,7 @@ public class DataResultServiceImpl implements DataResultService {
} }
@Override @Override
public List<DataResultEntity> queryDataResult(String buildingId, String startDate, String endDate,String deviceType, int page, int limit) { public List<DataResultEntity> queryDataResult(String buildingId, String startDate, String endDate, String deviceType, int page, int limit) {
return dataResultMapper.queryDataResult(buildingId,startDate,endDate,deviceType,page,limit); return dataResultMapper.queryDataResult(buildingId,startDate,endDate,deviceType,page,limit);
} }

612
user-service/src/main/java/com/mh/user/service/mqtt/service/impl/EventsServiceImpl.java

@ -19,23 +19,27 @@ import com.mh.user.service.GatewayManageService;
import com.mh.user.service.mqtt.service.IEventsService; import com.mh.user.service.mqtt.service.IEventsService;
import com.mh.user.strategy.DeviceStrategy; import com.mh.user.strategy.DeviceStrategy;
import com.mh.user.strategy.DeviceStrategyFactory; import com.mh.user.strategy.DeviceStrategyFactory;
import com.mh.user.utils.CacheUtil;
import com.mh.user.utils.DateUtil; import com.mh.user.utils.DateUtil;
import com.mh.user.utils.SpringBeanUtil;
import io.netty.util.CharsetUtil; import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationContext;
import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException; import java.io.IOException;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/** /**
* @author LJF * @author LJF
@ -62,8 +66,129 @@ public class EventsServiceImpl implements IEventsService {
private DeviceInstallService deviceInstallService; private DeviceInstallService deviceInstallService;
@Autowired @Autowired
@Qualifier("caffeineCache")
private Cache caffeineCache; private Cache caffeineCache;
// 常量定义
private static final int BATCH_SIZE = 100;
private static final long TIME_INTERVAL_THRESHOLD_MS = 150000; // 150秒
// 线程池配置
private static final int CORE_POOL_SIZE = 3;
private static final int MAX_POOL_SIZE = 5;
private static final int QUEUE_CAPACITY = 100;
private static final long KEEP_ALIVE_TIME = 30L;
// 缓存配置
private static final int TIME_CACHE_MAX_SIZE = 1000;
private static final int TIME_CACHE_EXPIRE_MINUTES = 30;
private static final int DEVICE_CACHE_MAX_SIZE = 500;
private static final int DEVICE_CACHE_EXPIRE_MINUTES = 60;
// 线程池
private ExecutorService deviceAnalysisExecutor;
// 本地缓存,使用带过期时间的缓存
private final ConcurrentHashMap<String, Long> timeCache = new ConcurrentHashMap<>();
// 设备缓存,避免频繁查询数据库
private final ConcurrentHashMap<Long, CachedDeviceInfo> deviceCache = new ConcurrentHashMap<>();
// Device和Strategy实例缓存
private final ConcurrentHashMap<String, Device> deviceInstanceCache = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, DeviceStrategy> strategyInstanceCache = new ConcurrentHashMap<>();
// 缓存的设备信息
private static class CachedDeviceInfo {
DeviceInstallEntity device;
long timestamp;
CachedDeviceInfo(DeviceInstallEntity device) {
this.device = device;
this.timestamp = System.currentTimeMillis();
}
boolean isExpired() {
return System.currentTimeMillis() - timestamp > TimeUnit.MINUTES.toMillis(DEVICE_CACHE_EXPIRE_MINUTES);
}
}
@PostConstruct
public void init() {
// 初始化线程池
deviceAnalysisExecutor = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAX_POOL_SIZE,
KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(QUEUE_CAPACITY),
new ThreadFactory() {
private final AtomicInteger threadNumber = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "device-analysis-thread-" + threadNumber.getAndIncrement());
}
},
new ThreadPoolExecutor.CallerRunsPolicy() // 改为CallerRuns,防止任务丢失
);
// 启动缓存清理定时任务
ScheduledExecutorService cleanupExecutor = Executors.newSingleThreadScheduledExecutor(r ->
new Thread(r, "cache-cleanup-thread"));
cleanupExecutor.scheduleAtFixedRate(this::cleanupCaches, 5, 5, TimeUnit.MINUTES);
log.info("EventsServiceImpl initialized, thread pool: core={}, max={}, queue={}",
CORE_POOL_SIZE, MAX_POOL_SIZE, QUEUE_CAPACITY);
}
@PreDestroy
public void destroy() {
if (deviceAnalysisExecutor != null && !deviceAnalysisExecutor.isShutdown()) {
deviceAnalysisExecutor.shutdown();
try {
if (!deviceAnalysisExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
deviceAnalysisExecutor.shutdownNow();
}
} catch (InterruptedException e) {
deviceAnalysisExecutor.shutdownNow();
Thread.currentThread().interrupt();
}
log.info("EventsServiceImpl destroyed, thread pool shutdown");
}
clearAllCaches();
}
/**
* 清理过期缓存
*/
private void cleanupCaches() {
try {
long now = System.currentTimeMillis();
long expireTime = TimeUnit.MINUTES.toMillis(TIME_CACHE_EXPIRE_MINUTES);
// 清理timeCache过期条目
timeCache.entrySet().removeIf(entry -> now - entry.getValue() > expireTime);
// 清理deviceCache过期条目
deviceCache.entrySet().removeIf(entry -> entry.getValue().isExpired());
log.debug("缓存清理完成, timeCache.size={}, deviceCache.size={}",
timeCache.size(), deviceCache.size());
} catch (Exception e) {
log.error("清理缓存失败", e);
}
}
/**
* 清空所有缓存
*/
private void clearAllCaches() {
timeCache.clear();
deviceCache.clear();
deviceInstanceCache.clear();
strategyInstanceCache.clear();
}
@ServiceActivator(inputChannel = ChannelName.EVENTS_UPLOAD_INBOUND) @ServiceActivator(inputChannel = ChannelName.EVENTS_UPLOAD_INBOUND)
@Override @Override
public void handleInboundUpload(byte[] receiver, MessageHeaders headers) { public void handleInboundUpload(byte[] receiver, MessageHeaders headers) {
@ -95,143 +220,431 @@ public class EventsServiceImpl implements IEventsService {
log.info("接收到控制指令下发=>{}", sendStr); log.info("接收到控制指令下发=>{}", sendStr);
} }
private void handleInboundData(byte[] receiver,String topic, String logMessage) { private void handleInboundData(byte[] receiver, String topic, String logMessage) {
try { try {
// 使用 TypeReference 确保泛型信息被保留 // 使用 TypeReference 确保泛型信息被保留
SanShiFengReceiver<SanShiFengDatas> datas = mapper.readValue(receiver, SanShiFengReceiver<SanShiFengDatas> datas = mapper.readValue(receiver,
new TypeReference<SanShiFengReceiver<SanShiFengDatas>>() {}); new TypeReference<SanShiFengReceiver<SanShiFengDatas>>() {});
log.info("主题:{},类型:{}: ,数据:{}", topic, logMessage, datas.toString()); // log.info("主题:{},类型:{}: ,数据:{}", topic, logMessage, datas.toString());
// 开始遍历 数据 // 开始遍历 数据
String sn = datas.getSn(); String sn = datas.getSn();
String plcName = datas.getPlcName(); String plcName = datas.getPlcName();
String projectName = datas.getProjectName(); String projectName = datas.getProjectName();
String time = datas.getTime(); String time = datas.getTime();
// 更新网关设备在线状态 // 更新网关设备在线状态
gatewayManageService.updateGatewayManageOnlineBySn(sn, 0); gatewayManageService.updateGatewayManageOnlineBySn(sn, 0);
// 获取网关对应的buildingId // 获取网关对应的buildingId
String buildingId = gatewayManageService.queryBuildingIdBySn(sn); String buildingId = gatewayManageService.queryBuildingIdBySn(sn);
if (StringUtils.isBlank(buildingId)) { if (StringUtils.isBlank(buildingId)) {
log.error("未找到对应的buildingId"); log.error("未找到对应的buildingId, SN: {}", sn);
return; return;
} }
// 修复类型转换问题
// 获取数据列表
List<SanShiFengDatas> rawDataList = datas.getDatas(); List<SanShiFengDatas> rawDataList = datas.getDatas();
if (rawDataList == null || rawDataList.isEmpty()) { if (rawDataList == null || rawDataList.isEmpty()) {
log.warn("数据列表为空,SN: {}", sn); log.warn("数据列表为空,SN: {}", sn);
return; return;
} }
// rawDataList进行批量更新,100条数据进行批量处理
int batchSize = 100; // 批量更新collectionParams
for (int i = 0; i < rawDataList.size(); i += batchSize) { processBatchUpdate(rawDataList, sn, plcName, projectName, time, buildingId);
int endIndex = Math.min(i + batchSize, rawDataList.size());
// 检查时间间隔并处理数据
if (shouldProcessData(sn, time)) {
processDataList(rawDataList, sn, plcName, projectName, time, buildingId);
}
} catch (IOException e) {
log.error("处理数据时发生错误: ", e);
}
}
/**
* 批量更新collectionParams
*/
private void processBatchUpdate(List<SanShiFengDatas> rawDataList,
String sn,
String plcName,
String projectName,
String time,
String buildingId) {
try {
int size = rawDataList.size();
for (int i = 0; i < size; i += BATCH_SIZE) {
int endIndex = Math.min(i + BATCH_SIZE, size);
List<SanShiFengDatas> batch = rawDataList.subList(i, endIndex); List<SanShiFengDatas> batch = rawDataList.subList(i, endIndex);
collectionParamManageService.getBatchUpdateCollectionParams(batch, sn, plcName, projectName, time, buildingId); collectionParamManageService.getBatchUpdateCollectionParams(batch, sn, plcName, projectName, time, buildingId);
} }
} catch (Exception e) {
log.error("批量更新collectionParams失败: SN={}", sn, e);
}
}
/**
* 判断是否应该处理数据基于时间间隔
*/
private boolean shouldProcessData(String sn, String time) {
if (StringUtils.isBlank(sn) || StringUtils.isBlank(time)) {
return false;
}
String cacheKey = sn + "_time";
Long lastTimestamp = timeCache.get(cacheKey);
if (lastTimestamp == null) {
timeCache.put(cacheKey, DateUtil.getTimeStamp(time));
return false;
}
try {
long currentTimeStamp = DateUtil.getTimeStamp(time);
long timeDiff = Math.abs(currentTimeStamp - lastTimestamp);
if (timeDiff >= TIME_INTERVAL_THRESHOLD_MS) {
timeCache.put(cacheKey, currentTimeStamp);
return true;
}
} catch (Exception e) {
log.error("计算时间间隔失败: SN={}, time={}", sn, time, e);
}
return false;
}
/**
* 处理数据列表修复CountDownLatch泄露问题
*/
private void processDataList(List<SanShiFengDatas> rawDataList,
String sn,
String plcName,
String projectName,
String time,
String buildingId) {
if (rawDataList == null || rawDataList.isEmpty()) {
return;
}
// 限制并发数,避免一次性创建过多任务
int size = rawDataList.size();
int maxConcurrent = Math.min(size, MAX_POOL_SIZE * 2);
CountDownLatch latch = new CountDownLatch(size);
AtomicInteger successCount = new AtomicInteger(0);
AtomicInteger failCount = new AtomicInteger(0);
// 使用信号量控制并发数
Semaphore semaphore = new Semaphore(maxConcurrent);
for (int i = 0; i < size; i++) {
final int index = i;
final SanShiFengDatas data = rawDataList.get(i);
// // 先批量更新collectionParam // 提交任务
// rawDataList.parallelStream().forEach(rawData -> { submitTaskWithFallback(() -> {
// try { try {
// processDataUpdateCpmItem(rawData, sn, plcName, projectName, time, buildingId); semaphore.acquire();
// } catch (Exception e) { try {
// log.error("处理单个数据项失败: {}", rawData, e); processDataItem(data, sn, plcName, projectName, time, buildingId);
// } successCount.incrementAndGet();
// }); } catch (Exception e) {
// 通过判断当前time跟上一个time相差30s才存储进入队列 log.error("处理数据项失败: index={}", index, e);
if (caffeineCache.getIfPresent(sn+"_time") != null) { failCount.incrementAndGet();
String lastTime = (String)caffeineCache.getIfPresent(sn+"_time"); }
// yyyy-MM-dd HH:mm:ss格式转为秒的时间戳 } catch (InterruptedException e) {
long lastTimeStamp = DateUtil.getTimeStamp(lastTime); Thread.currentThread().interrupt();
long currentTimeStamp = DateUtil.getTimeStamp(time); log.error("任务被中断: index={}", index, e);
// 判断时间间隔 } finally {
if (!StringUtils.isBlank(lastTime) && Math.abs(currentTimeStamp -lastTimeStamp) >= 60000) { semaphore.release();
// 并行处理数据列表,主线程不阻塞 latch.countDown(); // 确保countDown一定会执行
rawDataList.parallelStream().forEach(rawData -> {
try {
processDataItem(rawData, sn, plcName, projectName, time, buildingId);
} catch (Exception e) {
log.error("处理单个数据项失败: {}", rawData, e);
}
});
} }
} else { });
caffeineCache.put(sn+"_time", time); }
// 等待所有任务完成
try {
latch.await(30, TimeUnit.SECONDS);
if (latch.getCount() > 0) {
log.warn("部分任务未在指定时间内完成, remaining={}, success={}, fail={}",
latch.getCount(), successCount.get(), failCount.get());
} }
} catch (IOException e) { } catch (InterruptedException e) {
log.error("处理数据时发生错误: ", e); log.warn("等待数据处理完成被中断");
Thread.currentThread().interrupt();
}
}
/**
* 提交任务失败时直接在当前线程执行
*/
private void submitTaskWithFallback(Runnable task) {
try {
// 尝试提交到线程池
if (!deviceAnalysisExecutor.isShutdown()) {
deviceAnalysisExecutor.submit(task);
return;
}
} catch (RejectedExecutionException | IllegalStateException e) {
log.warn("任务提交失败,降级到当前线程执行");
} }
// 降级:在当前线程执行
task.run();
} }
private void processDataUpdateCpmItem(SanShiFengDatas data, String sn, String plcName, String projectName, String time, String buildingId) { private void processDataUpdateCpmItem(SanShiFengDatas data, String sn, String plcName, String projectName, String time, String buildingId) {
// 安全地转换对象
// SanShiFengDatas data = convertDataItem(rawData);
if (data == null) { if (data == null) {
log.warn("数据转换失败,跳过处理"); log.warn("数据为null,跳过处理");
return; return;
} }
// 不使用消息队列,查询属于哪种设备类型,然后通过策略进行数据解析
// log.info("设备SN:{},PLC名称:{},项目名称:{},时间:{},数据:{}", sn, plcName, projectName, time, data.toString());
// 获取点位参数名称
String name = data.getName(); String name = data.getName();
// 获取点位值 if (StringUtils.isBlank(name)) {
BigDecimal value; log.warn("点位名称为空,跳过处理");
return;
}
BigDecimal value = data.getValue();
if (value == null) {
value = BigDecimal.ZERO;
}
try { try {
value = data.getValue(); collectionParamManageService.updateCPMByOtherName(name, value, time, buildingId);
} catch (Exception e) { } catch (Exception e) {
value = BigDecimal.ZERO; log.error("更新collectionParamManage失败: name={}, value={}", name, value, e);
} }
// 直接更新collectionParamManage参数值
collectionParamManageService.updateCPMByOtherName(name, value, time, buildingId);
} }
private void processDataItem(Object rawData, String sn, String plcName, String projectName, String time, String buildingId) { private void processDataItem(SanShiFengDatas data, String sn, String plcName, String projectName, String time, String buildingId) {
// 安全地转换对象
SanShiFengDatas data = convertDataItem(rawData);
if (data == null) { if (data == null) {
log.warn("数据转换失败,跳过处理"); log.warn("数据为null,跳过处理");
return; return;
} }
// 不使用消息队列,查询属于哪种设备类型,然后通过策略进行数据解析
// log.info("设备SN:{},PLC名称:{},项目名称:{},时间:{},数据:{}", sn, plcName, projectName, time, data.toString()); // 获取点位参数名称和值
// 获取点位参数名称
String name = data.getName(); String name = data.getName();
// 获取点位值 if (StringUtils.isBlank(name)) {
BigDecimal value = new BigDecimal(0); log.warn("点位名称为空,跳过处理");
return;
}
BigDecimal value = data.getValue();
if (value == null) {
value = BigDecimal.ZERO;
}
try { try {
value = new BigDecimal(String.valueOf(data.getValue())); // 获取collectionParams缓存,使用带过期时间的本地缓存
List<CollectionParamsManageEntity> collectionParams = getCollectionParamsCache();
if (collectionParams == null || collectionParams.isEmpty()) {
log.debug("collectionParams缓存为空");
return;
}
// 查找匹配的参数实体
CollectionParamsManageEntity collectionParamsManageEntity = findMatchingCollectionParams(collectionParams, name, buildingId);
if (collectionParamsManageEntity == null) {
return;
}
// 检查参数类型,过滤不需要处理的类型
if (!shouldProcessParamType(collectionParamsManageEntity.getParamTypeId())) {
return;
}
// 查询设备信息并处理
processDeviceData(collectionParamsManageEntity, time, value);
} catch (Exception e) { } catch (Exception e) {
value = BigDecimal.ZERO; log.error("处理数据项失败: name={}, value={}", name, value, e);
}
}
/**
* 获取collectionParams缓存
*/
private List<CollectionParamsManageEntity> getCollectionParamsCache() {
try {
// 优先使用caffeine缓存
List<CollectionParamsManageEntity> cachedParams = (List<CollectionParamsManageEntity>) caffeineCache.getIfPresent("collectionParams");
if (cachedParams != null) {
return cachedParams;
}
// 从数据库加载
List<CollectionParamsManageEntity> collectionParams = collectionParamManageService.selectAllCPMList();
if (collectionParams != null) {
// 放入caffeine缓存
caffeineCache.put("collectionParams", collectionParams);
log.info("collectionParams已加载到缓存,共{}条记录", collectionParams.size());
}
return collectionParams;
} catch (Exception e) {
log.error("获取collectionParams缓存失败", e);
return null;
} }
// 直接更新collectionParamManage参数值 }
//collectionParamManageService.updateCPMByOtherName(name, value, time, buildingId);
// 查询device_install表,走之前的逻辑 /**
CollectionParamsManageEntity collectionParamsManageEntity = collectionParamManageService.selectDeviceInstallByOtherName(name, buildingId); * 查找匹配的collectionParams
if (null != collectionParamsManageEntity */
&& collectionParamsManageEntity.getDeviceInstallId() != null private CollectionParamsManageEntity findMatchingCollectionParams(List<CollectionParamsManageEntity> collectionParams,
&& collectionParamsManageEntity.getParamTypeId() != 0 String name,
&& collectionParamsManageEntity.getParamTypeId() != 4 String buildingId) {
&& collectionParamsManageEntity.getParamTypeId() != 15 if (collectionParams == null || StringUtils.isBlank(name) || StringUtils.isBlank(buildingId)) {
&& collectionParamsManageEntity.getParamTypeId() != 16 return null;
&& collectionParamsManageEntity.getParamTypeId() != 17 }
&& collectionParamsManageEntity.getParamTypeId() != 18
&& collectionParamsManageEntity.getParamTypeId() != 19 try {
&& collectionParamsManageEntity.getParamTypeId() != 21 for (CollectionParamsManageEntity val : collectionParams) {
&& collectionParamsManageEntity.getParamTypeId() != 22 if (val == null) {
&& collectionParamsManageEntity.getParamTypeId() != 23 continue;
&& collectionParamsManageEntity.getParamTypeId() != 24 }
&& collectionParamsManageEntity.getParamTypeId() != 3 // 通过运行状态点判断故障点
) { String otherName = val.getOtherName();
DeviceInstallEntity deviceInstallEntity = deviceInstallService.selectDeviceById(collectionParamsManageEntity.getDeviceInstallId()); if (otherName != null && otherName.trim().equals(name.trim())) {
if (deviceInstallEntity != null) { Object bidObj = val.getBuildingId();
// 开始走策略判断 if (bidObj != null && bidObj.toString().equals(buildingId)) {
String deviceType = deviceInstallEntity.getDeviceType(); return val;
Device device = DeviceFactory.createDevice(deviceType); }
DeviceStrategy strategy = DeviceStrategyFactory.createStrategy(deviceType);
if (strategy != null) {
device.setStrategy(strategy);
device.analysisMQTTReceiveData(time, deviceInstallEntity.getDeviceAddr(), value.toPlainString(), Constant.READ, deviceInstallEntity, collectionParamsManageEntity);
} }
} }
} catch (Exception e) {
log.error("查找collectionParams失败: name={}, buildingId={}", name, buildingId, e);
} }
return null;
}
/**
* 判断参数类型是否需要处理
*/
private boolean shouldProcessParamType(int paramTypeId) {
// 不需要处理的参数类型
int[] excludedTypes = {0, 3, 4, 15, 16, 17, 18, 19, 21, 22, 23, 24};
for (int excluded : excludedTypes) {
if (paramTypeId == excluded) {
return false;
}
}
return true;
}
/**
* 处理设备数据优化使用缓存减少数据库查询和对象创建
*/
private void processDeviceData(CollectionParamsManageEntity collectionParamsManageEntity,
String time,
BigDecimal value) {
if (collectionParamsManageEntity == null ||
collectionParamsManageEntity.getDeviceInstallId() == null) {
return;
}
Long deviceInstallId = collectionParamsManageEntity.getDeviceInstallId();
try {
// 从缓存获取设备信息
DeviceInstallEntity deviceInstallEntity = getDeviceFromCache(deviceInstallId);
if (deviceInstallEntity == null) {
log.warn("设备信息不存在: deviceInstallId={}", deviceInstallId);
return;
}
String deviceType = deviceInstallEntity.getDeviceType();
if (StringUtils.isBlank(deviceType)) {
log.warn("设备类型为空: deviceInstallId={}", deviceInstallEntity.getId());
return;
}
// 从缓存获取Device和Strategy实例
Device device = getDeviceInstance(deviceType);
DeviceStrategy strategy = getStrategyInstance(deviceType);
if (device == null || strategy == null) {
log.warn("创建设备或策略失败: deviceType={}", deviceType);
return;
}
// 执行分析
device.setStrategy(strategy);
device.analysisMQTTReceiveData(
time,
deviceInstallEntity.getDeviceAddr(),
value.toPlainString(),
Constant.READ,
deviceInstallEntity,
collectionParamsManageEntity);
} catch (Exception e) {
log.error("处理设备数据失败: time={}, value={}", time, value, e);
}
}
/**
* 从缓存获取设备信息
*/
private DeviceInstallEntity getDeviceFromCache(Long deviceInstallId) {
if (deviceInstallId == null) {
return null;
}
// 先从缓存获取
CachedDeviceInfo cachedInfo = deviceCache.get(deviceInstallId);
if (cachedInfo != null && !cachedInfo.isExpired()) {
return cachedInfo.device;
}
// 缓存未命中或已过期,从数据库加载
try {
DeviceInstallEntity device = deviceInstallService.selectDeviceById(deviceInstallId);
if (device != null) {
deviceCache.put(deviceInstallId, new CachedDeviceInfo(device));
}
return device;
} catch (Exception e) {
log.error("查询设备信息失败: deviceInstallId={}", deviceInstallId, e);
return null;
}
}
/**
* 获取Device实例使用缓存
*/
private Device getDeviceInstance(String deviceType) {
return deviceInstanceCache.computeIfAbsent(deviceType, type -> {
try {
return DeviceFactory.createDevice(type);
} catch (Exception e) {
log.error("创建Device实例失败: deviceType={}", type, e);
return null;
}
});
}
/**
* 获取DeviceStrategy实例使用缓存
*/
private DeviceStrategy getStrategyInstance(String deviceType) {
return strategyInstanceCache.computeIfAbsent(deviceType, type -> {
try {
return DeviceStrategyFactory.createStrategy(type);
} catch (Exception e) {
log.error("创建DeviceStrategy实例失败: deviceType={}", type, e);
return null;
}
});
} }
private SanShiFengDatas convertDataItem(Object rawData) { private SanShiFengDatas convertDataItem(Object rawData) {
@ -239,23 +652,28 @@ public class EventsServiceImpl implements IEventsService {
return null; return null;
} }
SanShiFengDatas data = new SanShiFengDatas();
try { try {
if (rawData instanceof SanShiFengDatas) { if (rawData instanceof SanShiFengDatas) {
data = (SanShiFengDatas) rawData; return (SanShiFengDatas) rawData;
} else if (rawData instanceof HashMap) { } else if (rawData instanceof HashMap) {
JSONObject jsonObject = new JSONObject((HashMap<?, ?>) rawData); JSONObject jsonObject = new JSONObject((HashMap<?, ?>) rawData);
data = jsonObject.to(SanShiFengDatas.class); return jsonObject.to(SanShiFengDatas.class);
} else { } else {
log.warn("不支持的数据类型: {}", rawData.getClass().getName()); log.warn("不支持的数据类型: {}", rawData.getClass().getName());
return null; return null;
} }
} catch (Exception e) { } catch (Exception e) {
log.error("数据转换异常", e); log.error("数据转换异常", e);
data.setName(getJsonValueAsString(rawData, "name")); // 尝试恢复至少name字段
data.setValue(new BigDecimal("-1")); String name = getJsonValueAsString(rawData, "name");
if (!StringUtils.isBlank(name)) {
SanShiFengDatas data = new SanShiFengDatas();
data.setName(name);
data.setValue(BigDecimal.ZERO);
return data;
}
return null;
} }
return data;
} }
/** /**
@ -279,7 +697,6 @@ public class EventsServiceImpl implements IEventsService {
JSONObject jsonObject = (JSONObject) rawData; JSONObject jsonObject = (JSONObject) rawData;
return jsonObject.getString(key); return jsonObject.getString(key);
} else { } else {
// 如果是其他类型,尝试使用反射或通用方式获取
log.warn("不支持的数据类型: {}", rawData.getClass().getName()); log.warn("不支持的数据类型: {}", rawData.getClass().getName());
return null; return null;
} }
@ -289,5 +706,4 @@ public class EventsServiceImpl implements IEventsService {
} }
} }
} }

12
user-service/src/main/java/com/mh/user/utils/CacheUtil.java

@ -3,6 +3,7 @@ package com.mh.user.utils;
import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONObject;
import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Cache;
import com.mh.user.entity.CollectionParamsManageEntity;
import com.mh.user.entity.DeviceCodeParamEntity; import com.mh.user.entity.DeviceCodeParamEntity;
import com.mh.user.entity.GatewayManageEntity; import com.mh.user.entity.GatewayManageEntity;
import com.mh.user.service.DeviceCodeParamService; import com.mh.user.service.DeviceCodeParamService;
@ -95,6 +96,17 @@ public class CacheUtil {
return JSONArray.parseArray(JSONObject.toJSONString(cacheObject), GatewayManageEntity.class); return JSONArray.parseArray(JSONObject.toJSONString(cacheObject), GatewayManageEntity.class);
} }
public List<CollectionParamsManageEntity> getCollectionParams() {
Object cacheObject = caffeineCache.getIfPresent("collectionParams");
// 如果为空,重新添加
if (cacheObject == null) {
createDeviceParams();
// 在重新获取数据
cacheObject = caffeineCache.getIfPresent("collectionParams");
}
return JSONArray.parseArray(JSONObject.toJSONString(cacheObject), CollectionParamsManageEntity.class);
}
/** /**
* 删除缓存 * 删除缓存
*/ */

Loading…
Cancel
Save