From 40f7577927c388d3ead89b5a5267977fdc472327 Mon Sep 17 00:00:00 2001 From: mh Date: Mon, 6 Nov 2023 17:51:17 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E9=87=87=E9=9B=86=E7=BB=93?= =?UTF-8?q?=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/mh/user/job/DealDataJob.java | 62 ++---- .../mh/user/serialport/SerialPortSingle.java | 186 +++++++++--------- .../mh/user/serialport/SerialPortUtil.java | 4 +- .../mh/user/utils/ComThreadPoolService.java | 48 +++++ .../com/mh/user/utils/SendOrderUtils.java | 51 +++-- 5 files changed, 192 insertions(+), 159 deletions(-) create mode 100644 user-service/src/main/java/com/mh/user/utils/ComThreadPoolService.java diff --git a/user-service/src/main/java/com/mh/user/job/DealDataJob.java b/user-service/src/main/java/com/mh/user/job/DealDataJob.java index ee7f644..331543e 100644 --- a/user-service/src/main/java/com/mh/user/job/DealDataJob.java +++ b/user-service/src/main/java/com/mh/user/job/DealDataJob.java @@ -6,6 +6,7 @@ import com.mh.user.serialport.SerialPortThread; import com.mh.user.service.DeviceCodeParamService; import com.mh.user.service.DealDataService; import com.mh.user.utils.AnalysisReceiveOrder485; +import com.mh.user.utils.ComThreadPoolService; import com.mh.user.utils.GetReadOrder485; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -13,6 +14,7 @@ import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.text.SimpleDateFormat; import java.util.Date; +import java.util.concurrent.ThreadPoolExecutor; /** * @author ljf @@ -37,6 +39,8 @@ public class DealDataJob { this.dealDataService = dealDataService; } + ThreadPoolExecutor comThreadPool = ComThreadPoolService.getInstance(); + /** * 定时处理汇总数据:每15分钟处理一次,十分钟(0 0/10 * * * ?) */ @@ -71,63 +75,27 @@ public class DealDataJob { log.info("------定时采集开始>>>>Constant.FLAG=="+Constant.FLAG+"------"); if(!Constant.FLAG){ if(!Constant.WEB_FLAG){ - log.info("------taskTimes=="+taskTimes+"------"); Constant.FLAG=true; - log.info("------Constant.WEB_FLAG=="+Constant.WEB_FLAG+"------"); + log.info("------Constant.WEB_FLAG=="+ false +"------"); for (int i = 1; i <= 4; i++) { - String threadName = String.valueOf(i); + String threadName; if (i == 1 || i == 3) { threadName = "1"; + log.info("------采集水位、水温!"+i+"------"); + } else if (i == 2) { + threadName = "2"; + log.info("------采集水、电、运行状态!"+i+"------"); + } else { + threadName = "3"; + log.info("------采集设定温度、设定水位、故障状态!"+i+"------"); } - for(int j=1;i<11;i++){ + for(int j=1;j<11;j++){ SerialPortThread myThread = new SerialPortThread(); Thread thread = new Thread(myThread); myThread.setName(threadName, String.valueOf(j)); - thread.start(); + comThreadPool.execute(thread); } } -// if (taskTimes<=4) { -// Constant.FLAG=true; -// log.info("------Constant.WEB_FLAG=="+Constant.WEB_FLAG+"------"); -// if (taskTimes == 2) {//2 -// for(int i=1;i<11;i++){ -// SerialPortThread myThread = new SerialPortThread(); -// Thread thread = new Thread(myThread); -// myThread.setName("2", String.valueOf(i)); -// thread.start(); -// } -// log.info("------采集水、电、运行状态!"+taskTimes+"------"); -// }else if (taskTimes == 3){//3 -// for(int i=1;i<11;i++){ -// SerialPortThread myThread = new SerialPortThread(); -// Thread thread = new Thread(myThread); -// myThread.setName("1", String.valueOf(i)); -// thread.start(); -// } -// log.info("------采集水位、水温!"+taskTimes+"------"); -// }else if (taskTimes == 4) {//4 -// for(int i=1;i<11;i++){ -// SerialPortThread myThread = new SerialPortThread(); -// Thread thread = new Thread(myThread); -// myThread.setName("3", String.valueOf(i)); -// thread.start(); -// } -// log.info("------采集设定温度、设定水位、故障状态!"+taskTimes+"------"); -// }else { -// for(int i=1;i<11;i++){ -// SerialPortThread myThread = new SerialPortThread(); -// Thread thread = new Thread(myThread); -// myThread.setName("1", String.valueOf(i)); -// thread.start(); -// } -// log.info("------采集水位、水温!"+taskTimes+"------"); -// } -// if(taskTimes<4){ -// taskTimes++; -// }else{ -// taskTimes=1; -// } -// } } } } catch (Exception e) { diff --git a/user-service/src/main/java/com/mh/user/serialport/SerialPortSingle.java b/user-service/src/main/java/com/mh/user/serialport/SerialPortSingle.java index 82bc9d1..3b34d9c 100644 --- a/user-service/src/main/java/com/mh/user/serialport/SerialPortSingle.java +++ b/user-service/src/main/java/com/mh/user/serialport/SerialPortSingle.java @@ -14,6 +14,7 @@ import org.springframework.context.ApplicationContext; import org.springframework.stereotype.Component; //import purejavacomm.SerialPort; import gnu.io.SerialPort; + import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -29,10 +30,10 @@ import java.util.List; @Slf4j public class SerialPortSingle { - public SerialPort serialPort = null; - private String receiveStr = null; - private int baudrate=9600; - private String parity=null; + public SerialPort serialPort = null; + private String receiveStr = null; + private int baudrate = 9600; + private String parity = null; // 调用service ApplicationContext context = SpringBeanUtil.getApplicationContext(); @@ -41,107 +42,111 @@ public class SerialPortSingle { DeviceInstallService deviceInstallService = context.getBean(DeviceInstallService.class); NowDataService nowDataService = context.getBean(NowDataService.class); BuildingService buildingService = context.getBean(BuildingService.class); - public String serialPortSend(DeviceCodeParamEntity deviceCodeParamEntity){ + + public String serialPortSend(DeviceCodeParamEntity deviceCodeParamEntity) { //查看所有串口 SerialPortUtil serialPortUtil = SerialPortUtil.getSerialPortUtil(); ArrayList port = serialPortUtil.findPort(); // SerialTool serialPortUtil = SerialTool.getSerialPortUtil(); // ArrayList port = serialPortUtil.findPort(); - String rtData=""; + String rtData = ""; // System.out.println("发现全部串口:" + port); - String comName=deviceCodeParamEntity.getDataCom().toUpperCase(); - if (port.contains(comName)){ - try{ - try{ - baudrate=deviceCodeParamEntity.getBaudrate(); - parity=deviceCodeParamEntity.getParity(); - if (parity==null || parity.equals("") || parity.equalsIgnoreCase("none")){ - serialPort = serialPortUtil.openPort(comName, baudrate, SerialPort.DATABITS_8, SerialPort.PARITY_NONE, SerialPort.PARITY_ODD); - }else{ - serialPort = serialPortUtil.openPort(comName, baudrate, SerialPort.DATABITS_8, SerialPort.PARITY_EVEN, SerialPort.PARITY_ODD); - } - //向串口发送指令 - log.info("-----------------------------单抄向串口"+serialPort+"发送指令!-----------------------------"); - SendOrderUtils.sendSerialPort(deviceCodeParamEntity, serialPort); - Thread.sleep(1500); - }catch(Exception e){ - - } - //对返回数据进行相关解析处理 - receiveStr=null; - byte[] bytes = serialPortUtil.readFromPort(serialPort); //读取串口数据 - try { - String byteStr = new String(bytes, 0, bytes.length).trim(); - receiveStr = receiveStr + printHexString(bytes); - //去掉空格和null - receiveStr = receiveStr.replace("null", ""); - receiveStr = receiveStr.replace(" ", ""); - log.info("串口"+serialPort+"接收数据:" + receiveStr + ",大小: " + receiveStr.length()); - } catch (NullPointerException e) { - serialPortUtil.closePort(serialPort); - log.info("单抄串口"+serialPort+"异常,没有数据返回!关闭串口"); + String comName = deviceCodeParamEntity.getDataCom().toUpperCase(); + if (!port.contains(comName)) { + log.info("串口:" + comName + "不存在!"); + return "fail"; + } + try { + try { + baudrate = deviceCodeParamEntity.getBaudrate(); + parity = deviceCodeParamEntity.getParity(); + if (parity == null || parity.equals("") || parity.equalsIgnoreCase("none")) { + serialPort = serialPortUtil.openPort(comName, baudrate, SerialPort.DATABITS_8, SerialPort.PARITY_NONE, SerialPort.PARITY_ODD); + } else { + serialPort = serialPortUtil.openPort(comName, baudrate, SerialPort.DATABITS_8, SerialPort.PARITY_EVEN, SerialPort.PARITY_ODD); } - //返回值全部变成大写 - String receiveData = receiveStr.toUpperCase(); - //截取去掉FE - String dataStr = receiveData.replace("FE", ""); - String deviceType=deviceCodeParamEntity.getDeviceType(); - String deviceAddr=deviceCodeParamEntity.getDeviceAddr(); - String registerAddr=deviceCodeParamEntity.getRegisterAddr(); - String brand=deviceCodeParamEntity.getBrand(); - String buildingId=deviceCodeParamEntity.getBuildingId(); - String buildingName=buildingService.queryBuildingName(buildingId); //查询楼栋名称 + //向串口发送指令 + log.info("-----------------------------单抄向串口" + serialPort + "发送指令!-----------------------------"); + SendOrderUtils.sendSerialPort(deviceCodeParamEntity, serialPort); + Thread.sleep(1500); + } catch (Exception e) { + log.error("前端设置出现异常==>", e); + return "fail"; + } + //对返回数据进行相关解析处理 + receiveStr = null; + byte[] bytes = serialPortUtil.readFromPort(serialPort); //读取串口数据 + if (null == bytes) { + serialPortUtil.closePort(serialPort); + log.info("单抄串口" + serialPort + "异常,没有数据返回!关闭串口"); + return "fail"; + } + receiveStr = receiveStr + printHexString(bytes); + //去掉空格和null + receiveStr = receiveStr.replace("null", ""); + receiveStr = receiveStr.replace(" ", ""); + log.info("串口" + serialPort + "接收数据:" + receiveStr + ",大小: " + receiveStr.length()); + //返回值全部变成大写 + String receiveData = receiveStr.toUpperCase(); + //截取去掉FE + String dataStr = receiveData.replace("FE", ""); + String deviceType = deviceCodeParamEntity.getDeviceType(); + String deviceAddr = deviceCodeParamEntity.getDeviceAddr(); + String registerAddr = deviceCodeParamEntity.getRegisterAddr(); + String brand = deviceCodeParamEntity.getBrand(); + String buildingId = deviceCodeParamEntity.getBuildingId(); + String buildingName = buildingService.queryBuildingName(buildingId); //查询楼栋名称 - deviceInstallService.updateOnline(deviceAddr,deviceType,buildingId,"在线"); //设备在线 - log.info(deviceType+"在线,设备号:"+deviceAddr+",所属楼栋:"+buildingName); - if (deviceType.equals("热泵")){ - String strState=nowDataService.selectState(buildingId,deviceAddr); - if (strState!=null && strState.equals("离线")){ //采集到数据 - nowDataService.updateRunState(buildingId,deviceAddr,"不运行"); //监控界面状态表热泵在线状态 - } + deviceInstallService.updateOnline(deviceAddr, deviceType, buildingId, "在线"); //设备在线 + log.info(deviceType + "在线,设备号:" + deviceAddr + ",所属楼栋:" + buildingName); + if (deviceType.equals("热泵")) { + String strState = nowDataService.selectState(buildingId, deviceAddr); + if (strState != null && strState.equals("离线")) { //采集到数据 + nowDataService.updateRunState(buildingId, deviceAddr, "不运行"); //监控界面状态表热泵在线状态 } - try{ - if ((dataStr.length() == 18 || dataStr.length() == 70 || dataStr.length() == 44) && deviceType.equals("水表")) { - rtData=analysisReceiveOrder485.analysisWtMeterOrder4852(dataStr,registerAddr,brand,buildingId); - } else if ((dataStr.length() == 36 || dataStr.length() == 44 || dataStr.length()==40 || dataStr.length()==50) && deviceType.equals("电表")) { - rtData=analysisReceiveOrder485.analysisMeterOrder4852(dataStr,registerAddr,brand,buildingId); - } else if (deviceType.equals("压变")) { - rtData=analysisReceiveOrder485.analysisPressureOrder4852(dataStr,registerAddr,brand,buildingId); - } else if ((dataStr.length() == 30) && deviceType.equals("状态检测")) {//五路状态读取,兼容旧版系统 - analysisReceiveOrder485.analysisStateOrder485(dataStr,registerAddr,brand,buildingId); - } else if (deviceType.equals("水位开关") && (registerAddr.equals("0018") || registerAddr.equals("0017"))) { - rtData=analysisReceiveOrder485.analysisRelayOrder4852(dataStr,registerAddr,brand,buildingId); - } else if (deviceType.equals("热泵")) { - rtData=analysisReceiveOrder485.analysisPumpOrder4852(dataStr,registerAddr,brand,buildingId); - } else if (deviceType.equals("时控")) { - rtData=analysisReceiveOrder485.analysisTimeSetOrder4852(dataStr,registerAddr,brand,buildingId); - } else if (deviceType.equals("水位开关") && registerAddr.equals("0010")){ //热泵状态 - rtData=analysisReceiveOrder485.analysisPumpStateOrder2(dataStr,registerAddr,brand,buildingId); - } else if (deviceType.equals("温度变送器")) { - rtData=analysisReceiveOrder485.analysisMulTempOrder4852(dataStr,registerAddr,brand,buildingId); - } else if (deviceType.equals("热泵状态")){ - rtData=analysisReceiveOrder485.analysisPumpStateOrder2(dataStr,registerAddr,brand,buildingId); - } - }catch (Exception e){ - log.error(deviceCodeParamEntity.getDeviceType()+"单抄保存数据库失败!"); - serialPortUtil.closePort(serialPort); + } + try { + if ((dataStr.length() == 18 || dataStr.length() == 70 || dataStr.length() == 44) && deviceType.equals("水表")) { + rtData = analysisReceiveOrder485.analysisWtMeterOrder4852(dataStr, registerAddr, brand, buildingId); + } else if ((dataStr.length() == 36 || dataStr.length() == 44 || dataStr.length() == 40 || dataStr.length() == 50) && deviceType.equals("电表")) { + rtData = analysisReceiveOrder485.analysisMeterOrder4852(dataStr, registerAddr, brand, buildingId); + } else if (deviceType.equals("压变")) { + rtData = analysisReceiveOrder485.analysisPressureOrder4852(dataStr, registerAddr, brand, buildingId); + } else if ((dataStr.length() == 30) && deviceType.equals("状态检测")) {//五路状态读取,兼容旧版系统 + analysisReceiveOrder485.analysisStateOrder485(dataStr, registerAddr, brand, buildingId); + } else if (deviceType.equals("水位开关") && (registerAddr.equals("0018") || registerAddr.equals("0017"))) { + rtData = analysisReceiveOrder485.analysisRelayOrder4852(dataStr, registerAddr, brand, buildingId); + } else if (deviceType.equals("热泵")) { + rtData = analysisReceiveOrder485.analysisPumpOrder4852(dataStr, registerAddr, brand, buildingId); + } else if (deviceType.equals("时控")) { + rtData = analysisReceiveOrder485.analysisTimeSetOrder4852(dataStr, registerAddr, brand, buildingId); + } else if (deviceType.equals("水位开关") && registerAddr.equals("0010")) { //热泵状态 + rtData = analysisReceiveOrder485.analysisPumpStateOrder2(dataStr, registerAddr, brand, buildingId); + } else if (deviceType.equals("温度变送器")) { + rtData = analysisReceiveOrder485.analysisMulTempOrder4852(dataStr, registerAddr, brand, buildingId); + } else if (deviceType.equals("热泵状态")) { + rtData = analysisReceiveOrder485.analysisPumpStateOrder2(dataStr, registerAddr, brand, buildingId); } + } catch (Exception e) { + log.error(deviceCodeParamEntity.getDeviceType() + "单抄保存数据库失败!"); serialPortUtil.closePort(serialPort); - System.out.println("关闭"+serialPort); - Thread.sleep(500); - log.info("-----------------------------"+serialPort+"单抄结束!-----------------------------"); - return rtData; - }catch (Exception e){ -// e.printStackTrace(); + return "fail"; } - }else { - log.info("串口:"+comName+"不存在!"); + serialPortUtil.closePort(serialPort); + System.out.println("关闭" + serialPort); + Thread.sleep(500); + log.info("-----------------------------" + serialPort + "单抄结束!-----------------------------"); + return rtData; + } catch (Exception e) { + log.error("前端设置出现异常==>", e); + return "fail"; } - return "fail"; } + /** * 字节数组转16进制字符串 + * * @param b 字节数组 * @return 16进制字符串 */ @@ -159,12 +164,13 @@ public class SerialPortSingle { /** * 十六进制字符串转byte[] + * * @param hex 十六进制字符串 * @return byte[] */ public static byte[] hexStr2Byte(String hex) { if (hex == null) { - return new byte[] {}; + return new byte[]{}; } // 奇数位补0 @@ -183,8 +189,10 @@ public class SerialPortSingle { } return buffer.array(); } + /** * 16进制转换成为string类型字符串 + * * @param s 待转换字符串 */ public static String hexStringToString(String s) { diff --git a/user-service/src/main/java/com/mh/user/serialport/SerialPortUtil.java b/user-service/src/main/java/com/mh/user/serialport/SerialPortUtil.java index e194d3f..f1dc576 100644 --- a/user-service/src/main/java/com/mh/user/serialport/SerialPortUtil.java +++ b/user-service/src/main/java/com/mh/user/serialport/SerialPortUtil.java @@ -23,9 +23,7 @@ public class SerialPortUtil { static { //在该类被ClassLoader加载时就初始化一个SerialTool对象 - if (serialPortUtil == null) { - serialPortUtil = new SerialPortUtil(); - } + serialPortUtil = new SerialPortUtil(); } //私有化SerialTool类的构造方法,不允许其他类生成SerialTool对象 diff --git a/user-service/src/main/java/com/mh/user/utils/ComThreadPoolService.java b/user-service/src/main/java/com/mh/user/utils/ComThreadPoolService.java new file mode 100644 index 0000000..9bc05ac --- /dev/null +++ b/user-service/src/main/java/com/mh/user/utils/ComThreadPoolService.java @@ -0,0 +1,48 @@ +package com.mh.user.utils; + +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * @author LJF + * @title : 单例的线程池 + * @description 使用静态内部类进行创建,专门解析接收到的报文数据 + * @updateTime 2020-12-09 + * @throws : + */ +public class ComThreadPoolService { + + /** 线程池保持ALIVE状态线程数 */ + public static final int CORE_POOL_SIZE = 10; + + /** 线程池最大线程数 */ + public static final int MAX_POOL_SIZE = 50; + + /** 空闲线程回收时间 */ + public static final int KEEP_ALIVE_TIME = 30000; + + /** 线程池等待队列 */ + public static final int BLOCKING_QUEUE_SIZE = 1000; + + // 私有化构造器 + private ComThreadPoolService(){} + + // 对外访问的公共方法 + public static ThreadPoolExecutor getInstance() { + return ThreadPoolServiceHolder.instance; + } + + //写一个静态内部类,里面实例化外部类 + private static class ThreadPoolServiceHolder { + private static final ThreadPoolExecutor instance = new ThreadPoolExecutor( + CORE_POOL_SIZE, // 线程池保持存活的线程数 + MAX_POOL_SIZE, // 最大线程数 + KEEP_ALIVE_TIME, // 空闲线程回收时间 + TimeUnit.MICROSECONDS, // 单位 + new LinkedBlockingQueue<>(BLOCKING_QUEUE_SIZE), // 线程队列 + new ThreadPoolExecutor.AbortPolicy() // 线程池对拒绝任务的处理策略 + ); + } + +} diff --git a/user-service/src/main/java/com/mh/user/utils/SendOrderUtils.java b/user-service/src/main/java/com/mh/user/utils/SendOrderUtils.java index ffe3971..1bead1f 100644 --- a/user-service/src/main/java/com/mh/user/utils/SendOrderUtils.java +++ b/user-service/src/main/java/com/mh/user/utils/SendOrderUtils.java @@ -167,26 +167,37 @@ public class SendOrderUtils { String deviceType=deviceCodeParamEntity.getDeviceType(); String registerAddr=deviceCodeParamEntity.getRegisterAddr(); String sendStr=null; - if (deviceType.equals("电表")){ - sendStr = GetReadOrder485.createMeterOrder(deviceCodeParamEntity); - }else if (deviceType.equals("水表")){ - sendStr = GetReadOrder485.createWtMeterOrder(deviceCodeParamEntity); - }else if (deviceType.equals("压变")){ - sendStr = GetReadOrder485.createPressureOrder(deviceCodeParamEntity); - }else if (deviceType.equals("热泵")){ - sendStr = GetReadOrder485.createPumpOrder(deviceCodeParamEntity); - }else if (deviceType.equals("温控")){ - sendStr = GetReadOrder485.createTempOrder(deviceCodeParamEntity); - }else if (deviceType.equals("时控")){ - sendStr = GetReadOrder485.createTimeSetOrder(deviceCodeParamEntity); - }else if (deviceType.equals("水位开关")){ - sendStr = GetReadOrder485.createRelayOrder(deviceCodeParamEntity); - }else if (deviceType.equals("状态检测")){ - sendStr = GetReadOrder485.createStateOrder(deviceCodeParamEntity); - }else if (deviceType.equals("温度变送器")){ - sendStr = GetReadOrder485.createMulTempOrder(deviceCodeParamEntity); - }else if (deviceType.equals("热泵状态")){ - sendStr = GetReadOrder485.createPumpStateOrder(deviceCodeParamEntity); + switch (deviceType) { + case "电表": + sendStr = GetReadOrder485.createMeterOrder(deviceCodeParamEntity); + break; + case "水表": + sendStr = GetReadOrder485.createWtMeterOrder(deviceCodeParamEntity); + break; + case "压变": + sendStr = GetReadOrder485.createPressureOrder(deviceCodeParamEntity); + break; + case "热泵": + sendStr = GetReadOrder485.createPumpOrder(deviceCodeParamEntity); + break; + case "温控": + sendStr = GetReadOrder485.createTempOrder(deviceCodeParamEntity); + break; + case "时控": + sendStr = GetReadOrder485.createTimeSetOrder(deviceCodeParamEntity); + break; + case "水位开关": + sendStr = GetReadOrder485.createRelayOrder(deviceCodeParamEntity); + break; + case "状态检测": + sendStr = GetReadOrder485.createStateOrder(deviceCodeParamEntity); + break; + case "温度变送器": + sendStr = GetReadOrder485.createMulTempOrder(deviceCodeParamEntity); + break; + case "热泵状态": + sendStr = GetReadOrder485.createPumpStateOrder(deviceCodeParamEntity); + break; } return sendStr;