From 786da07f68f89f087754c0c558e6f0a741fc127d Mon Sep 17 00:00:00 2001 From: 25604 Date: Tue, 20 Jan 2026 11:03:13 +0800 Subject: [PATCH] =?UTF-8?q?1=E3=80=81=E4=BC=98=E5=8C=96netty=E4=B8=8A?= =?UTF-8?q?=E7=BA=BF=EF=BC=8C=E5=87=8F=E5=B0=91cpu=E4=BD=BF=E7=94=A8?= =?UTF-8?q?=E7=8E=87=EF=BC=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../netty/task/CallbackTaskScheduler.java | 48 +++++++++---------- 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/mh-framework/src/main/java/com/mh/framework/netty/task/CallbackTaskScheduler.java b/mh-framework/src/main/java/com/mh/framework/netty/task/CallbackTaskScheduler.java index 3ef3c34..1f56a05 100644 --- a/mh-framework/src/main/java/com/mh/framework/netty/task/CallbackTaskScheduler.java +++ b/mh-framework/src/main/java/com/mh/framework/netty/task/CallbackTaskScheduler.java @@ -2,10 +2,8 @@ package com.mh.framework.netty.task; import com.google.common.util.concurrent.*; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; /** * @author LJF @@ -15,45 +13,48 @@ import java.util.concurrent.Executors; * @date 2023/7/3 15:34:11 */ public class CallbackTaskScheduler extends Thread { - private ConcurrentLinkedQueue executeTaskQueue = - new ConcurrentLinkedQueue<>(); - private long sleepTime = 1000 * 10; + private final BlockingQueue executeTaskQueue = + new LinkedBlockingQueue<>(); private final ExecutorService pool = Executors.newCachedThreadPool(); - ListeningExecutorService lpool = MoreExecutors.listeningDecorator(pool); + private final ListeningExecutorService lpool = MoreExecutors.listeningDecorator(pool); private static CallbackTaskScheduler inst = new CallbackTaskScheduler(); + private final AtomicBoolean running = new AtomicBoolean(true); + private CallbackTaskScheduler() { this.start(); } + //add task public static void add(CallbackTask executeTask) { - inst.executeTaskQueue.add(executeTask); + inst.executeTaskQueue.offer(executeTask); } @Override public void run() { - while (true) { + while (running.get()) { handleTask(); - //为了避免频繁连接服务器,但是当前连接服务器过长导致失败 - //threadSleep(sleepTime); } } - private void threadSleep(long sleepTime) { - try { - Thread.sleep(sleepTime); - }catch (Exception e) { - e.printStackTrace(); - } + /** + * 停止调度器 + */ + public static void shutdown() { + inst.running.set(false); + inst.pool.shutdown(); } //任务执行 private void handleTask() { - CallbackTask executeTask = null; - while (executeTaskQueue.peek() != null) { - executeTask = executeTaskQueue.poll(); + try { + // 使用 take() 阻塞等待,直到有任务 + CallbackTask executeTask = executeTaskQueue.take(); handleTask(executeTask); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } } + private void handleTask(CallbackTask executeTask) { ListenableFuture future = lpool.submit(new Callable() { public T call() throws Exception { @@ -70,9 +71,8 @@ public class CallbackTaskScheduler extends Thread { public void onFailure(Throwable throwable) { executeTask.onException(throwable); } - - - }, pool); + }, lpool); } + }