Browse Source

1、优化netty通信时,CPU占用过高;

dev
25604 2 weeks ago
parent
commit
554cef5ccd
  1. 40
      user-service/src/main/java/com/mh/user/netty/task/CallbackTaskScheduler.java

40
user-service/src/main/java/com/mh/user/netty/task/CallbackTaskScheduler.java

@ -2,10 +2,8 @@ package com.mh.user.netty.task;
import com.google.common.util.concurrent.*; import com.google.common.util.concurrent.*;
import java.util.concurrent.Callable; import java.util.concurrent.*;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/** /**
* @author LJF * @author LJF
@ -15,12 +13,12 @@ import java.util.concurrent.Executors;
* @date 2023/7/3 15:34:11 * @date 2023/7/3 15:34:11
*/ */
public class CallbackTaskScheduler extends Thread { public class CallbackTaskScheduler extends Thread {
private ConcurrentLinkedQueue<CallbackTask> executeTaskQueue = private final BlockingQueue<CallbackTask> executeTaskQueue =
new ConcurrentLinkedQueue<>(); new LinkedBlockingQueue<>();
private long sleepTime = 1000 * 10;
private final ExecutorService pool = Executors.newCachedThreadPool(); private final ExecutorService pool = Executors.newCachedThreadPool();
ListeningExecutorService lpool = MoreExecutors.listeningDecorator(pool); private final ListeningExecutorService lpool = MoreExecutors.listeningDecorator(pool);
private static CallbackTaskScheduler inst = new CallbackTaskScheduler(); private static CallbackTaskScheduler inst = new CallbackTaskScheduler();
private final AtomicBoolean running = new AtomicBoolean(true);
private CallbackTaskScheduler() { private CallbackTaskScheduler() {
this.start(); this.start();
@ -28,32 +26,32 @@ public class CallbackTaskScheduler extends Thread {
//add task //add task
public static <T> void add(CallbackTask<T> executeTask) { public static <T> void add(CallbackTask<T> executeTask) {
inst.executeTaskQueue.add(executeTask); inst.executeTaskQueue.offer(executeTask);
} }
@Override @Override
public void run() { public void run() {
while (true) { while (running.get()) {
handleTask(); handleTask();
//为了避免频繁连接服务器,但是当前连接服务器过长导致失败
//threadSleep(sleepTime);
} }
} }
private void threadSleep(long sleepTime) { /**
try { * 停止调度器
Thread.sleep(sleepTime); */
} catch (Exception e) { public static void shutdown() {
e.printStackTrace(); inst.running.set(false);
} inst.pool.shutdown();
} }
//任务执行 //任务执行
private void handleTask() { private void handleTask() {
CallbackTask executeTask = null; try {
while (executeTaskQueue.peek() != null) { // 使用 take() 阻塞等待,直到有任务
executeTask = executeTaskQueue.poll(); CallbackTask executeTask = executeTaskQueue.take();
handleTask(executeTask); handleTask(executeTask);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} }
} }

Loading…
Cancel
Save