package com.mh; import com.mh.common.core.domain.entity.GatewayManage; import com.mh.common.core.domain.entity.MqttSubscription; import com.mh.common.utils.StringUtils; import com.mh.framework.mqtt.service.IMqttTopicService; import com.mh.framework.netty.EchoServer; import com.mh.system.service.device.ICollectionParamsManageService; import com.mh.system.service.device.IGatewayManageService; import com.mh.system.service.mqtt.IMqttSubscriptionService; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Component; import java.util.List; import java.util.stream.Collectors; /** * @author LJF * @version 1.0 * @project EEMCS * @description 项目初始化之后的系列操作 * @date 2025-02-14 16:35:50 */ @Component public class MHRunner implements ApplicationRunner { private final IMqttSubscriptionService iMqttSubscriptionService; private final IMqttTopicService iMqttTopicService; private final ICollectionParamsManageService collectionParamsManageService; private final IGatewayManageService gatewayManageService; public MHRunner(IMqttSubscriptionService iMqttSubscriptionService, IMqttTopicService iMqttTopicService, ICollectionParamsManageService collectionParamsManageService, IGatewayManageService gatewayManageService) { this.iMqttSubscriptionService = iMqttSubscriptionService; this.iMqttTopicService = iMqttTopicService; this.collectionParamsManageService = collectionParamsManageService; this.gatewayManageService = gatewayManageService; } @Override public void run(ApplicationArguments args) throws Exception { // 初始化mqtt订阅记录 initializeMqttSubscription(); // 生成DTU采集参数 // createDtuCollectionParams(); // 启动netty服务端 // startNettyServer(); } private void startNettyServer() { List gatewayManages = gatewayManageService.selectGwManageList(new GatewayManage()); if (gatewayManages != null && !gatewayManages.isEmpty()) { // 根据端口号分组 gatewayManages.stream().collect(Collectors.groupingBy(GatewayManage::getPort)).forEach((k, v) -> { // 启动网关 GatewayManage gatewayManage = v.getFirst(); new Thread(() -> new EchoServer(gatewayManage.getPort()).start()).start(); }); } } private void createDtuCollectionParams() { collectionParamsManageService.createDtuCollectionParams(); } /** * 初始化mqtt订阅记录 */ private void initializeMqttSubscription() { MqttSubscription mqttSubscription = new MqttSubscription(); mqttSubscription.setStatus("0"); List mqttSubscriptions = iMqttSubscriptionService.selectMqttSubList(mqttSubscription); for (MqttSubscription subscription : mqttSubscriptions) { try { if (!StringUtils.isEmpty(subscription.getTopic())) { iMqttTopicService.subscribe(subscription.getTopic(), subscription.getQos()); } } catch (Exception e) { throw new RuntimeException(e); } } } }