From 554cef5ccdd97c67fd35b4557e0e09932482c1ab Mon Sep 17 00:00:00 2001 From: 25604 Date: Thu, 15 Jan 2026 15:26:52 +0800 Subject: [PATCH] =?UTF-8?q?1=E3=80=81=E4=BC=98=E5=8C=96netty=E9=80=9A?= =?UTF-8?q?=E4=BF=A1=E6=97=B6=EF=BC=8CCPU=E5=8D=A0=E7=94=A8=E8=BF=87?= =?UTF-8?q?=E9=AB=98=EF=BC=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../netty/task/CallbackTaskScheduler.java | 40 +++++++++---------- 1 file changed, 19 insertions(+), 21 deletions(-) diff --git a/user-service/src/main/java/com/mh/user/netty/task/CallbackTaskScheduler.java b/user-service/src/main/java/com/mh/user/netty/task/CallbackTaskScheduler.java index c242f04..e960b9a 100644 --- a/user-service/src/main/java/com/mh/user/netty/task/CallbackTaskScheduler.java +++ b/user-service/src/main/java/com/mh/user/netty/task/CallbackTaskScheduler.java @@ -2,10 +2,8 @@ package com.mh.user.netty.task; import com.google.common.util.concurrent.*; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; /** * @author LJF @@ -15,12 +13,12 @@ import java.util.concurrent.Executors; * @date 2023/7/3 15:34:11 */ public class CallbackTaskScheduler extends Thread { - private ConcurrentLinkedQueue executeTaskQueue = - new ConcurrentLinkedQueue<>(); - private long sleepTime = 1000 * 10; + private final BlockingQueue executeTaskQueue = + new LinkedBlockingQueue<>(); private final ExecutorService pool = Executors.newCachedThreadPool(); - ListeningExecutorService lpool = MoreExecutors.listeningDecorator(pool); + private final ListeningExecutorService lpool = MoreExecutors.listeningDecorator(pool); private static CallbackTaskScheduler inst = new CallbackTaskScheduler(); + private final AtomicBoolean running = new AtomicBoolean(true); private CallbackTaskScheduler() { this.start(); @@ -28,32 +26,32 @@ public class CallbackTaskScheduler extends Thread { //add task public static void add(CallbackTask executeTask) { - inst.executeTaskQueue.add(executeTask); + inst.executeTaskQueue.offer(executeTask); } @Override public void run() { - while (true) { + while (running.get()) { handleTask(); - //为了避免频繁连接服务器,但是当前连接服务器过长导致失败 - //threadSleep(sleepTime); } } - private void threadSleep(long sleepTime) { - try { - Thread.sleep(sleepTime); - } catch (Exception e) { - e.printStackTrace(); - } + /** + * 停止调度器 + */ + public static void shutdown() { + inst.running.set(false); + inst.pool.shutdown(); } //任务执行 private void handleTask() { - CallbackTask executeTask = null; - while (executeTaskQueue.peek() != null) { - executeTask = executeTaskQueue.poll(); + try { + // 使用 take() 阻塞等待,直到有任务 + CallbackTask executeTask = executeTaskQueue.take(); handleTask(executeTask); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } }