diff --git a/mh-framework/src/main/java/com/mh/framework/netty/task/CallbackTaskScheduler.java b/mh-framework/src/main/java/com/mh/framework/netty/task/CallbackTaskScheduler.java index 3ef3c34..1f56a05 100644 --- a/mh-framework/src/main/java/com/mh/framework/netty/task/CallbackTaskScheduler.java +++ b/mh-framework/src/main/java/com/mh/framework/netty/task/CallbackTaskScheduler.java @@ -2,10 +2,8 @@ package com.mh.framework.netty.task; import com.google.common.util.concurrent.*; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; /** * @author LJF @@ -15,45 +13,48 @@ import java.util.concurrent.Executors; * @date 2023/7/3 15:34:11 */ public class CallbackTaskScheduler extends Thread { - private ConcurrentLinkedQueue executeTaskQueue = - new ConcurrentLinkedQueue<>(); - private long sleepTime = 1000 * 10; + private final BlockingQueue executeTaskQueue = + new LinkedBlockingQueue<>(); private final ExecutorService pool = Executors.newCachedThreadPool(); - ListeningExecutorService lpool = MoreExecutors.listeningDecorator(pool); + private final ListeningExecutorService lpool = MoreExecutors.listeningDecorator(pool); private static CallbackTaskScheduler inst = new CallbackTaskScheduler(); + private final AtomicBoolean running = new AtomicBoolean(true); + private CallbackTaskScheduler() { this.start(); } + //add task public static void add(CallbackTask executeTask) { - inst.executeTaskQueue.add(executeTask); + inst.executeTaskQueue.offer(executeTask); } @Override public void run() { - while (true) { + while (running.get()) { handleTask(); - //为了避免频繁连接服务器,但是当前连接服务器过长导致失败 - //threadSleep(sleepTime); } } - private void threadSleep(long sleepTime) { - try { - Thread.sleep(sleepTime); - }catch (Exception e) { - e.printStackTrace(); - } + /** + * 停止调度器 + */ + public static void shutdown() { + inst.running.set(false); + inst.pool.shutdown(); } //任务执行 private void handleTask() { - CallbackTask executeTask = null; - while (executeTaskQueue.peek() != null) { - executeTask = executeTaskQueue.poll(); + try { + // 使用 take() 阻塞等待,直到有任务 + CallbackTask executeTask = executeTaskQueue.take(); handleTask(executeTask); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } } + private void handleTask(CallbackTask executeTask) { ListenableFuture future = lpool.submit(new Callable() { public T call() throws Exception { @@ -70,9 +71,8 @@ public class CallbackTaskScheduler extends Thread { public void onFailure(Throwable throwable) { executeTask.onException(throwable); } - - - }, pool); + }, lpool); } + }