Browse Source

1、mqtt消息处理优化;

dev
25604 3 weeks ago
parent
commit
1b806a1e4f
  1. 144
      user-service/src/main/java/com/mh/user/service/mqtt/service/impl/EventsServiceImpl.java
  2. 44
      user-service/src/main/resources/application-prod.yml

144
user-service/src/main/java/com/mh/user/service/mqtt/service/impl/EventsServiceImpl.java

@ -104,56 +104,114 @@ public class EventsServiceImpl implements IEventsService {
} }
// 修复类型转换问题 // 修复类型转换问题
List<?> rawDataList = datas.getDatas(); List<?> rawDataList = datas.getDatas();
for (Object rawData : rawDataList) { if (rawDataList == null || rawDataList.isEmpty()) {
// 安全地转换对象 log.warn("数据列表为空,SN: {}", sn);
SanShiFengDatas data = new SanShiFengDatas(); return;
if (rawData instanceof SanShiFengDatas) { }
data = (SanShiFengDatas) rawData; // 并行处理数据列表,避免阻塞
} else { rawDataList.parallelStream().forEach(rawData -> {
// 如果是 LinkedHashMap,则手动转换
JSONObject jsonObject = new JSONObject((HashMap<?, ?>) rawData);
try {
data = jsonObject.to(SanShiFengDatas.class);
} catch (Exception e) {
log.error("mqtt数据上报异常", e);
data.setName(jsonObject.getString("name"));
data.setValue("-1");
}
}
// 不使用消息队列,查询属于哪种设备类型,然后通过策略进行数据解析
log.info("设备SN:{},PLC名称:{},项目名称:{},时间:{},数据:{}", sn, plcName, projectName, time, data.toString());
// 获取点位参数名称
String name = data.getName();
// 获取点位值
BigDecimal value = new BigDecimal(0);
try { try {
value = new BigDecimal(String.valueOf(data.getValue())); processDataItem(rawData, sn, plcName, projectName, time, buildingId);
} catch (Exception e) { } catch (Exception e) {
value = BigDecimal.ZERO; log.error("处理单个数据项失败: {}", rawData, e);
}
// 直接更新collectionParamManage参数值
collectionParamManageService.updateCPMByOtherName(name, value, time, buildingId);
// 查询device_install表,走之前的逻辑
CollectionParamsManageEntity collectionParamsManageEntity = collectionParamManageService.selectDeviceInstallByOtherName(name, buildingId);
if (null != collectionParamsManageEntity && collectionParamsManageEntity.getDeviceInstallId() != null) {
DeviceInstallEntity deviceInstallEntity = deviceInstallService.selectDeviceById(collectionParamsManageEntity.getDeviceInstallId());
if (deviceInstallEntity != null) {
// 开始走策略判断
String deviceType = deviceInstallEntity.getDeviceType();
Device device = DeviceFactory.createDevice(deviceType);
DeviceStrategy strategy = DeviceStrategyFactory.createStrategy(deviceType);
if (null == strategy) {
continue;
}
device.setStrategy(strategy);
device.analysisMQTTReceiveData(time, deviceInstallEntity.getDeviceAddr(), String.valueOf(value), Constant.READ, deviceInstallEntity, collectionParamsManageEntity);
}
} }
} });
} catch (IOException e) { } catch (IOException e) {
log.error("处理数据时发生错误: ", e); log.error("处理数据时发生错误: ", e);
} }
} }
private void processDataItem(Object rawData, String sn, String plcName, String projectName, String time, String buildingId) {
// 安全地转换对象
SanShiFengDatas data = convertDataItem(rawData);
if (data == null) {
log.warn("数据转换失败,跳过处理");
return;
}
// 不使用消息队列,查询属于哪种设备类型,然后通过策略进行数据解析
log.info("设备SN:{},PLC名称:{},项目名称:{},时间:{},数据:{}", sn, plcName, projectName, time, data.toString());
// 获取点位参数名称
String name = data.getName();
// 获取点位值
BigDecimal value = new BigDecimal(0);
try {
value = new BigDecimal(String.valueOf(data.getValue()));
} catch (Exception e) {
value = BigDecimal.ZERO;
}
// 直接更新collectionParamManage参数值
collectionParamManageService.updateCPMByOtherName(name, value, time, buildingId);
// 查询device_install表,走之前的逻辑
CollectionParamsManageEntity collectionParamsManageEntity = collectionParamManageService.selectDeviceInstallByOtherName(name, buildingId);
if (null != collectionParamsManageEntity && collectionParamsManageEntity.getDeviceInstallId() != null) {
DeviceInstallEntity deviceInstallEntity = deviceInstallService.selectDeviceById(collectionParamsManageEntity.getDeviceInstallId());
if (deviceInstallEntity != null) {
// 开始走策略判断
String deviceType = deviceInstallEntity.getDeviceType();
Device device = DeviceFactory.createDevice(deviceType);
DeviceStrategy strategy = DeviceStrategyFactory.createStrategy(deviceType);
if (strategy != null) {
device.setStrategy(strategy);
device.analysisMQTTReceiveData(time, deviceInstallEntity.getDeviceAddr(), String.valueOf(value), Constant.READ, deviceInstallEntity, collectionParamsManageEntity);
}
}
}
}
private SanShiFengDatas convertDataItem(Object rawData) {
if (rawData == null) {
return null;
}
SanShiFengDatas data = new SanShiFengDatas();
try {
if (rawData instanceof SanShiFengDatas) {
data = (SanShiFengDatas) rawData;
} else if (rawData instanceof HashMap) {
JSONObject jsonObject = new JSONObject((HashMap<?, ?>) rawData);
data = jsonObject.to(SanShiFengDatas.class);
} else {
log.warn("不支持的数据类型: {}", rawData.getClass().getName());
return null;
}
} catch (Exception e) {
log.error("数据转换异常", e);
data.setName(getJsonValueAsString(rawData, "name"));
data.setValue("-1");
}
return data;
}
/**
* 从原始数据对象中获取指定键的字符串值
*
* @param rawData 原始数据对象通常为Map类型
* @param key 要获取的键名
* @return 键对应的字符串值如果不存在或转换失败则返回null
*/
private String getJsonValueAsString(Object rawData, String key) {
if (rawData == null || StringUtils.isBlank(key)) {
return null;
}
try {
if (rawData instanceof HashMap) {
HashMap<?, ?> map = (HashMap<?, ?>) rawData;
Object value = map.get(key);
return value != null ? String.valueOf(value) : null;
} else if (rawData instanceof JSONObject) {
JSONObject jsonObject = (JSONObject) rawData;
return jsonObject.getString(key);
} else {
// 如果是其他类型,尝试使用反射或通用方式获取
log.warn("不支持的数据类型: {}", rawData.getClass().getName());
return null;
}
} catch (Exception e) {
log.error("获取JSON值时发生异常,key: {}", key, e);
return null;
}
}
} }

