Browse Source

优化采集结构

prod_202403
mh 11 months ago
parent
commit
40f7577927
  1. 62
      user-service/src/main/java/com/mh/user/job/DealDataJob.java
  2. 186
      user-service/src/main/java/com/mh/user/serialport/SerialPortSingle.java
  3. 4
      user-service/src/main/java/com/mh/user/serialport/SerialPortUtil.java
  4. 48
      user-service/src/main/java/com/mh/user/utils/ComThreadPoolService.java
  5. 51
      user-service/src/main/java/com/mh/user/utils/SendOrderUtils.java

62
user-service/src/main/java/com/mh/user/job/DealDataJob.java

@ -6,6 +6,7 @@ import com.mh.user.serialport.SerialPortThread;
import com.mh.user.service.DeviceCodeParamService;
import com.mh.user.service.DealDataService;
import com.mh.user.utils.AnalysisReceiveOrder485;
import com.mh.user.utils.ComThreadPoolService;
import com.mh.user.utils.GetReadOrder485;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@ -13,6 +14,7 @@ import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @author ljf
@ -37,6 +39,8 @@ public class DealDataJob {
this.dealDataService = dealDataService;
}
ThreadPoolExecutor comThreadPool = ComThreadPoolService.getInstance();
/**
* 定时处理汇总数据每15分钟处理一次,十分钟(0 0/10 * * * ?)
*/
@ -71,63 +75,27 @@ public class DealDataJob {
log.info("------定时采集开始>>>>Constant.FLAG=="+Constant.FLAG+"------");
if(!Constant.FLAG){
if(!Constant.WEB_FLAG){
log.info("------taskTimes=="+taskTimes+"------");
Constant.FLAG=true;
log.info("------Constant.WEB_FLAG=="+Constant.WEB_FLAG+"------");
log.info("------Constant.WEB_FLAG=="+ false +"------");
for (int i = 1; i <= 4; i++) {
String threadName = String.valueOf(i);
String threadName;
if (i == 1 || i == 3) {
threadName = "1";
log.info("------采集水位、水温!"+i+"------");
} else if (i == 2) {
threadName = "2";
log.info("------采集水、电、运行状态!"+i+"------");
} else {
threadName = "3";
log.info("------采集设定温度、设定水位、故障状态!"+i+"------");
}
for(int j=1;i<11;i++){
for(int j=1;j<11;j++){
SerialPortThread myThread = new SerialPortThread();
Thread thread = new Thread(myThread);
myThread.setName(threadName, String.valueOf(j));
thread.start();
comThreadPool.execute(thread);
}
}
// if (taskTimes<=4) {
// Constant.FLAG=true;
// log.info("------Constant.WEB_FLAG=="+Constant.WEB_FLAG+"------");
// if (taskTimes == 2) {//2
// for(int i=1;i<11;i++){
// SerialPortThread myThread = new SerialPortThread();
// Thread thread = new Thread(myThread);
// myThread.setName("2", String.valueOf(i));
// thread.start();
// }
// log.info("------采集水、电、运行状态!"+taskTimes+"------");
// }else if (taskTimes == 3){//3
// for(int i=1;i<11;i++){
// SerialPortThread myThread = new SerialPortThread();
// Thread thread = new Thread(myThread);
// myThread.setName("1", String.valueOf(i));
// thread.start();
// }
// log.info("------采集水位、水温!"+taskTimes+"------");
// }else if (taskTimes == 4) {//4
// for(int i=1;i<11;i++){
// SerialPortThread myThread = new SerialPortThread();
// Thread thread = new Thread(myThread);
// myThread.setName("3", String.valueOf(i));
// thread.start();
// }
// log.info("------采集设定温度、设定水位、故障状态!"+taskTimes+"------");
// }else {
// for(int i=1;i<11;i++){
// SerialPortThread myThread = new SerialPortThread();
// Thread thread = new Thread(myThread);
// myThread.setName("1", String.valueOf(i));
// thread.start();
// }
// log.info("------采集水位、水温!"+taskTimes+"------");
// }
// if(taskTimes<4){
// taskTimes++;
// }else{
// taskTimes=1;
// }
// }
}
}
} catch (Exception e) {

186
user-service/src/main/java/com/mh/user/serialport/SerialPortSingle.java

@ -14,6 +14,7 @@ import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
//import purejavacomm.SerialPort;
import gnu.io.SerialPort;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@ -29,10 +30,10 @@ import java.util.List;
@Slf4j
public class SerialPortSingle {
public SerialPort serialPort = null;
private String receiveStr = null;
private int baudrate=9600;
private String parity=null;
public SerialPort serialPort = null;
private String receiveStr = null;
private int baudrate = 9600;
private String parity = null;
// 调用service
ApplicationContext context = SpringBeanUtil.getApplicationContext();
@ -41,107 +42,111 @@ public class SerialPortSingle {
DeviceInstallService deviceInstallService = context.getBean(DeviceInstallService.class);
NowDataService nowDataService = context.getBean(NowDataService.class);
BuildingService buildingService = context.getBean(BuildingService.class);
public String serialPortSend(DeviceCodeParamEntity deviceCodeParamEntity){
public String serialPortSend(DeviceCodeParamEntity deviceCodeParamEntity) {
//查看所有串口
SerialPortUtil serialPortUtil = SerialPortUtil.getSerialPortUtil();
ArrayList<String> port = serialPortUtil.findPort();
// SerialTool serialPortUtil = SerialTool.getSerialPortUtil();
// ArrayList<String> port = serialPortUtil.findPort();
String rtData="";
String rtData = "";
// System.out.println("发现全部串口:" + port);
String comName=deviceCodeParamEntity.getDataCom().toUpperCase();
if (port.contains(comName)){
try{
try{
baudrate=deviceCodeParamEntity.getBaudrate();
parity=deviceCodeParamEntity.getParity();
if (parity==null || parity.equals("") || parity.equalsIgnoreCase("none")){
serialPort = serialPortUtil.openPort(comName, baudrate, SerialPort.DATABITS_8, SerialPort.PARITY_NONE, SerialPort.PARITY_ODD);
}else{
serialPort = serialPortUtil.openPort(comName, baudrate, SerialPort.DATABITS_8, SerialPort.PARITY_EVEN, SerialPort.PARITY_ODD);
}
//向串口发送指令
log.info("-----------------------------单抄向串口"+serialPort+"发送指令!-----------------------------");
SendOrderUtils.sendSerialPort(deviceCodeParamEntity, serialPort);
Thread.sleep(1500);
}catch(Exception e){
}
//对返回数据进行相关解析处理
receiveStr=null;
byte[] bytes = serialPortUtil.readFromPort(serialPort); //读取串口数据
try {
String byteStr = new String(bytes, 0, bytes.length).trim();
receiveStr = receiveStr + printHexString(bytes);
//去掉空格和null
receiveStr = receiveStr.replace("null", "");
receiveStr = receiveStr.replace(" ", "");
log.info("串口"+serialPort+"接收数据:" + receiveStr + ",大小: " + receiveStr.length());
} catch (NullPointerException e) {
serialPortUtil.closePort(serialPort);
log.info("单抄串口"+serialPort+"异常,没有数据返回!关闭串口");
String comName = deviceCodeParamEntity.getDataCom().toUpperCase();
if (!port.contains(comName)) {
log.info("串口:" + comName + "不存在!");
return "fail";
}
try {
try {
baudrate = deviceCodeParamEntity.getBaudrate();
parity = deviceCodeParamEntity.getParity();
if (parity == null || parity.equals("") || parity.equalsIgnoreCase("none")) {
serialPort = serialPortUtil.openPort(comName, baudrate, SerialPort.DATABITS_8, SerialPort.PARITY_NONE, SerialPort.PARITY_ODD);
} else {
serialPort = serialPortUtil.openPort(comName, baudrate, SerialPort.DATABITS_8, SerialPort.PARITY_EVEN, SerialPort.PARITY_ODD);
}
//返回值全部变成大写
String receiveData = receiveStr.toUpperCase();
//截取去掉FE
String dataStr = receiveData.replace("FE", "");
String deviceType=deviceCodeParamEntity.getDeviceType();
String deviceAddr=deviceCodeParamEntity.getDeviceAddr();
String registerAddr=deviceCodeParamEntity.getRegisterAddr();
String brand=deviceCodeParamEntity.getBrand();
String buildingId=deviceCodeParamEntity.getBuildingId();
String buildingName=buildingService.queryBuildingName(buildingId); //查询楼栋名称
//向串口发送指令
log.info("-----------------------------单抄向串口" + serialPort + "发送指令!-----------------------------");
SendOrderUtils.sendSerialPort(deviceCodeParamEntity, serialPort);
Thread.sleep(1500);
} catch (Exception e) {
log.error("前端设置出现异常==>", e);
return "fail";
}
//对返回数据进行相关解析处理
receiveStr = null;
byte[] bytes = serialPortUtil.readFromPort(serialPort); //读取串口数据
if (null == bytes) {
serialPortUtil.closePort(serialPort);
log.info("单抄串口" + serialPort + "异常,没有数据返回!关闭串口");
return "fail";
}
receiveStr = receiveStr + printHexString(bytes);
//去掉空格和null
receiveStr = receiveStr.replace("null", "");
receiveStr = receiveStr.replace(" ", "");
log.info("串口" + serialPort + "接收数据:" + receiveStr + ",大小: " + receiveStr.length());
//返回值全部变成大写
String receiveData = receiveStr.toUpperCase();
//截取去掉FE
String dataStr = receiveData.replace("FE", "");
String deviceType = deviceCodeParamEntity.getDeviceType();
String deviceAddr = deviceCodeParamEntity.getDeviceAddr();
String registerAddr = deviceCodeParamEntity.getRegisterAddr();
String brand = deviceCodeParamEntity.getBrand();
String buildingId = deviceCodeParamEntity.getBuildingId();
String buildingName = buildingService.queryBuildingName(buildingId); //查询楼栋名称
deviceInstallService.updateOnline(deviceAddr,deviceType,buildingId,"在线"); //设备在线
log.info(deviceType+"在线,设备号:"+deviceAddr+",所属楼栋:"+buildingName);
if (deviceType.equals("热泵")){
String strState=nowDataService.selectState(buildingId,deviceAddr);
if (strState!=null && strState.equals("离线")){ //采集到数据
nowDataService.updateRunState(buildingId,deviceAddr,"不运行"); //监控界面状态表热泵在线状态
}
deviceInstallService.updateOnline(deviceAddr, deviceType, buildingId, "在线"); //设备在线
log.info(deviceType + "在线,设备号:" + deviceAddr + ",所属楼栋:" + buildingName);
if (deviceType.equals("热泵")) {
String strState = nowDataService.selectState(buildingId, deviceAddr);
if (strState != null && strState.equals("离线")) { //采集到数据
nowDataService.updateRunState(buildingId, deviceAddr, "不运行"); //监控界面状态表热泵在线状态
}
try{
if ((dataStr.length() == 18 || dataStr.length() == 70 || dataStr.length() == 44) && deviceType.equals("水表")) {
rtData=analysisReceiveOrder485.analysisWtMeterOrder4852(dataStr,registerAddr,brand,buildingId);
} else if ((dataStr.length() == 36 || dataStr.length() == 44 || dataStr.length()==40 || dataStr.length()==50) && deviceType.equals("电表")) {
rtData=analysisReceiveOrder485.analysisMeterOrder4852(dataStr,registerAddr,brand,buildingId);
} else if (deviceType.equals("压变")) {
rtData=analysisReceiveOrder485.analysisPressureOrder4852(dataStr,registerAddr,brand,buildingId);
} else if ((dataStr.length() == 30) && deviceType.equals("状态检测")) {//五路状态读取,兼容旧版系统
analysisReceiveOrder485.analysisStateOrder485(dataStr,registerAddr,brand,buildingId);
} else if (deviceType.equals("水位开关") && (registerAddr.equals("0018") || registerAddr.equals("0017"))) {
rtData=analysisReceiveOrder485.analysisRelayOrder4852(dataStr,registerAddr,brand,buildingId);
} else if (deviceType.equals("热泵")) {
rtData=analysisReceiveOrder485.analysisPumpOrder4852(dataStr,registerAddr,brand,buildingId);
} else if (deviceType.equals("时控")) {
rtData=analysisReceiveOrder485.analysisTimeSetOrder4852(dataStr,registerAddr,brand,buildingId);
} else if (deviceType.equals("水位开关") && registerAddr.equals("0010")){ //热泵状态
rtData=analysisReceiveOrder485.analysisPumpStateOrder2(dataStr,registerAddr,brand,buildingId);
} else if (deviceType.equals("温度变送器")) {
rtData=analysisReceiveOrder485.analysisMulTempOrder4852(dataStr,registerAddr,brand,buildingId);
} else if (deviceType.equals("热泵状态")){
rtData=analysisReceiveOrder485.analysisPumpStateOrder2(dataStr,registerAddr,brand,buildingId);
}
}catch (Exception e){
log.error(deviceCodeParamEntity.getDeviceType()+"单抄保存数据库失败!");
serialPortUtil.closePort(serialPort);
}
try {
if ((dataStr.length() == 18 || dataStr.length() == 70 || dataStr.length() == 44) && deviceType.equals("水表")) {
rtData = analysisReceiveOrder485.analysisWtMeterOrder4852(dataStr, registerAddr, brand, buildingId);
} else if ((dataStr.length() == 36 || dataStr.length() == 44 || dataStr.length() == 40 || dataStr.length() == 50) && deviceType.equals("电表")) {
rtData = analysisReceiveOrder485.analysisMeterOrder4852(dataStr, registerAddr, brand, buildingId);
} else if (deviceType.equals("压变")) {
rtData = analysisReceiveOrder485.analysisPressureOrder4852(dataStr, registerAddr, brand, buildingId);
} else if ((dataStr.length() == 30) && deviceType.equals("状态检测")) {//五路状态读取,兼容旧版系统
analysisReceiveOrder485.analysisStateOrder485(dataStr, registerAddr, brand, buildingId);
} else if (deviceType.equals("水位开关") && (registerAddr.equals("0018") || registerAddr.equals("0017"))) {
rtData = analysisReceiveOrder485.analysisRelayOrder4852(dataStr, registerAddr, brand, buildingId);
} else if (deviceType.equals("热泵")) {
rtData = analysisReceiveOrder485.analysisPumpOrder4852(dataStr, registerAddr, brand, buildingId);
} else if (deviceType.equals("时控")) {
rtData = analysisReceiveOrder485.analysisTimeSetOrder4852(dataStr, registerAddr, brand, buildingId);
} else if (deviceType.equals("水位开关") && registerAddr.equals("0010")) { //热泵状态
rtData = analysisReceiveOrder485.analysisPumpStateOrder2(dataStr, registerAddr, brand, buildingId);
} else if (deviceType.equals("温度变送器")) {
rtData = analysisReceiveOrder485.analysisMulTempOrder4852(dataStr, registerAddr, brand, buildingId);
} else if (deviceType.equals("热泵状态")) {
rtData = analysisReceiveOrder485.analysisPumpStateOrder2(dataStr, registerAddr, brand, buildingId);
}
} catch (Exception e) {
log.error(deviceCodeParamEntity.getDeviceType() + "单抄保存数据库失败!");
serialPortUtil.closePort(serialPort);
System.out.println("关闭"+serialPort);
Thread.sleep(500);
log.info("-----------------------------"+serialPort+"单抄结束!-----------------------------");
return rtData;
}catch (Exception e){
// e.printStackTrace();
return "fail";
}
}else {
log.info("串口:"+comName+"不存在!");
serialPortUtil.closePort(serialPort);
System.out.println("关闭" + serialPort);
Thread.sleep(500);
log.info("-----------------------------" + serialPort + "单抄结束!-----------------------------");
return rtData;
} catch (Exception e) {
log.error("前端设置出现异常==>", e);
return "fail";
}
return "fail";
}
/**
* 字节数组转16进制字符串
*
* @param b 字节数组
* @return 16进制字符串
*/
@ -159,12 +164,13 @@ public class SerialPortSingle {
/**
* 十六进制字符串转byte[]
*
* @param hex 十六进制字符串
* @return byte[]
*/
public static byte[] hexStr2Byte(String hex) {
if (hex == null) {
return new byte[] {};
return new byte[]{};
}
// 奇数位补0
@ -183,8 +189,10 @@ public class SerialPortSingle {
}
return buffer.array();
}
/**
* 16进制转换成为string类型字符串
*
* @param s 待转换字符串
*/
public static String hexStringToString(String s) {

4
user-service/src/main/java/com/mh/user/serialport/SerialPortUtil.java

@ -23,9 +23,7 @@ public class SerialPortUtil {
static {
//在该类被ClassLoader加载时就初始化一个SerialTool对象
if (serialPortUtil == null) {
serialPortUtil = new SerialPortUtil();
}
serialPortUtil = new SerialPortUtil();
}
//私有化SerialTool类的构造方法,不允许其他类生成SerialTool对象

48
user-service/src/main/java/com/mh/user/utils/ComThreadPoolService.java

@ -0,0 +1,48 @@
package com.mh.user.utils;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author LJF
* @title 单例的线程池
* @description 使用静态内部类进行创建专门解析接收到的报文数据
* @updateTime 2020-12-09
* @throws
*/
public class ComThreadPoolService {
/** 线程池保持ALIVE状态线程数 */
public static final int CORE_POOL_SIZE = 10;
/** 线程池最大线程数 */
public static final int MAX_POOL_SIZE = 50;
/** 空闲线程回收时间 */
public static final int KEEP_ALIVE_TIME = 30000;
/** 线程池等待队列 */
public static final int BLOCKING_QUEUE_SIZE = 1000;
// 私有化构造器
private ComThreadPoolService(){}
// 对外访问的公共方法
public static ThreadPoolExecutor getInstance() {
return ThreadPoolServiceHolder.instance;
}
//写一个静态内部类,里面实例化外部类
private static class ThreadPoolServiceHolder {
private static final ThreadPoolExecutor instance = new ThreadPoolExecutor(
CORE_POOL_SIZE, // 线程池保持存活的线程数
MAX_POOL_SIZE, // 最大线程数
KEEP_ALIVE_TIME, // 空闲线程回收时间
TimeUnit.MICROSECONDS, // 单位
new LinkedBlockingQueue<>(BLOCKING_QUEUE_SIZE), // 线程队列
new ThreadPoolExecutor.AbortPolicy() // 线程池对拒绝任务的处理策略
);
}
}

51
user-service/src/main/java/com/mh/user/utils/SendOrderUtils.java

@ -167,26 +167,37 @@ public class SendOrderUtils {
String deviceType=deviceCodeParamEntity.getDeviceType();
String registerAddr=deviceCodeParamEntity.getRegisterAddr();
String sendStr=null;
if (deviceType.equals("电表")){
sendStr = GetReadOrder485.createMeterOrder(deviceCodeParamEntity);
}else if (deviceType.equals("水表")){
sendStr = GetReadOrder485.createWtMeterOrder(deviceCodeParamEntity);
}else if (deviceType.equals("压变")){
sendStr = GetReadOrder485.createPressureOrder(deviceCodeParamEntity);
}else if (deviceType.equals("热泵")){
sendStr = GetReadOrder485.createPumpOrder(deviceCodeParamEntity);
}else if (deviceType.equals("温控")){
sendStr = GetReadOrder485.createTempOrder(deviceCodeParamEntity);
}else if (deviceType.equals("时控")){
sendStr = GetReadOrder485.createTimeSetOrder(deviceCodeParamEntity);
}else if (deviceType.equals("水位开关")){
sendStr = GetReadOrder485.createRelayOrder(deviceCodeParamEntity);
}else if (deviceType.equals("状态检测")){
sendStr = GetReadOrder485.createStateOrder(deviceCodeParamEntity);
}else if (deviceType.equals("温度变送器")){
sendStr = GetReadOrder485.createMulTempOrder(deviceCodeParamEntity);
}else if (deviceType.equals("热泵状态")){
sendStr = GetReadOrder485.createPumpStateOrder(deviceCodeParamEntity);
switch (deviceType) {
case "电表":
sendStr = GetReadOrder485.createMeterOrder(deviceCodeParamEntity);
break;
case "水表":
sendStr = GetReadOrder485.createWtMeterOrder(deviceCodeParamEntity);
break;
case "压变":
sendStr = GetReadOrder485.createPressureOrder(deviceCodeParamEntity);
break;
case "热泵":
sendStr = GetReadOrder485.createPumpOrder(deviceCodeParamEntity);
break;
case "温控":
sendStr = GetReadOrder485.createTempOrder(deviceCodeParamEntity);
break;
case "时控":
sendStr = GetReadOrder485.createTimeSetOrder(deviceCodeParamEntity);
break;
case "水位开关":
sendStr = GetReadOrder485.createRelayOrder(deviceCodeParamEntity);
break;
case "状态检测":
sendStr = GetReadOrder485.createStateOrder(deviceCodeParamEntity);
break;
case "温度变送器":
sendStr = GetReadOrder485.createMulTempOrder(deviceCodeParamEntity);
break;
case "热泵状态":
sendStr = GetReadOrder485.createPumpStateOrder(deviceCodeParamEntity);
break;
}
return sendStr;

Loading…
Cancel
Save