中央热水项目
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

328 lines
11 KiB

package com.mh.user.job;
import com.mh.common.utils.StringUtils;
import com.mh.user.constants.Constant;
import com.mh.user.entity.AddCronJobReq;
import com.mh.user.entity.MqttSubscriptionEntity;
import com.mh.user.manage.QuartzManager;
import com.mh.user.netty.NettyEchoServer;
import com.mh.user.serialport.SerialPortListener;
import com.mh.user.serialport.SerialPortUtil;
import com.mh.user.serialport.SerialTool;
import com.mh.user.service.DeviceCodeParamService;
import com.mh.user.service.MqttSubscriptionService;
import com.mh.user.service.mqtt.service.IMqttTopicService;
import com.mh.user.utils.CacheUtil;
import com.mh.user.utils.ExchangeStringUtil;
import com.mh.user.utils.GetReadOrder485;
import gnu.io.SerialPort;
import gnu.io.SerialPortEvent;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
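/**
 * Startup runner that automatically prepares the data-collection program when the
 * application boots: it initializes the device command parameters, fetches weather
 * data and restores the stored MQTT topic subscriptions.
 *
 * @author ljf
 * @updateTime 2020-05-15
 */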
@Component
public class CollectionLoopRunner implements ApplicationRunner {
@Resource
QuartzManager quartzManager;
public static SerialPort serialPort = null;
@Resource
private DeviceCodeParamService deviceCodeParamService;
@Resource
private GetWeatherInfoJob getWeatherInfoJob;
@Resource
private MqttSubscriptionService iMqttSubscriptionService;
@Resource
private IMqttTopicService iMqttTopicService;
/**
 * Startup entry point required by {@link ApplicationRunner}.
 *
 * <p>Performs, in this order:
 * <ol>
 *   <li>initialization of the device command parameters,</li>
 *   <li>one fetch of the weather data,</li>
 *   <li>initialization of the stored MQTT subscriptions.</li>
 * </ol>
 *
 * <p>Steps that are currently switched off and therefore not executed here:
 * registering the scheduled meter collection ({@code collectionMeterAndCloud()}),
 * resetting {@code Constant.WEB_FLAG} to {@code false} to resume collection,
 * the simulated serial collection loop ({@code simulationCollection()}), and
 * binding a {@code NettyEchoServer} on port 8098.
 *
 * @param args application arguments (not used)
 * @throws Exception if any of the startup steps fails
 */
@Override
public void run(ApplicationArguments args) throws Exception {
    // Step 1: make sure the device command parameters exist.
    initialDeviceCodeParams();

    // Step 2: fetch weather data once at startup.
    getWeatherInfoJob.getWeatherInfo();

    // Step 3: restore the MQTT topic subscriptions recorded by the subscription service.
    initializeMqttSubscription();
}
/**
 * Initializes the MQTT subscriptions: queries the subscription records whose status
 * is {@code "0"} and subscribes to the topic of each record with that record's QoS.
 *
 * <p>Records with a blank topic are skipped. A failure to subscribe is still fatal
 * (it propagates out of the startup runner), but the exception now names the topic
 * that failed so the broken record can be identified.
 *
 * @throws RuntimeException if subscribing to a topic fails; the original exception
 *                          is attached as the cause
 */
private void initializeMqttSubscription() {
    MqttSubscriptionEntity filter = new MqttSubscriptionEntity();
    // NOTE(review): "0" is presumably the "enabled" status - confirm against the data model.
    filter.setStatus("0");
    List<MqttSubscriptionEntity> subscriptions = iMqttSubscriptionService.selectMqttSubList(filter);
    if (subscriptions == null) {
        // Nothing to restore; guards against a service implementation that returns null.
        return;
    }
    for (MqttSubscriptionEntity subscription : subscriptions) {
        String topic = subscription.getTopic();
        if (StringUtils.isBlank(topic)) {
            continue;
        }
        try {
            iMqttTopicService.subscribe(topic, subscription.getQos());
        } catch (Exception e) {
            // Keep the fail-fast behavior, but say which topic could not be subscribed.
            throw new RuntimeException("Failed to subscribe to MQTT topic '" + topic + "'", e);
        }
    }
}
/**
 * Endless simulated collection loop over serial port COM12.
 *
 * <p>While {@code Constant.WEB_FLAG} is {@code false}, each cycle polls the water meter
 * (2400 baud, no parity) and then the electricity meter (1200 baud, even parity),
 * printing each raw response as a hex string. The flag is re-checked between the two
 * polls so that the cycle is cut short as soon as the flag turns {@code true}.
 *
 * <p>Changes compared with the previous version:
 * <ul>
 *   <li>the mid-cycle flag check was inverted ({@code !WEB_FLAG}), which skipped the
 *       electricity meter whenever collection was active - NOTE(review): confirm this
 *       was not a deliberate way of disabling the electricity poll;</li>
 *   <li>the loop now pauses while the flag is {@code true} instead of spinning;</li>
 *   <li>each port is closed exactly once, in a {@code finally} block;</li>
 *   <li>a missing response is detected with an explicit check rather than an
 *       {@code assert}, which is inactive unless the JVM runs with {@code -ea};</li>
 *   <li>an interrupt ends the loop instead of being swallowed.</li>
 * </ul>
 *
 * @throws Exception declared for compatibility; failures of a single cycle are
 *                   printed and the loop continues
 */
private void simulationCollection() throws Exception {
    final String portName = "COM12";
    // Water meter request frame (original note next to it: 81108739).
    final String waterMeterFrame = "FE FE FE 68 10 39 87 10 81 00 00 00 01 03 90 1F 00 7C 16".replace(" ", "");
    // Electricity meter request frame (original note next to it: 080170000469).
    final String electricMeterFrame = "FE FE FE 68 69 04 00 70 01 08 68 01 02 43 C3 BF 16".replace(" ", "");

    while (true) {
        try {
            if (Constant.WEB_FLAG) {
                // Collection is paused; wait a little instead of busy-spinning.
                Thread.sleep(888);
                continue;
            }
            pollSimulatedMeter(portName, 2400, purejavacomm.SerialPort.PARITY_NONE, waterMeterFrame, "水表");
            if (Constant.WEB_FLAG) {
                // The flag flipped during the water meter poll: skip the rest of this cycle.
                continue;
            }
            pollSimulatedMeter(portName, 1200, purejavacomm.SerialPort.PARITY_EVEN, electricMeterFrame, "电表");
        } catch (InterruptedException e) {
            // Restore the interrupt status and leave the loop.
            Thread.currentThread().interrupt();
            return;
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

/**
 * Opens the port, sends one request frame, waits 888 ms, reads the response,
 * prints it as hex and always closes the port again.
 *
 * @param portName   serial port name
 * @param baudRate   baud rate to open the port with
 * @param parity     one of the {@code purejavacomm.SerialPort.PARITY_*} constants
 * @param frameHex   request frame as a hex string without spaces
 * @param deviceType device type label passed through to {@code SerialTool.sendToPort}
 * @throws Exception if the port cannot be used or no response is read
 */
private void pollSimulatedMeter(String portName, int baudRate, int parity, String frameHex, String deviceType)
        throws Exception {
    purejavacomm.SerialPort port = SerialTool.openPort(portName,
            baudRate,
            purejavacomm.SerialPort.DATABITS_8,
            purejavacomm.SerialPort.STOPBITS_1,
            parity);
    try {
        if (port == null) {
            throw new IllegalStateException("Could not open serial port " + portName);
        }
        SerialTool.sendToPort(SerialTool.HexString2Bytes(frameHex), port, frameHex, deviceType);
        // Give the device time to answer before reading.
        Thread.sleep(888);
        byte[] response = SerialTool.readFromPort(port);
        if (response == null) {
            throw new IllegalStateException("No response read from " + portName + " for device type " + deviceType);
        }
        System.out.println("响应报文==>" + ExchangeStringUtil.parseByte2HexStr(response));
    } finally {
        if (port != null) {
            port.close();
        }
    }
}
/**
 * Ensures the collection command parameters exist.
 *
 * <p>For each of the three record counts reported by {@code deviceCodeParamService}
 * ({@code queryCount}, {@code queryCount2}, {@code queryCount3}), the matching
 * {@code GetReadOrder485.createOrderParam*} generator runs only when the count is zero.
 * Finally {@code CacheUtil.getInstance()} is called; the original note labels this as the
 * grouped-parameter cache - presumably the call is there to force its creation (TODO confirm).
 */
private void initialDeviceCodeParams() {
    GetReadOrder485 orderGenerator = new GetReadOrder485();

    // First parameter set: generate only when no records exist yet.
    if (deviceCodeParamService.queryCount() == 0) {
        orderGenerator.createOrderParam();
    }

    // Second parameter set.
    if (deviceCodeParamService.queryCount2() == 0) {
        orderGenerator.createOrderParam2();
    }

    // Third parameter set.
    if (deviceCodeParamService.queryCount3() == 0) {
        orderGenerator.createOrderParam3();
    }

    // Grouped-parameter cache: the returned instance itself is not needed here.
    CacheUtil.getInstance();
}
/**
 * Registers the test cron job "Test1" (job class "JobTest1") with the Quartz manager.
 *
 * <p>The cron expression {@code 0/12 * * * * ?} starts at second 0 and repeats every
 * 12 seconds within each minute (assuming standard Quartz cron syntax); it is not a
 * once-per-minute schedule.
 *
 * @throws Exception if the job cannot be registered
 */
public void test() throws Exception {
    System.out.println("测试定时采集开关");

    // Parameters handed to the job.
    Map<String, String> jobParams = new HashMap<>();
    jobParams.put("id", "2");
    jobParams.put("name", "test1");

    AddCronJobReq request = new AddCronJobReq();
    // Job identity.
    request.setJobName("Test1");
    request.setJobGroupName("JobTestGroup1");
    request.setJobClass("JobTest1");
    // Trigger identity.
    request.setTriggerName("triggerTest1");
    request.setTriggerGroupName("triggerTestGroup1");
    // Schedule: every 12 seconds.
    request.setDate("0/12 * * * * ?");
    request.setParams(jobParams);

    quartzManager.addCronJob(request);
}
/**
 * Registers the test cron job "Test" (job class "JobTest") with the Quartz manager.
 *
 * <p>The cron expression {@code 0/10 * * * * ?} starts at second 0 and repeats every
 * 10 seconds within each minute (assuming standard Quartz cron syntax); it is not a
 * once-per-minute schedule.
 *
 * @throws Exception if the job cannot be registered
 */
public void test1() throws Exception {
    System.out.println("测试定时采集开关");

    // Parameters handed to the job.
    Map<String, String> jobParams = new HashMap<>();
    jobParams.put("id", "1");
    jobParams.put("name", "test");

    AddCronJobReq request = new AddCronJobReq();
    // Job identity.
    request.setJobName("Test");
    request.setJobGroupName("JobTestGroup");
    request.setJobClass("JobTest");
    // Trigger identity.
    request.setTriggerName("triggerTest");
    request.setTriggerGroupName("triggerTestGroup");
    // Schedule: every 10 seconds.
    request.setDate("0/10 * * * * ?");
    request.setParams(jobParams);

    quartzManager.addCronJob(request);
}
/**
 * Registers the scheduled collection job for electricity meters and cooling-energy
 * meters (the latter are called "cloud" in the job identifiers).
 *
 * <p>Active schedule: {@code 0 0/10 * * * ?} - every 10 minutes, the setting noted for
 * the Huaxia site (assuming standard Quartz cron syntax).
 *
 * <p>Schedules that were tried before and are kept here for reference:
 * <pre>
 *   0 0 0/2 * * ?     every 2 hours
 *   0 10/40 * * * ?   every 40 minutes, starting at minute 10
 *   0 0/30 * * * ?    every 30 minutes (Guangzhou Ligong site)
 *   0 0/20 * * * ?    every 20 minutes
 *   35 0/2 * * * ?    every 2 minutes, at second 35
 *   25 0/2 * * * ?    every 2 minutes, at second 25
 *   0/10 * * * * ?    every 10 seconds
 *   0/5 * * * * ?     every 5 seconds
 * </pre>
 *
 * @throws Exception if the job cannot be registered
 */
public void collectionMeterAndCloud() throws Exception {
    // Parameters handed to the job.
    Map<String, String> jobParams = new HashMap<>();
    jobParams.put("id", "6");
    jobParams.put("name", "cloud_meter");

    AddCronJobReq request = new AddCronJobReq();
    // Job identity.
    request.setJobName("CloudAndMeter");
    request.setJobGroupName("JobCloudAndMeterGroup");
    request.setJobClass("JobCloudAndMeter");
    // Trigger identity.
    request.setTriggerName("triggerCloudAndMeter");
    request.setTriggerGroupName("triggerCloudAndMeterGroup");
    // Schedule: every 10 minutes.
    request.setDate("0 0/10 * * * ?");
    request.setParams(jobParams);

    quartzManager.addCronJob(request);
}
/**
 * Registers the scheduled collection job for cooling-energy meters (called "cloud"
 * in the job identifiers).
 *
 * <p>Schedule: {@code 25 0/2 * * * ?} - at second 25 of every second minute
 * (assuming standard Quartz cron syntax).
 *
 * @throws Exception if the job cannot be registered
 */
public void collectionCloud() throws Exception {
    // Parameters handed to the job.
    Map<String, String> jobParams = new HashMap<>();
    jobParams.put("id", "1");
    jobParams.put("name", "cloud");

    AddCronJobReq request = new AddCronJobReq();
    // Job identity.
    request.setJobName("Cloud");
    request.setJobGroupName("JobCloudGroup");
    request.setJobClass("JobCloud");
    // Trigger identity.
    request.setTriggerName("triggerCloud");
    request.setTriggerGroupName("triggerCloudGroup");
    // Schedule: every 2 minutes, at second 25.
    request.setDate("25 0/2 * * * ?");
    request.setParams(jobParams);

    quartzManager.addCronJob(request);
}
/**
 * Registers the scheduled collection job for electricity meters.
 *
 * <p>Schedule: {@code 35 0/2 * * * ?} - at second 35 of every second minute
 * (assuming standard Quartz cron syntax). Note that this is a 2-minute interval,
 * not a 3-minute one.
 *
 * @throws Exception if the job cannot be registered
 */
public void collectionMeter() throws Exception {
    // Parameters handed to the job.
    Map<String, String> jobParams = new HashMap<>();
    jobParams.put("id", "2");
    jobParams.put("name", "meter");

    AddCronJobReq request = new AddCronJobReq();
    // Job identity.
    request.setJobName("Meter");
    request.setJobGroupName("JobMeterGroup");
    request.setJobClass("JobMeter");
    // Trigger identity.
    request.setTriggerName("triggerMeter");
    request.setTriggerGroupName("triggerMeterGroup");
    // Schedule: every 2 minutes, at second 35.
    request.setDate("35 0/2 * * * ?");
    request.setParams(jobParams);

    quartzManager.addCronJob(request);
}
/**
 * Manual serial-port test: lists the available ports, opens COM7 at 2400 baud with
 * 8 data bits and even parity, registers a {@link SerialPortListener} and sends one
 * fixed request frame.
 *
 * <p>The listener is registered before the frame is sent, so a response that arrives
 * immediately is not missed. If the port cannot be opened, nothing is sent.
 */
public void testSerialPort(){
    String portname = "COM7";
    // List all serial ports.
    SerialPortUtil serialPortUtil = SerialPortUtil.getSerialPortUtil();
    ArrayList<String> port = serialPortUtil.findPort();
    System.out.println("发现全部串口:" + port);
    System.out.println("打开指定portname:" + portname);
    // Open the port with the given name.
    // The last argument used to be SerialPort.PARITY_ODD. STOPBITS_1 has the same value (1)
    // and states the apparent intent - NOTE(review): assumes openPort's last two parameters
    // are (parity, stopBits); confirm against SerialPortUtil.
    CollectionLoopRunner.serialPort = serialPortUtil.openPort(portname, 2400, SerialPort.DATABITS_8, SerialPort.PARITY_EVEN, SerialPort.STOPBITS_1);
    if (CollectionLoopRunner.serialPort == null) {
        // openPort's failure contract is not visible here; guard in case it returns null.
        System.out.println("打开串口失败:" + portname);
        return;
    }
    // Attach the listener first so the reply to the request below cannot be lost.
    serialPortUtil.addListener(CollectionLoopRunner.serialPort, new SerialPortListener());
    byte[] HEX = SerialPortListener.hexStr2Byte("FEFEFE6810398710810000000103901F007C16");
    serialPortUtil.sendToPort(CollectionLoopRunner.serialPort, HEX);
}
/**
 * Opens COM1 (19200 baud, 8 data bits, no parity) for PLC communication if the shared
 * {@code CollectionLoopRunner.serialPort} has not been set yet, registers a listener
 * that reads the port whenever data becomes available, and sends one fixed request frame.
 *
 * <p>Does nothing when the shared port is already set. The listener is registered
 * before the frame is sent, so a response that arrives immediately is not missed.
 * If the port cannot be opened, no listener is registered and nothing is sent; the
 * shared field stays {@code null}, so a later call tries again.
 */
public void plcAnalytic() {
    String portName = "COM1";
    SerialPortUtil serialPortUtil = SerialPortUtil.getSerialPortUtil();
    if (CollectionLoopRunner.serialPort != null) {
        // Port already set up by an earlier call.
        return;
    }
    // The last argument used to be SerialPort.PARITY_ODD. STOPBITS_1 has the same value (1)
    // and states the apparent intent - NOTE(review): assumes openPort's last two parameters
    // are (parity, stopBits); confirm against SerialPortUtil.
    CollectionLoopRunner.serialPort = serialPortUtil.openPort(portName, 19200, SerialPort.DATABITS_8, SerialPort.PARITY_NONE, SerialPort.STOPBITS_1);
    if (CollectionLoopRunner.serialPort == null) {
        // openPort's failure contract is not visible here; guard in case it returns null.
        return;
    }
    // Attach the listener first so the reply to the request below cannot be lost.
    SerialPortUtil.setListenerToSerialPort(CollectionLoopRunner.serialPort, serialPortEvent -> {
        if (serialPortEvent.getEventType() == SerialPortEvent.DATA_AVAILABLE) {
            byte[] bytes = serialPortUtil.readFromPort(CollectionLoopRunner.serialPort);
            String needData = SerialPortListener.printHexString(bytes);
            // TODO: validate needData (length, leading 5A A5, checksum byte) and then
            // parse it; no validation or parsing is implemented yet.
        }
    });
    // The frame literal contains no whitespace, so it is converted as-is.
    byte[] hex = SerialPortListener.hexStr2Byte("A55A230000000022");
    serialPortUtil.sendToPort(CollectionLoopRunner.serialPort, hex);
}
}