|
|
|
|
@ -0,0 +1,495 @@
|
|
|
|
|
package com.mh.framework.netty; |
|
|
|
|
|
|
|
|
|
import com.alibaba.fastjson2.JSONObject; |
|
|
|
|
import com.mh.common.constant.Constants; |
|
|
|
|
import com.mh.common.core.domain.entity.CollectionParamsManage; |
|
|
|
|
import com.mh.common.core.redis.RedisCache; |
|
|
|
|
import com.mh.common.model.request.AdvantechDatas; |
|
|
|
|
import com.mh.common.model.request.AdvantechReceiver; |
|
|
|
|
import com.mh.common.model.response.ParseResult; |
|
|
|
|
import com.mh.common.utils.DateUtils; |
|
|
|
|
import com.mh.common.utils.NettyTools; |
|
|
|
|
import com.mh.common.utils.SendOrderUtils; |
|
|
|
|
import com.mh.common.utils.spring.SpringUtils; |
|
|
|
|
import com.mh.framework.netty.session.ServerSession; |
|
|
|
|
import com.mh.framework.netty.session.SessionMap; |
|
|
|
|
import com.mh.framework.netty.task.CallbackTask; |
|
|
|
|
import com.mh.framework.netty.task.CallbackTaskScheduler; |
|
|
|
|
import com.mh.framework.rabbitmq.producer.SendMsgByTopic; |
|
|
|
|
import com.mh.system.service.device.ICollectionParamsManageService; |
|
|
|
|
import com.mh.system.service.device.IGatewayManageService; |
|
|
|
|
import com.mh.system.service.strategy.DeviceParserFactory; |
|
|
|
|
import com.mh.system.service.strategy.IDeviceParser; |
|
|
|
|
import io.netty.buffer.ByteBuf; |
|
|
|
|
import io.netty.channel.ChannelHandlerContext; |
|
|
|
|
import io.netty.channel.ChannelInboundHandlerAdapter; |
|
|
|
|
import io.netty.handler.timeout.IdleState; |
|
|
|
|
import io.netty.handler.timeout.IdleStateEvent; |
|
|
|
|
import io.netty.util.ReferenceCountUtil; |
|
|
|
|
import lombok.extern.slf4j.Slf4j; |
|
|
|
|
|
|
|
|
|
import java.math.BigDecimal; |
|
|
|
|
import java.util.ArrayList; |
|
|
|
|
import java.util.Date; |
|
|
|
|
import java.util.List; |
|
|
|
|
import java.util.concurrent.TimeUnit; |
|
|
|
|
import java.util.concurrent.atomic.AtomicInteger; |
|
|
|
|
|
|
|
|
|
@Slf4j |
|
|
|
|
public class NewEchoServerHandler extends ChannelInboundHandlerAdapter { |
|
|
|
|
|
|
|
|
|
private final IGatewayManageService gatewayManageService = SpringUtils.getBean(IGatewayManageService.class); |
|
|
|
|
private final ICollectionParamsManageService collectionParamsManageService = SpringUtils.getBean(ICollectionParamsManageService.class); |
|
|
|
|
private final SendMsgByTopic sendMsgByTopic = SpringUtils.getBean(SendMsgByTopic.class); |
|
|
|
|
private final RedisCache redisCache = SpringUtils.getBean(RedisCache.class); |
|
|
|
|
private final DeviceParserFactory parserFactory = SpringUtils.getBean(DeviceParserFactory.class); |
|
|
|
|
|
|
|
|
|
private int idleCount = 1; |
|
|
|
|
// ✅ 优化1:使用AtomicInteger保证线程安全
|
|
|
|
|
private final AtomicInteger num = new AtomicInteger(0); |
|
|
|
|
private volatile int size = 0; |
|
|
|
|
private String receiveStr = ""; |
|
|
|
|
private volatile List<CollectionParamsManage> deviceCodeParamList; |
|
|
|
|
|
|
|
|
|
// ✅ 优化2:采集状态追踪(用于监控和调试)
|
|
|
|
|
private volatile long lastCollectTime = 0; |
|
|
|
|
private volatile int totalCollectCount = 0; |
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
public void channelActive(ChannelHandlerContext ctx) throws Exception { |
|
|
|
|
log.info("客户端连接: {}", ctx.channel().remoteAddress()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
public void userEventTriggered(ChannelHandlerContext ctx, Object obj) throws Exception { |
|
|
|
|
if (obj instanceof IdleStateEvent) { |
|
|
|
|
IdleStateEvent event = (IdleStateEvent) obj; |
|
|
|
|
if (IdleState.READER_IDLE.equals(event.state())) { |
|
|
|
|
log.warn("读取超时,第{}次未收到设备响应", idleCount); |
|
|
|
|
receiveStr = ""; |
|
|
|
|
|
|
|
|
|
// ✅ 优化3:使用原子操作递增num,避免卡在同一个设备上
|
|
|
|
|
incrementNum(); |
|
|
|
|
|
|
|
|
|
// 检查是否有控制指令正在执行
|
|
|
|
|
if (redisCache.hasKey("order_send")) { |
|
|
|
|
log.warn("读取超时且有控制指令在执行,跳过发送采集指令, num={}", num.get()); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// 继续发送下一个采集指令
|
|
|
|
|
sendCollectOrder(ctx); |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
super.userEventTriggered(ctx, obj); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
public void channelRead(ChannelHandlerContext ctx, Object msg) { |
|
|
|
|
try { |
|
|
|
|
ByteBuf buf = (ByteBuf) msg; |
|
|
|
|
byte[] bytes = new byte[buf.readableBytes()]; |
|
|
|
|
buf.readBytes(bytes); |
|
|
|
|
buf.clear(); |
|
|
|
|
|
|
|
|
|
if (bytes.length <= 1024) { |
|
|
|
|
receiveStr = receiveStr + bytesToHexString(bytes); |
|
|
|
|
receiveStr = receiveStr.replace("null", "").replace(" ", ""); |
|
|
|
|
} |
|
|
|
|
} catch (Exception e) { |
|
|
|
|
log.error("channelRead异常", e); |
|
|
|
|
} finally { |
|
|
|
|
ReferenceCountUtil.release(msg); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { |
|
|
|
|
receiveStr = receiveStr.toUpperCase(); |
|
|
|
|
|
|
|
|
|
try { |
|
|
|
|
if (isHeartbeat(receiveStr)) { |
|
|
|
|
// 处理心跳
|
|
|
|
|
handleHeartbeat(ctx); |
|
|
|
|
} else if (shouldParseData(receiveStr)) { |
|
|
|
|
// 处理数据解析
|
|
|
|
|
handleDataParsing(ctx); |
|
|
|
|
} else if (isControlResponse(receiveStr)) { |
|
|
|
|
// 处理控制返回
|
|
|
|
|
handleControlResponse(ctx); |
|
|
|
|
} |
|
|
|
|
} catch (Exception e) { |
|
|
|
|
log.error("数据处理异常: {}", receiveStr, e); |
|
|
|
|
} finally { |
|
|
|
|
ctx.flush(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private boolean isHeartbeat(String data) { |
|
|
|
|
return data.length() == 8 && data.startsWith("2400"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private boolean shouldParseData(String data) { |
|
|
|
|
return (data.length() == 18 || data.length() == 12 || data.length() == 14) |
|
|
|
|
&& !redisCache.hasKey("order_send_read"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private boolean isControlResponse(String data) { |
|
|
|
|
return data.length() == 16 || (data.length() > 20 && data.length() < 100); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private void handleHeartbeat(ChannelHandlerContext ctx) throws InterruptedException { |
|
|
|
|
dealSession(ctx); |
|
|
|
|
idleCount = 1; |
|
|
|
|
// String port = receiveStr.substring(4, 8);
|
|
|
|
|
gatewayManageService.updateGatewayStatus(receiveStr); |
|
|
|
|
|
|
|
|
|
if (!redisCache.hasKey(receiveStr)) { |
|
|
|
|
collectionParamsManageService.createDtuCollectionParams(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
List<CollectionParamsManage> cachedList = redisCache.getCacheObject(receiveStr); |
|
|
|
|
if (cachedList != null) { |
|
|
|
|
deviceCodeParamList = cachedList; |
|
|
|
|
size = deviceCodeParamList.size(); |
|
|
|
|
receiveStr = ""; |
|
|
|
|
// ✅ 优化4:心跳包重置num为0
|
|
|
|
|
num.set(0); |
|
|
|
|
|
|
|
|
|
if (size > 0 && idleCount < 2) { |
|
|
|
|
Thread.sleep(200); |
|
|
|
|
sendCollectOrder(ctx); |
|
|
|
|
idleCount++; |
|
|
|
|
} else { |
|
|
|
|
ctx.channel().close(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private void handleDataParsing(ChannelHandlerContext ctx) throws InterruptedException { |
|
|
|
|
idleCount = 1; |
|
|
|
|
|
|
|
|
|
if (redisCache.hasKey("order_send_read") && redisCache.hasKey("order_send_register")) { |
|
|
|
|
handleSpecialReadResponse(); |
|
|
|
|
} else { |
|
|
|
|
nextSendOrder(ctx); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private void handleSpecialReadResponse() { |
|
|
|
|
Object orderSendRegister = redisCache.getCacheObject("order_send_register"); |
|
|
|
|
String[] split = String.valueOf(orderSendRegister).split("_"); |
|
|
|
|
|
|
|
|
|
CollectionParamsManage params = new CollectionParamsManage(); |
|
|
|
|
params.setDataType(Integer.valueOf(split[4])); |
|
|
|
|
params.setParamType(split[3]); |
|
|
|
|
params.setOtherName(split[5]); |
|
|
|
|
params.setQuality("0"); |
|
|
|
|
|
|
|
|
|
parseAndSendData(receiveStr, params); |
|
|
|
|
redisCache.deleteObject("order_send_read"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private void handleControlResponse(ChannelHandlerContext ctx) throws InterruptedException { |
|
|
|
|
idleCount = 1; |
|
|
|
|
nextSendOrder(ctx); |
|
|
|
|
controlOrder(ctx); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private void nextSendOrder(ChannelHandlerContext ctx) throws InterruptedException { |
|
|
|
|
// 先解析当前接收到的数据(如果不是控制响应)
|
|
|
|
|
if (receiveStr.length() != 16 && receiveStr.length() < 20) { |
|
|
|
|
parseAndSendData(receiveStr, deviceCodeParamList.get(num.get())); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// 清空接收字符串
|
|
|
|
|
receiveStr = ""; |
|
|
|
|
|
|
|
|
|
// 判断是否有远程指令发送
|
|
|
|
|
if (redisCache.hasKey("order_send")) { |
|
|
|
|
log.warn("有远程设置指令发送,暂停采集,但保持num递增以维持轮询顺序, currentNum={}", num.get()); |
|
|
|
|
// ✅ 优化5:即使有控制指令,也要递增num,避免数据采集断层
|
|
|
|
|
incrementNum(); |
|
|
|
|
// 不发送下一个采集指令,等待控制指令完成
|
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// 正常情况:递增num并发送下一个采集指令
|
|
|
|
|
incrementNum(); |
|
|
|
|
sendCollectOrder(ctx); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* 核心方法:使用策略模式解析数据 |
|
|
|
|
*/ |
|
|
|
|
private void parseAndSendData(String rawData, CollectionParamsManage params) { |
|
|
|
|
try { |
|
|
|
|
if (!parserFactory.hasParser(params.getParamType(), params.getMtType())) { |
|
|
|
|
log.warn("未找到对应的解析器, paramType: {}", params.getParamType()); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
IDeviceParser parser = parserFactory.getParser(params.getParamType(), params.getMtType()); |
|
|
|
|
ParseResult result = parser.parse(rawData, params); |
|
|
|
|
|
|
|
|
|
if (!result.isSuccess()) { |
|
|
|
|
log.warn("数据解析失败: {}, 原因: {}", rawData, result.getErrorMessage()); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
sendDataToMQ(result, params); |
|
|
|
|
|
|
|
|
|
} catch (Exception e) { |
|
|
|
|
log.error("数据解析异常: {}", rawData, e); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private void sendDataToMQ(ParseResult result, CollectionParamsManage params) { |
|
|
|
|
AdvantechReceiver receiver = new AdvantechReceiver(); |
|
|
|
|
receiver.setTs(DateUtils.dateToString(new Date(), Constants.DATE_FORMAT)); |
|
|
|
|
|
|
|
|
|
List<AdvantechDatas> dataList = new ArrayList<>(); |
|
|
|
|
AdvantechDatas data = new AdvantechDatas(); |
|
|
|
|
data.setValue(result.getValue()); |
|
|
|
|
data.setTag(result.getTagName()); |
|
|
|
|
data.setQuality(result.getQuality()); |
|
|
|
|
dataList.add(data); |
|
|
|
|
|
|
|
|
|
receiver.setD(dataList); |
|
|
|
|
sendMsgByTopic.sendToDeviceMQ(JSONObject.toJSONString(receiver)); |
|
|
|
|
|
|
|
|
|
// 特殊业务逻辑:贵宾楼水箱温度
|
|
|
|
|
if ("2".equals(params.getGatewayId()) && params.getOtherName().contains("实际温度")) { |
|
|
|
|
sendExtraTemperatureData(result.getValue()); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private void sendExtraTemperatureData(BigDecimal temperature) { |
|
|
|
|
AdvantechReceiver receiver = new AdvantechReceiver(); |
|
|
|
|
receiver.setTs(DateUtils.dateToString(new Date(), Constants.DATE_FORMAT)); |
|
|
|
|
|
|
|
|
|
List<AdvantechDatas> dataList = new ArrayList<>(); |
|
|
|
|
AdvantechDatas data = new AdvantechDatas(); |
|
|
|
|
data.setValue(temperature); |
|
|
|
|
data.setTag("贵宾楼水箱-温度"); |
|
|
|
|
data.setQuality(0); |
|
|
|
|
dataList.add(data); |
|
|
|
|
|
|
|
|
|
receiver.setD(dataList); |
|
|
|
|
sendMsgByTopic.sendToDeviceMQ(JSONObject.toJSONString(receiver)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private void controlOrder(ChannelHandlerContext ctx) { |
|
|
|
|
if (!redisCache.hasKey("order_send")) { |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
Object orderSend = redisCache.getCacheObject("order_send"); |
|
|
|
|
String orderSendStr = String.valueOf(orderSend); |
|
|
|
|
String orderSendRegisterStr = ""; |
|
|
|
|
|
|
|
|
|
if (redisCache.hasKey("order_send_register")) { |
|
|
|
|
Object orderSendRegister = redisCache.getCacheObject("order_send_register"); |
|
|
|
|
String[] split = String.valueOf(orderSendRegister).split("_"); |
|
|
|
|
orderSendRegisterStr = split[1]; |
|
|
|
|
} else { |
|
|
|
|
orderSendRegisterStr = orderSendStr.substring(4, 8); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
String readOrder = buildReadOrder(orderSendStr, orderSendRegisterStr); |
|
|
|
|
if (readOrder != null && !readOrder.isEmpty()) { |
|
|
|
|
redisCache.setCacheObject("order_send_read", readOrder, 10, TimeUnit.SECONDS); |
|
|
|
|
ctx.writeAndFlush(com.mh.common.utils.ModbusUtils.createByteBuf(readOrder)); |
|
|
|
|
log.info("发送读取指令: {}", readOrder); |
|
|
|
|
|
|
|
|
|
NettyTools.setReceiveMsg("order_wait", receiveStr); |
|
|
|
|
redisCache.deleteObject("order_send"); |
|
|
|
|
// ⚠️ 关键修复:不要在这里清空receiveStr,让后续流程处理
|
|
|
|
|
// receiveStr = ""; // 删除这行
|
|
|
|
|
|
|
|
|
|
try { |
|
|
|
|
Thread.sleep(500); |
|
|
|
|
} catch (InterruptedException e) { |
|
|
|
|
Thread.currentThread().interrupt(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// ✅ 新增:控制指令发送后,立即恢复采集轮询
|
|
|
|
|
resumeCollectionAfterControl(ctx); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* 控制指令完成后恢复数据采集 |
|
|
|
|
* 确保num不会因控制指令而停滞 |
|
|
|
|
*/ |
|
|
|
|
private void resumeCollectionAfterControl(ChannelHandlerContext ctx) { |
|
|
|
|
// ✅ 优化6:递增num到下一个设备
|
|
|
|
|
incrementNum(); |
|
|
|
|
|
|
|
|
|
log.info("控制指令完成,继续采集下一个设备,num={}, totalCollectCount={}", num.get(), totalCollectCount); |
|
|
|
|
|
|
|
|
|
// 延迟后发送下一个采集指令
|
|
|
|
|
try { |
|
|
|
|
Thread.sleep(500); // 等待控制指令响应稳定
|
|
|
|
|
sendCollectOrder(ctx); |
|
|
|
|
} catch (InterruptedException e) { |
|
|
|
|
Thread.currentThread().interrupt(); |
|
|
|
|
log.error("恢复采集中断", e); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private String buildReadOrder(String orderSendStr, String registerAddr) { |
|
|
|
|
if (receiveStr.contains(orderSendStr)) { |
|
|
|
|
return com.mh.common.utils.ModbusUtils.createReadOrder( |
|
|
|
|
orderSendStr.substring(0, 2), "03", registerAddr, "1" |
|
|
|
|
); |
|
|
|
|
} else if (receiveStr.substring(0, 8).equals(orderSendStr.substring(0, 8))) { |
|
|
|
|
return com.mh.common.utils.ModbusUtils.createReadOrder( |
|
|
|
|
orderSendStr.substring(0, 2), "03", registerAddr, "2" |
|
|
|
|
); |
|
|
|
|
} |
|
|
|
|
return null; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private void dealSession(ChannelHandlerContext ctx) { |
|
|
|
|
String deviceCode = receiveStr + ctx.channel().remoteAddress(); |
|
|
|
|
String meterNum = receiveStr; |
|
|
|
|
|
|
|
|
|
ServerSession session = new ServerSession(ctx.channel(), deviceCode); |
|
|
|
|
|
|
|
|
|
CallbackTaskScheduler.add(new CallbackTask<Boolean>() { |
|
|
|
|
@Override |
|
|
|
|
public Boolean execute() throws Exception { |
|
|
|
|
return action(session, deviceCode, ctx); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
public void onBack(Boolean result) { |
|
|
|
|
if (result) { |
|
|
|
|
log.info("设备保存会话: {}", session.getSessionId()); |
|
|
|
|
} else { |
|
|
|
|
SessionMap.inst().updateSession(deviceCode, session, meterNum); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
public void onException(Throwable t) { |
|
|
|
|
log.error("设备登录异常: {}", session.getSessionId(), t); |
|
|
|
|
ServerSession.closeSession(ctx); |
|
|
|
|
} |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private boolean action(ServerSession session, String deviceCode, ChannelHandlerContext ctx) { |
|
|
|
|
checkUser(deviceCode, session); |
|
|
|
|
session.bind(); |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private boolean checkUser(String deviceCode, ServerSession session) { |
|
|
|
|
if (SessionMap.inst().hasLogin(deviceCode)) { |
|
|
|
|
log.warn("设备已经登录: {}", deviceCode); |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { |
|
|
|
|
log.error("通道异常", cause); |
|
|
|
|
ctx.close(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
public void channelInactive(ChannelHandlerContext ctx) throws Exception { |
|
|
|
|
log.info("客户端断开: {}", ctx.channel().remoteAddress()); |
|
|
|
|
ctx.close(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private String bytesToHexString(byte[] bytes) { |
|
|
|
|
StringBuilder sb = new StringBuilder(); |
|
|
|
|
for (byte b : bytes) { |
|
|
|
|
sb.append(String.format("%02X", b)); |
|
|
|
|
} |
|
|
|
|
return sb.toString(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* ✅ 优化7:原子性地递增num,处理循环边界 |
|
|
|
|
* 使用CAS操作保证线程安全 |
|
|
|
|
*/ |
|
|
|
|
private void incrementNum() { |
|
|
|
|
int currentNum; |
|
|
|
|
int nextNum; |
|
|
|
|
do { |
|
|
|
|
currentNum = num.get(); |
|
|
|
|
nextNum = currentNum + 1; |
|
|
|
|
if (nextNum >= size) { |
|
|
|
|
nextNum = 0; // 循环回到第一个设备
|
|
|
|
|
log.debug("num循环重置: {} -> 0, size={}", currentNum, size); |
|
|
|
|
} |
|
|
|
|
} while (!num.compareAndSet(currentNum, nextNum)); |
|
|
|
|
|
|
|
|
|
// 更新采集统计
|
|
|
|
|
lastCollectTime = System.currentTimeMillis(); |
|
|
|
|
totalCollectCount++; |
|
|
|
|
|
|
|
|
|
log.debug("num递增: {} -> {}, totalCollectCount={}", currentNum, nextNum, totalCollectCount); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* ✅ 优化8:统一的采集指令发送方法 |
|
|
|
|
* 包含边界检查和状态验证 |
|
|
|
|
*/ |
|
|
|
|
private void sendCollectOrder(ChannelHandlerContext ctx) { |
|
|
|
|
if (deviceCodeParamList == null || deviceCodeParamList.isEmpty()) { |
|
|
|
|
log.warn("设备参数列表为空,无法发送采集指令"); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
int currentIndex = num.get(); |
|
|
|
|
if (currentIndex < 0 || currentIndex >= size) { |
|
|
|
|
log.error("num索引越界: {}, size={}, 重置为0", currentIndex, size); |
|
|
|
|
num.set(0); |
|
|
|
|
currentIndex = 0; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
try { |
|
|
|
|
Thread.sleep(1000); // 控制采集频率
|
|
|
|
|
SendOrderUtils.sendAllOrder(deviceCodeParamList.get(currentIndex), ctx, currentIndex, size); |
|
|
|
|
log.debug("发送采集指令: num={}, deviceName={}", currentIndex, |
|
|
|
|
deviceCodeParamList.get(currentIndex).getOtherName()); |
|
|
|
|
} catch (InterruptedException e) { |
|
|
|
|
Thread.currentThread().interrupt(); |
|
|
|
|
log.error("发送采集指令中断", e); |
|
|
|
|
} catch (Exception e) { |
|
|
|
|
log.error("发送采集指令异常, num={}", currentIndex, e); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* ✅ 优化9:获取当前采集状态(用于监控) |
|
|
|
|
*/ |
|
|
|
|
public CollectionStats getCollectionStats() { |
|
|
|
|
CollectionStats stats = new CollectionStats(); |
|
|
|
|
stats.setCurrentNum(num.get()); |
|
|
|
|
stats.setSize(size); |
|
|
|
|
stats.setTotalCollectCount(totalCollectCount); |
|
|
|
|
stats.setLastCollectTime(lastCollectTime); |
|
|
|
|
stats.setDeviceCodeParamListSize(deviceCodeParamList != null ? deviceCodeParamList.size() : 0); |
|
|
|
|
return stats; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* 采集状态DTO(内部类) |
|
|
|
|
*/ |
|
|
|
|
@lombok.Data |
|
|
|
|
public static class CollectionStats { |
|
|
|
|
private int currentNum; |
|
|
|
|
private int size; |
|
|
|
|
private int totalCollectCount; |
|
|
|
|
private long lastCollectTime; |
|
|
|
|
private int deviceCodeParamListSize; |
|
|
|
|
} |
|
|
|
|
} |