44
user-service/src/main/resources/application-prod.yml

@ -1,3 +1,17 @@
# 项目相关配置
mh:
# 名称
name: nfxy_prod
# 版本
version: 1.0.0
# 版权年份
copyrightYear: 2025
# 文件路径 示例( Windows配置D:/mh/uploadPath,Linux配置 /home/mh/uploadPath)
profile: E:/mh_data/uploadPath
# 获取ip地址开关
addressEnabled: false
# 验证码类型 math 数字计算 char 字符验证
captchaType: math
server: server:
port: 8762 #8761创新、8762广商、8763华厦、广州理工,华粤8762,广外8764,北师大(珠海)8762 port: 8762 #8761创新、8762广商、8763华厦、广州理工,华粤8762,广外8764,北师大(珠海)8762
spring: spring:
@ -13,11 +27,11 @@ spring:
# username: sa # username: sa
# password: mh@803 # password: mh@803
## url: jdbc:sqlserver://120.25.220.177:32012;DatabaseName=M_CHWS;allowMultiQueries=true ## url: jdbc:sqlserver://120.25.220.177:32012;DatabaseName=M_CHWS;allowMultiQueries=true
#阿里云服务器-广州理工 # #阿里云服务器-广州理工
url: jdbc:sqlserver://111.230.50.186:32012;DatabaseName=CHWS;allowMultiQueries=true;encrypt=false # url: jdbc:sqlserver://111.230.50.186:32012;DatabaseName=CHWS;allowMultiQueries=true;encrypt=false
driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver # driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver
username: test # username: test
password: minghan123456@ # password: minghan123456@
# #华厦云服务器 # #华厦云服务器
# url: jdbc:sqlserver://127.0.0.1:1433;DatabaseName=CHWS;allowMultiQueries=true # url: jdbc:sqlserver://127.0.0.1:1433;DatabaseName=CHWS;allowMultiQueries=true
# driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver # driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver
@ -70,6 +84,12 @@ spring:
# username: chws_bsdz # username: chws_bsdz
# password: Mhtech@803803 # password: Mhtech@803803
#南方学院
url: jdbc:sqlserver://175.178.153.91:8033;DatabaseName=chws_nfxy;allowMultiQueries=true;encrypt=false
driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver
username: chws_nfxy
password: minghan@123456
filters: stat,wall,config filters: stat,wall,config
max-active: 100 max-active: 100
initial-size: 1 initial-size: 1
@ -150,23 +170,23 @@ mqttSpring:
# BASIC parameters are required. # BASIC parameters are required.
BASIC: BASIC:
protocol: MQTT protocol: MQTT
host: 192.168.1.179 host: mqtt.mhito.net
port: 1883 port: 1883
username: test username: chws_nfxy
password: test123456 password: mhtech@803
client-id: chws_nfxy_mqtt_dev client-id: chws_nfxy_mqtt_prod
# If the protocol is ws/wss, this value is required. # If the protocol is ws/wss, this value is required.
path: path:
# Topics that need to be subscribed when initially connecting to mqtt, multiple topics are divided by ",". # Topics that need to be subscribed when initially connecting to mqtt, multiple topics are divided by ",".
inbound-topic: chws_nfxy_mqtt_dev/read/events_upload/devices inbound-topic: chws_nfxy_mqtt_prod/read/events_upload/devices
# 此部分是提供给后端生成token返回给前端,让前端使用websocket方式去和MQTT实现交互的,笔者此文的案例中并没有去实现 # 此部分是提供给后端生成token返回给前端,让前端使用websocket方式去和MQTT实现交互的,笔者此文的案例中并没有去实现
# 无人机远程控制模式(drone remote control) # 无人机远程控制模式(drone remote control)
DRC: DRC:
protocol: WS protocol: WS
host: 192.168.1.179 host: mqtt.mhito.net
port: 8083 port: 8083
path: /mqtt path: /mqtt
control: control:
topic: chws_nfxy_mqtt_dev/control/events_upload/devices topic: mh_control/events_control/devices

Loading…
Cancel
Save