diff --git a/mh-common/src/main/java/com/mh/common/core/domain/entity/AlarmRecords.java b/mh-common/src/main/java/com/mh/common/core/domain/entity/AlarmRecords.java index 45d38aa..37c9e63 100644 --- a/mh-common/src/main/java/com/mh/common/core/domain/entity/AlarmRecords.java +++ b/mh-common/src/main/java/com/mh/common/core/domain/entity/AlarmRecords.java @@ -88,6 +88,19 @@ public class AlarmRecords implements Serializable { */ private int isSend; + /** + * 推送次数,大于3次不再推送 + */ + private int sendNum; + + public int getSendNum() { + return sendNum; + } + + public void setSendNum(int sendNum) { + this.sendNum = sendNum; + } + public int getIsSend() { return isSend; } diff --git a/mh-framework/src/main/java/com/mh/framework/rabbitmq/RabbitMqConfig.java b/mh-framework/src/main/java/com/mh/framework/rabbitmq/RabbitMqConfig.java index d7d740c..5c2b11e 100644 --- a/mh-framework/src/main/java/com/mh/framework/rabbitmq/RabbitMqConfig.java +++ b/mh-framework/src/main/java/com/mh/framework/rabbitmq/RabbitMqConfig.java @@ -8,6 +8,9 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import java.util.HashMap; +import java.util.Map; + /** * @Author : Rainbow * @date : 2023/5/26 @@ -16,6 +19,12 @@ import org.springframework.context.annotation.Configuration; public class RabbitMqConfig { /**交换机*/ public static final String EXCHANGE_NAME = "exchange_eemcs"; + + /** + * 延迟队列 + */ + public static final String DELAY_EXCHANGE_NAME = "device.delayed.exchange"; + /**主机队列*/ public static final String QUEUE_CHILLER = "queue_chiller"; /**主机routing-key*/ @@ -34,6 +43,12 @@ public class RabbitMqConfig { /**温湿度 routing-key*/ public static final String ROUTING_KEY_TEMP = "topic.temp.eemcs.#"; + /** + * 报警队列 + */ + public static final String QUEUE_ALARM = "device.alarm.queue"; + + public static final String ROUTING_KEY_ALARM = "topic.alarm.eemcs.#"; /**durable参数表示交换机是否持久化,值为true表示持久化,值为false表示不持久化。 * 在RabbitMQ中,持久化交换机会被存储在磁盘上以便在服务器重启后恢复, @@ -120,4 +135,27 @@ public class RabbitMqConfig { return factory; } + // 延迟交换机(使用自定义类型x-delayed-message) + @Bean + public CustomExchange delayedExchange() { + Map args = new HashMap<>(); + args.put("x-delayed-type", "direct"); // 底层转发类型 + return new CustomExchange(DELAY_EXCHANGE_NAME, "x-delayed-message", true, false, args); + } + + // 报警队列 + @Bean + public Queue alarmQueue() { + return new Queue(QUEUE_ALARM, true); + } + + // 绑定延迟交换机与队列 + @Bean + public Binding binding(Queue alarmQueue, CustomExchange delayedExchange) { + return BindingBuilder.bind(alarmQueue) + .to(delayedExchange) + .with(ROUTING_KEY_ALARM) + .noargs(); + } + } diff --git a/mh-framework/src/main/java/com/mh/framework/rabbitmq/consumer/ReceiveHandler.java b/mh-framework/src/main/java/com/mh/framework/rabbitmq/consumer/ReceiveHandler.java index cfde10c..4108062 100644 --- a/mh-framework/src/main/java/com/mh/framework/rabbitmq/consumer/ReceiveHandler.java +++ b/mh-framework/src/main/java/com/mh/framework/rabbitmq/consumer/ReceiveHandler.java @@ -1,14 +1,14 @@ package com.mh.framework.rabbitmq.consumer; -import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; +import com.mh.common.core.redis.RedisCache; import com.mh.common.model.request.AdvantechReceiver; import com.mh.common.model.request.OneTwoThreeTempData; import com.mh.framework.dealdata.DataProcessService; import com.mh.framework.rabbitmq.RabbitMqConfig; +import com.mh.system.service.operation.IAlarmRecordsService; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; -import org.apache.poi.ss.formula.functions.T; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; @@ -33,6 +33,14 @@ public class ReceiveHandler { @Autowired DataProcessService dataProcessService; + @Autowired + IAlarmRecordsService alarmRecordsService; + + @Autowired + private RedisCache redisTemplate; + + private static final String ALARM_CANCEL_PREFIX = "alarm:cancel:"; + /** * 监听主机参数 * queues:指定监听的队列名,可以接收单个队列,也可以接收多个队列的数组或列表。 @@ -165,4 +173,41 @@ public class ReceiveHandler { } } + /** + * 处理设备报警延时队列数据 + * + * @param msg + * @param channel + * @param tag + */ + @RabbitListener(bindings = @QueueBinding( + value = @Queue(value = RabbitMqConfig.QUEUE_ALARM, durable = "true"), + exchange = @Exchange( + value = RabbitMqConfig.DELAY_EXCHANGE_NAME, + ignoreDeclarationExceptions = "true", + type = ExchangeTypes.TOPIC + ), + key = {RabbitMqConfig.ROUTING_KEY_ALARM} + )) + public void handleAlarm(@Payload String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws InterruptedException, IOException { + try { + String deviceId = msg.split(":")[1]; + String cancelKey = ALARM_CANCEL_PREFIX + deviceId; +// // 检查Redis中是否存在取消标记 +// if (Boolean.TRUE.equals(redisTemplate.hasKey(cancelKey))) { +// redisTemplate.deleteObject(cancelKey); +// System.out.println("报警已取消: " + deviceId); +// return; +// } + // 执行生成报警数据 + alarmRecordsService.insertOrUpdateAlarmRecord(deviceId); + // 正常执行,手动确认ack + channel.basicAck(tag, false); + } catch (Exception e) { + log.error("data:{},ddcException:{}", msg, e); + Thread.sleep(100); + channel.basicAck(tag, false); + } + } + } diff --git a/mh-framework/src/main/java/com/mh/framework/rabbitmq/producer/SendMsgByTopic.java b/mh-framework/src/main/java/com/mh/framework/rabbitmq/producer/SendMsgByTopic.java index c9c1b54..5d5d005 100644 --- a/mh-framework/src/main/java/com/mh/framework/rabbitmq/producer/SendMsgByTopic.java +++ b/mh-framework/src/main/java/com/mh/framework/rabbitmq/producer/SendMsgByTopic.java @@ -3,6 +3,8 @@ package com.mh.framework.rabbitmq.producer; import com.mh.common.model.request.AdvantechReceiver; import com.mh.framework.rabbitmq.RabbitMqConfig; import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -40,4 +42,15 @@ public class SendMsgByTopic { rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME,"topic.temp.eemcs.data",data); return "success"; } + + /**延迟队列注入rabbitmq*/ + public void sendDelayedAlarm(String deviceId, int delayMinutes) { + String message = "ALARM:" + deviceId; + + MessageProperties props = new MessageProperties(); + props.setHeader("x-delay", delayMinutes * 60 * 1000); // 延迟毫秒 + Message msg = new Message(message.getBytes(), props); + + rabbitTemplate.send(RabbitMqConfig.DELAY_EXCHANGE_NAME, RabbitMqConfig.ROUTING_KEY_ALARM, msg); + } } diff --git a/mh-quartz/src/main/java/com/mh/quartz/task/CreateAlarmTask.java b/mh-quartz/src/main/java/com/mh/quartz/task/CreateAlarmTask.java index f022b27..b38b989 100644 --- a/mh-quartz/src/main/java/com/mh/quartz/task/CreateAlarmTask.java +++ b/mh-quartz/src/main/java/com/mh/quartz/task/CreateAlarmTask.java @@ -6,18 +6,21 @@ import com.mh.common.core.domain.entity.AlarmRules; import com.mh.common.core.domain.entity.CollectionParamsManage; import com.mh.common.utils.BigDecimalUtils; import com.mh.common.utils.DateUtils; +import com.mh.framework.rabbitmq.producer.SendMsgByTopic; import com.mh.system.service.device.ICollectionParamsManageService; import com.mh.system.service.operation.IAlarmCodeService; import com.mh.system.service.operation.IAlarmRecordsService; import com.mh.system.service.operation.IAlarmRulesService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; import java.math.BigDecimal; import java.math.RoundingMode; import java.util.Date; import java.util.List; +import java.util.concurrent.TimeUnit; /** @@ -31,84 +34,25 @@ import java.util.List; @Component("createAlarmTask") public class CreateAlarmTask { - private final IAlarmRulesService alarmRulesService; - - private final IAlarmCodeService alarmCodeService; - - private final ICollectionParamsManageService collectionParamsManageService; private final IAlarmRecordsService alarmRecordService; + private final SendMsgByTopic sendMsgByTopic; + @Autowired - public CreateAlarmTask(IAlarmRulesService alarmRulesService, IAlarmCodeService alarmCodeService, ICollectionParamsManageService collectionParamsManageService, IAlarmRecordsService alarmRecordService) { - this.alarmRulesService = alarmRulesService; - this.alarmCodeService = alarmCodeService; - this.collectionParamsManageService = collectionParamsManageService; + public CreateAlarmTask(IAlarmRecordsService alarmRecordService, SendMsgByTopic sendMsgByTopic) { this.alarmRecordService = alarmRecordService; + this.sendMsgByTopic = sendMsgByTopic; } public void createAlarmTask() { log.info("创建报警记录"); - // 查询仪表报警规则记录,查看哪些规则启用了 - List alarmRules = alarmRulesService.selectAlarmRulesListByStatus(0); - // 循环查询报警规则,判断是否满足报警条件 - for (AlarmRules alarmRule : alarmRules) { - // 判断报警类型 - if ("0".equals(alarmRule.getAlarmType())) { - // 当前是越限事件 - // 查询事件类型查询对应的报警模板内容 - AlarmCode alarmCode = alarmCodeService.selectAlarmCodeByAlarmType(alarmRule.getEventType()); - // 获取当前采集参数值 - CollectionParamsManage collectionParamsManage = collectionParamsManageService.selectCollectionParamsManageById(alarmRule.getCpmId()); - // 判断当前值是否是当前事件 - AlarmRecords alarmRecords = new AlarmRecords(); - BigDecimal curValue = collectionParamsManage.getCurValue(); - Date curTime = collectionParamsManage.getCurTime(); - // 阈值 - String threshold1 = alarmRule.getThreshold1(); - if (alarmRule.getTimePeriodSet() == 0 && DateUtils.isSameDay(curTime, new Date())) { - // 执行相关操作 - insertOrUpdateRecord(alarmRule, curValue, threshold1, alarmCode, alarmRecords, collectionParamsManage); - } else if (alarmRule.getTimePeriodSet() == 1 - && DateUtils.isSameDay(collectionParamsManage.getCurTime(), new Date()) - && DateUtils.isCurrentTimeInRange(alarmRule.getBeginTime(), alarmRule.getEndTime(), curTime) - ) { - // 执行相关操作 - insertOrUpdateRecord(alarmRule, curValue, threshold1, alarmCode, alarmRecords, collectionParamsManage); - } - } - } - } - - private void insertOrUpdateRecord(AlarmRules alarmRule, BigDecimal curValue, String threshold1, AlarmCode alarmCode, AlarmRecords alarmRecords, CollectionParamsManage collectionParamsManage) { - boolean compare = BigDecimalUtils.compare(alarmRule.getCondition1(), curValue, new BigDecimal(threshold1)); - if (compare) { - // 创建报警记录 - String content = alarmCode.getMsgContent(); - content = content.replace("#{curValue}", curValue.setScale(1, RoundingMode.HALF_UP).toString()); - content = content.replace("#{setValue}", alarmRule.getCondition1() + threshold1); - alarmRecords.setContent(content); - alarmRecords.setAlarmType(alarmRule.getAlarmType()); - alarmRecords.setEventType(alarmRule.getEventType()); - alarmRecords.setAlarmLevel(alarmRule.getAlarmLevel()); - alarmRecords.setLedgerId(alarmRule.getLedgerId()); - alarmRecords.setCpmId(alarmRule.getCpmId()); - alarmRecords.setDeviceName(alarmRule.getDeviceName()); - alarmRecords.setCpmName(alarmRule.getCpmName()); - alarmRecords.setCreateTime(collectionParamsManage.getCurTime()); - // 判断报警记录是否已经存在 - AlarmRecords isExits = alarmRecordService.selectIsExist(alarmRecords); - if (isExits == null) { - alarmRecordService.insertAlarmRecord(alarmRecords); - } else { - // 更新报警记录 - isExits.setContent(content); - isExits.setCreateTime(collectionParamsManage.getCurTime()); - isExits.setStatus(0); - isExits.setIsSend(0); - alarmRecordService.updateAlarmRecord(isExits); - } + // 第一次判断 + String alarmTask = alarmRecordService.createAlarmTask(); + if (null == alarmTask) { + return; } + sendMsgByTopic.sendDelayedAlarm(alarmTask, 1); } } diff --git a/mh-quartz/src/main/java/com/mh/quartz/task/PushDataToWechatTask.java b/mh-quartz/src/main/java/com/mh/quartz/task/PushDataToWechatTask.java index 46e7ee2..132b7dc 100644 --- a/mh-quartz/src/main/java/com/mh/quartz/task/PushDataToWechatTask.java +++ b/mh-quartz/src/main/java/com/mh/quartz/task/PushDataToWechatTask.java @@ -52,37 +52,39 @@ public class PushDataToWechatTask { return; } alarmRecords1.forEach(alarmRecords2 -> { - // 查询需要推送的微信用户 - List> wechatUserList = wechatService.queryWechatUser(0); - if (null == wechatUserList || wechatUserList.isEmpty()) { - log.info("没有查询到微信用户"); - } - // 开始推送数据 - for (Map map : wechatUserList) { - PushMsgEntity pushMsgEntity = new PushMsgEntity(); - try { - // 判断模板id种类 - switch (map.get("template_id").toString()) { - case "fqAXCFXSBCqHLJjBLIjD-Wr_dN8RLsTcsatUQa3Ktx4": - // 设备异常告警提醒 - pushDeviceExceptionParams(map, pushMsgEntity, alarmRecords2); - break; - case "SiyBtZeZuF0Qo8V3NlvGwhc95-vX-a6wsvIxpAq3d_Y": - // 设备告警通知 - pushDeviceArmParams(map, pushMsgEntity, alarmRecords2); - break; - default: - break; - } - - } catch (Exception e) { - throw new RuntimeException(e); - } - if (!pushMsgEntity.getTouser().isEmpty()) { - String result = wechatService.pushMsg(pushMsgEntity); - if (Constants.SUCCESS.equals(result)) { - // 更新数据:已发送通知 - alarmRecordsService.updateAlarmRecordById(alarmRecords2.getId(), 1); + // 每个小时推送两次 + if (alarmRecords2.getSendNum() < 3) { + // 查询需要推送的微信用户 + List> wechatUserList = wechatService.queryWechatUser(0); + if (null != wechatUserList && !wechatUserList.isEmpty()) { + // 开始推送数据 + for (Map map : wechatUserList) { + PushMsgEntity pushMsgEntity = new PushMsgEntity(); + try { + // 判断模板id种类 + switch (map.get("template_id").toString()) { + case "fqAXCFXSBCqHLJjBLIjD-Wr_dN8RLsTcsatUQa3Ktx4": + // 设备异常告警提醒 + pushDeviceExceptionParams(map, pushMsgEntity, alarmRecords2); + break; + case "SiyBtZeZuF0Qo8V3NlvGwhc95-vX-a6wsvIxpAq3d_Y": + // 设备告警通知 + pushDeviceArmParams(map, pushMsgEntity, alarmRecords2); + break; + default: + break; + } + + } catch (Exception e) { + throw new RuntimeException(e); + } + if (!pushMsgEntity.getTouser().isEmpty()) { + String result = wechatService.pushMsg(pushMsgEntity); + if (Constants.SUCCESS.equals(result)) { + // 更新数据:已发送通知 + alarmRecordsService.updateAlarmRecordById(alarmRecords2.getId(), 1); + } + } } } } @@ -129,6 +131,7 @@ public class PushDataToWechatTask { /** * 推送设备异常告警提醒 + * * @param map * @param pushMsgEntity * @param alarmRecords2 @@ -154,7 +157,7 @@ public class PushDataToWechatTask { Key3 key3 = new Key3(); // 通过告警类型id得到value值 String alarmType = alarmRecords2.getAlarmType(); - String alarmType1 = DictUtils.getDictLabel( "alarm_type", alarmType); + String alarmType1 = DictUtils.getDictLabel("alarm_type", alarmType); key3.setValue(alarmType1); pushMsgEntity.setKey3(key3); diff --git a/mh-system/src/main/java/com/mh/system/mapper/operation/AlarmRecordsMapper.java b/mh-system/src/main/java/com/mh/system/mapper/operation/AlarmRecordsMapper.java index d0df964..6ae59c4 100644 --- a/mh-system/src/main/java/com/mh/system/mapper/operation/AlarmRecordsMapper.java +++ b/mh-system/src/main/java/com/mh/system/mapper/operation/AlarmRecordsMapper.java @@ -15,6 +15,6 @@ import org.apache.ibatis.annotations.Update; @Mapper public interface AlarmRecordsMapper extends BaseMapper { - @Update("update alarm_records set is_send = #{isSend} where id = #{id} ") + @Update("update alarm_records set is_send = #{isSend}, send_num = send_num + 1 where id = #{id} ") void updateIsSendById(String id, int isSend); } diff --git a/mh-system/src/main/java/com/mh/system/service/operation/IAlarmRecordsService.java b/mh-system/src/main/java/com/mh/system/service/operation/IAlarmRecordsService.java index 56cacb8..6d52711 100644 --- a/mh-system/src/main/java/com/mh/system/service/operation/IAlarmRecordsService.java +++ b/mh-system/src/main/java/com/mh/system/service/operation/IAlarmRecordsService.java @@ -26,4 +26,8 @@ public interface IAlarmRecordsService { int deleteAlarmRecordsByIds(String[] acdIds); void updateAlarmRecordById(String id, int i); + + String createAlarmTask(); + + void insertOrUpdateAlarmRecord(String cpmId); } diff --git a/mh-system/src/main/java/com/mh/system/service/operation/impl/AlarmRecordsServiceImpl.java b/mh-system/src/main/java/com/mh/system/service/operation/impl/AlarmRecordsServiceImpl.java index 0ceaf32..bfcd5a7 100644 --- a/mh-system/src/main/java/com/mh/system/service/operation/impl/AlarmRecordsServiceImpl.java +++ b/mh-system/src/main/java/com/mh/system/service/operation/impl/AlarmRecordsServiceImpl.java @@ -1,18 +1,31 @@ package com.mh.system.service.operation.impl; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; +import com.mh.common.core.domain.entity.AlarmCode; import com.mh.common.core.domain.entity.AlarmRecords; +import com.mh.common.core.domain.entity.AlarmRules; +import com.mh.common.core.domain.entity.CollectionParamsManage; +import com.mh.common.core.redis.RedisCache; +import com.mh.common.utils.BigDecimalUtils; +import com.mh.common.utils.DateUtils; import com.mh.common.utils.StringUtils; import com.mh.system.mapper.operation.AlarmRecordsMapper; +import com.mh.system.service.device.ICollectionParamsManageService; +import com.mh.system.service.operation.IAlarmCodeService; import com.mh.system.service.operation.IAlarmRecordsService; +import com.mh.system.service.operation.IAlarmRulesService; import jakarta.annotation.Resource; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.math.BigDecimal; +import java.math.RoundingMode; import java.text.SimpleDateFormat; import java.time.LocalDate; -import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; +import java.util.Date; import java.util.List; +import java.util.concurrent.TimeUnit; /** * @author LJF @@ -27,6 +40,18 @@ public class AlarmRecordsServiceImpl implements IAlarmRecordsService { @Resource private AlarmRecordsMapper alarmRecordsMapper; + @Resource + private IAlarmRulesService alarmRulesService; + + @Resource + private IAlarmCodeService alarmCodeService; + + @Resource + private ICollectionParamsManageService collectionParamsManageService; + + @Autowired + private RedisCache redisTemplate; + @Override public AlarmRecords selectIsExist(AlarmRecords alarmRecords) { QueryWrapper queryWrapper = new QueryWrapper<>(); @@ -76,7 +101,7 @@ public class AlarmRecordsServiceImpl implements IAlarmRecordsService { queryWrapper.like("device_name", alarmRecords.getDeviceName()); } // 报警时间范围 - if (alarmRecords.getParams() != null && !alarmRecords.getParams().isEmpty()) { + if (alarmRecords.getParams() != null && !alarmRecords.getParams().isEmpty()) { String beginTimeStr = (String) alarmRecords.getParams().get("beginTime"); String endTimeStr = (String) alarmRecords.getParams().get("endTime"); @@ -137,4 +162,112 @@ public class AlarmRecordsServiceImpl implements IAlarmRecordsService { public void updateAlarmRecordById(String id, int isSend) { alarmRecordsMapper.updateIsSendById(id, isSend); } + + @Override + public void insertOrUpdateAlarmRecord(String cpmId) { + // 查询仪表报警规则记录,查看哪些规则启用了 + List alarmRules = alarmRulesService.selectAlarmRulesListByStatus(0); + // 循环查询报警规则,判断是否满足报警条件 + for (AlarmRules alarmRule : alarmRules) { + if (!alarmRule.getCpmId().equals(cpmId)) { + continue; + } + // 判断报警类型 + if ("0".equals(alarmRule.getAlarmType())) { + // 当前是越限事件 + // 查询事件类型查询对应的报警模板内容 + AlarmCode alarmCode = alarmCodeService.selectAlarmCodeByAlarmType(alarmRule.getEventType()); + // 获取当前采集参数值 + CollectionParamsManage collectionParamsManage = collectionParamsManageService.selectCollectionParamsManageById(alarmRule.getCpmId()); + // 判断当前值是否是当前事件 + if (null == collectionParamsManage) { + continue; + } + AlarmRecords alarmRecords = new AlarmRecords(); + BigDecimal curValue = collectionParamsManage.getCurValue(); + Date curTime = collectionParamsManage.getCurTime(); + // 阈值 + String threshold1 = alarmRule.getThreshold1(); + // 设置Redis取消标记(有效期略大于延迟时间) + redisTemplate.deleteObject("alarm:cancel:" + collectionParamsManage.getId()); + if (alarmRule.getTimePeriodSet() == 0 && DateUtils.isSameDay(curTime, new Date())) { + // 执行相关操作 + insertOrUpdateRecord(alarmRule, curValue, threshold1, alarmCode, alarmRecords, collectionParamsManage); + } else if (alarmRule.getTimePeriodSet() == 1 + && DateUtils.isSameDay(collectionParamsManage.getCurTime(), new Date()) + && DateUtils.isCurrentTimeInRange(alarmRule.getBeginTime(), alarmRule.getEndTime(), curTime) + ) { + // 执行相关操作 + insertOrUpdateRecord(alarmRule, curValue, threshold1, alarmCode, alarmRecords, collectionParamsManage); + } + } + } + } + + @Override + public String createAlarmTask() { + // 查询仪表报警规则记录,查看哪些规则启用了 + List alarmRules = alarmRulesService.selectAlarmRulesListByStatus(0); + // 循环查询报警规则,判断是否满足报警条件 + for (AlarmRules alarmRule : alarmRules) { + // 判断报警类型 + if ("0".equals(alarmRule.getAlarmType())) { + // 当前是越限事件 + // 获取当前采集参数值 + CollectionParamsManage collectionParamsManage = collectionParamsManageService.selectCollectionParamsManageById(alarmRule.getCpmId()); + // 判断当前值是否是当前事件 + if (null == collectionParamsManage) { + continue; + } + AlarmRecords alarmRecords = new AlarmRecords(); + BigDecimal curValue = collectionParamsManage.getCurValue(); + Date curTime = collectionParamsManage.getCurTime(); + // 阈值 + String threshold1 = alarmRule.getThreshold1(); + if (curValue.compareTo(BigDecimal.ZERO) == 0 || curValue.compareTo(new BigDecimal(threshold1)) < 0) { + // 发送到延迟队列 + return collectionParamsManage.getId(); + } else { + // 设置Redis取消标记(有效期略大于延迟时间) + redisTemplate.setCacheObject( + "alarm:cancel:" + collectionParamsManage.getId(), + "true", + 2, TimeUnit.MINUTES // 例如:延迟5分钟,设置10分钟过期 + ); + } + } + } + return null; + } + + private void insertOrUpdateRecord(AlarmRules alarmRule, BigDecimal curValue, String threshold1, AlarmCode alarmCode, AlarmRecords alarmRecords, CollectionParamsManage collectionParamsManage) { + boolean compare = BigDecimalUtils.compare(alarmRule.getCondition1(), curValue, new BigDecimal(threshold1)); + if (compare) { + // 创建报警记录 + String content = alarmCode.getMsgContent(); + content = content.replace("#{curValue}", curValue.setScale(1, RoundingMode.HALF_UP).toString()); + content = content.replace("#{setValue}", alarmRule.getCondition1() + threshold1); + alarmRecords.setContent(content); + alarmRecords.setAlarmType(alarmRule.getAlarmType()); + alarmRecords.setEventType(alarmRule.getEventType()); + alarmRecords.setAlarmLevel(alarmRule.getAlarmLevel()); + alarmRecords.setLedgerId(alarmRule.getLedgerId()); + alarmRecords.setCpmId(alarmRule.getCpmId()); + alarmRecords.setDeviceName(alarmRule.getDeviceName()); + alarmRecords.setCpmName(alarmRule.getCpmName()); + alarmRecords.setCreateTime(collectionParamsManage.getCurTime()); + // 判断报警记录是否已经存在 + AlarmRecords isExits = selectIsExist(alarmRecords); + if (isExits == null) { + insertAlarmRecord(alarmRecords); + } else { + // 更新报警记录 + isExits.setContent(content); + isExits.setCreateTime(collectionParamsManage.getCurTime()); + isExits.setStatus(0); + isExits.setIsSend(0); + updateAlarmRecord(isExits); + } + } + } }