diff --git a/user-service/src/main/java/com/mh/user/netty/task/CallbackTaskScheduler.java b/user-service/src/main/java/com/mh/user/netty/task/CallbackTaskScheduler.java index c242f04..e960b9a 100644 --- a/user-service/src/main/java/com/mh/user/netty/task/CallbackTaskScheduler.java +++ b/user-service/src/main/java/com/mh/user/netty/task/CallbackTaskScheduler.java @@ -2,10 +2,8 @@ package com.mh.user.netty.task; import com.google.common.util.concurrent.*; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; /** * @author LJF @@ -15,12 +13,12 @@ import java.util.concurrent.Executors; * @date 2023/7/3 15:34:11 */ public class CallbackTaskScheduler extends Thread { - private ConcurrentLinkedQueue executeTaskQueue = - new ConcurrentLinkedQueue<>(); - private long sleepTime = 1000 * 10; + private final BlockingQueue executeTaskQueue = + new LinkedBlockingQueue<>(); private final ExecutorService pool = Executors.newCachedThreadPool(); - ListeningExecutorService lpool = MoreExecutors.listeningDecorator(pool); + private final ListeningExecutorService lpool = MoreExecutors.listeningDecorator(pool); private static CallbackTaskScheduler inst = new CallbackTaskScheduler(); + private final AtomicBoolean running = new AtomicBoolean(true); private CallbackTaskScheduler() { this.start(); @@ -28,32 +26,32 @@ public class CallbackTaskScheduler extends Thread { //add task public static void add(CallbackTask executeTask) { - inst.executeTaskQueue.add(executeTask); + inst.executeTaskQueue.offer(executeTask); } @Override public void run() { - while (true) { + while (running.get()) { handleTask(); - //为了避免频繁连接服务器,但是当前连接服务器过长导致失败 - //threadSleep(sleepTime); } } - private void threadSleep(long sleepTime) { - try { - Thread.sleep(sleepTime); - } catch (Exception e) { - e.printStackTrace(); - } + /** + * 停止调度器 + */ + public static void shutdown() { + inst.running.set(false); + inst.pool.shutdown(); } //任务执行 private void handleTask() { - CallbackTask executeTask = null; - while (executeTaskQueue.peek() != null) { - executeTask = executeTaskQueue.poll(); + try { + // 使用 take() 阻塞等待,直到有任务 + CallbackTask executeTask = executeTaskQueue.take(); handleTask(executeTask); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } }