From 1b806a1e4fa06f59811661d52b167edbce6fba2f Mon Sep 17 00:00:00 2001 From: 25604 Date: Tue, 23 Dec 2025 13:58:34 +0800 Subject: [PATCH] =?UTF-8?q?1=E3=80=81mqtt=E6=B6=88=E6=81=AF=E5=A4=84?= =?UTF-8?q?=E7=90=86=E4=BC=98=E5=8C=96=EF=BC=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mqtt/service/impl/EventsServiceImpl.java | 144 ++++++++++++------ .../src/main/resources/application-prod.yml | 44 ++++-- 2 files changed, 133 insertions(+), 55 deletions(-) diff --git a/user-service/src/main/java/com/mh/user/service/mqtt/service/impl/EventsServiceImpl.java b/user-service/src/main/java/com/mh/user/service/mqtt/service/impl/EventsServiceImpl.java index d58327c..8302095 100644 --- a/user-service/src/main/java/com/mh/user/service/mqtt/service/impl/EventsServiceImpl.java +++ b/user-service/src/main/java/com/mh/user/service/mqtt/service/impl/EventsServiceImpl.java @@ -104,56 +104,114 @@ public class EventsServiceImpl implements IEventsService { } // 修复类型转换问题 List rawDataList = datas.getDatas(); - for (Object rawData : rawDataList) { - // 安全地转换对象 - SanShiFengDatas data = new SanShiFengDatas(); - if (rawData instanceof SanShiFengDatas) { - data = (SanShiFengDatas) rawData; - } else { - // 如果是 LinkedHashMap,则手动转换 - JSONObject jsonObject = new JSONObject((HashMap) rawData); - try { - data = jsonObject.to(SanShiFengDatas.class); - } catch (Exception e) { - log.error("mqtt数据上报异常", e); - data.setName(jsonObject.getString("name")); - data.setValue("-1"); - } - } - // 不使用消息队列,查询属于哪种设备类型,然后通过策略进行数据解析 - log.info("设备SN:{},PLC名称:{},项目名称:{},时间:{},数据:{}", sn, plcName, projectName, time, data.toString()); - // 获取点位参数名称 - String name = data.getName(); - // 获取点位值 - BigDecimal value = new BigDecimal(0); + if (rawDataList == null || rawDataList.isEmpty()) { + log.warn("数据列表为空,SN: {}", sn); + return; + } + // 并行处理数据列表,避免阻塞 + rawDataList.parallelStream().forEach(rawData -> { try { - value = new BigDecimal(String.valueOf(data.getValue())); + processDataItem(rawData, sn, plcName, projectName, time, buildingId); } catch (Exception e) { - value = BigDecimal.ZERO; - } - // 直接更新collectionParamManage参数值 - collectionParamManageService.updateCPMByOtherName(name, value, time, buildingId); - // 查询device_install表,走之前的逻辑 - CollectionParamsManageEntity collectionParamsManageEntity = collectionParamManageService.selectDeviceInstallByOtherName(name, buildingId); - if (null != collectionParamsManageEntity && collectionParamsManageEntity.getDeviceInstallId() != null) { - DeviceInstallEntity deviceInstallEntity = deviceInstallService.selectDeviceById(collectionParamsManageEntity.getDeviceInstallId()); - if (deviceInstallEntity != null) { - // 开始走策略判断 - String deviceType = deviceInstallEntity.getDeviceType(); - Device device = DeviceFactory.createDevice(deviceType); - DeviceStrategy strategy = DeviceStrategyFactory.createStrategy(deviceType); - if (null == strategy) { - continue; - } - device.setStrategy(strategy); - device.analysisMQTTReceiveData(time, deviceInstallEntity.getDeviceAddr(), String.valueOf(value), Constant.READ, deviceInstallEntity, collectionParamsManageEntity); - } + log.error("处理单个数据项失败: {}", rawData, e); } - } + }); } catch (IOException e) { log.error("处理数据时发生错误: ", e); } } + private void processDataItem(Object rawData, String sn, String plcName, String projectName, String time, String buildingId) { + // 安全地转换对象 + SanShiFengDatas data = convertDataItem(rawData); + if (data == null) { + log.warn("数据转换失败,跳过处理"); + return; + } + // 不使用消息队列,查询属于哪种设备类型,然后通过策略进行数据解析 + log.info("设备SN:{},PLC名称:{},项目名称:{},时间:{},数据:{}", sn, plcName, projectName, time, data.toString()); + // 获取点位参数名称 + String name = data.getName(); + // 获取点位值 + BigDecimal value = new BigDecimal(0); + try { + value = new BigDecimal(String.valueOf(data.getValue())); + } catch (Exception e) { + value = BigDecimal.ZERO; + } + // 直接更新collectionParamManage参数值 + collectionParamManageService.updateCPMByOtherName(name, value, time, buildingId); + // 查询device_install表,走之前的逻辑 + CollectionParamsManageEntity collectionParamsManageEntity = collectionParamManageService.selectDeviceInstallByOtherName(name, buildingId); + if (null != collectionParamsManageEntity && collectionParamsManageEntity.getDeviceInstallId() != null) { + DeviceInstallEntity deviceInstallEntity = deviceInstallService.selectDeviceById(collectionParamsManageEntity.getDeviceInstallId()); + if (deviceInstallEntity != null) { + // 开始走策略判断 + String deviceType = deviceInstallEntity.getDeviceType(); + Device device = DeviceFactory.createDevice(deviceType); + DeviceStrategy strategy = DeviceStrategyFactory.createStrategy(deviceType); + if (strategy != null) { + device.setStrategy(strategy); + device.analysisMQTTReceiveData(time, deviceInstallEntity.getDeviceAddr(), String.valueOf(value), Constant.READ, deviceInstallEntity, collectionParamsManageEntity); + } + } + } + } + + private SanShiFengDatas convertDataItem(Object rawData) { + if (rawData == null) { + return null; + } + + SanShiFengDatas data = new SanShiFengDatas(); + try { + if (rawData instanceof SanShiFengDatas) { + data = (SanShiFengDatas) rawData; + } else if (rawData instanceof HashMap) { + JSONObject jsonObject = new JSONObject((HashMap) rawData); + data = jsonObject.to(SanShiFengDatas.class); + } else { + log.warn("不支持的数据类型: {}", rawData.getClass().getName()); + return null; + } + } catch (Exception e) { + log.error("数据转换异常", e); + data.setName(getJsonValueAsString(rawData, "name")); + data.setValue("-1"); + } + return data; + } + + /** + * 从原始数据对象中获取指定键的字符串值 + * + * @param rawData 原始数据对象(通常为Map类型) + * @param key 要获取的键名 + * @return 键对应的字符串值,如果不存在或转换失败则返回null + */ + private String getJsonValueAsString(Object rawData, String key) { + if (rawData == null || StringUtils.isBlank(key)) { + return null; + } + + try { + if (rawData instanceof HashMap) { + HashMap map = (HashMap) rawData; + Object value = map.get(key); + return value != null ? String.valueOf(value) : null; + } else if (rawData instanceof JSONObject) { + JSONObject jsonObject = (JSONObject) rawData; + return jsonObject.getString(key); + } else { + // 如果是其他类型,尝试使用反射或通用方式获取 + log.warn("不支持的数据类型: {}", rawData.getClass().getName()); + return null; + } + } catch (Exception e) { + log.error("获取JSON值时发生异常,key: {}", key, e); + return null; + } + } + } diff --git a/user-service/src/main/resources/application-prod.yml b/user-service/src/main/resources/application-prod.yml index b1fc53a..db1cf1c 100644 --- a/user-service/src/main/resources/application-prod.yml +++ b/user-service/src/main/resources/application-prod.yml @@ -1,3 +1,17 @@ +# 项目相关配置 +mh: + # 名称 + name: nfxy_prod + # 版本 + version: 1.0.0 + # 版权年份 + copyrightYear: 2025 + # 文件路径 示例( Windows配置D:/mh/uploadPath,Linux配置 /home/mh/uploadPath) + profile: E:/mh_data/uploadPath + # 获取ip地址开关 + addressEnabled: false + # 验证码类型 math 数字计算 char 字符验证 + captchaType: math server: port: 8762 #8761创新、8762广商、8763华厦、广州理工,华粤8762,广外8764,北师大(珠海)8762 spring: @@ -13,11 +27,11 @@ spring: # username: sa # password: mh@803 ## url: jdbc:sqlserver://120.25.220.177:32012;DatabaseName=M_CHWS;allowMultiQueries=true - #阿里云服务器-广州理工 - url: jdbc:sqlserver://111.230.50.186:32012;DatabaseName=CHWS;allowMultiQueries=true;encrypt=false - driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver - username: test - password: minghan123456@ +# #阿里云服务器-广州理工 +# url: jdbc:sqlserver://111.230.50.186:32012;DatabaseName=CHWS;allowMultiQueries=true;encrypt=false +# driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver +# username: test +# password: minghan123456@ # #华厦云服务器 # url: jdbc:sqlserver://127.0.0.1:1433;DatabaseName=CHWS;allowMultiQueries=true # driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver @@ -70,6 +84,12 @@ spring: # username: chws_bsdz # password: Mhtech@803803 + #南方学院 + url: jdbc:sqlserver://175.178.153.91:8033;DatabaseName=chws_nfxy;allowMultiQueries=true;encrypt=false + driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver + username: chws_nfxy + password: minghan@123456 + filters: stat,wall,config max-active: 100 initial-size: 1 @@ -150,23 +170,23 @@ mqttSpring: # BASIC parameters are required. BASIC: protocol: MQTT - host: 192.168.1.179 + host: mqtt.mhito.net port: 1883 - username: test - password: test123456 - client-id: chws_nfxy_mqtt_dev + username: chws_nfxy + password: mhtech@803 + client-id: chws_nfxy_mqtt_prod # If the protocol is ws/wss, this value is required. path: # Topics that need to be subscribed when initially connecting to mqtt, multiple topics are divided by ",". - inbound-topic: chws_nfxy_mqtt_dev/read/events_upload/devices + inbound-topic: chws_nfxy_mqtt_prod/read/events_upload/devices # 此部分是提供给后端生成token返回给前端,让前端使用websocket方式去和MQTT实现交互的,笔者此文的案例中并没有去实现 # 无人机远程控制模式(drone remote control) DRC: protocol: WS - host: 192.168.1.179 + host: mqtt.mhito.net port: 8083 path: /mqtt control: - topic: chws_nfxy_mqtt_dev/control/events_upload/devices + topic: mh_control/events_control/devices