项目场景:
线程池的地方用的还是挺多的,一般来说用的多的还是execute方法,submit方法还是用的挺少的,一般ThreadPoolExecutor
的 submit
方法通常用于将一个任务提交到线程池中执行。这个方法会返回一个 Future
对象,可以用来检查任务的执行状态,获取任务的返回值或者取消任务的执行。
使用 submit
方法可以将任务提交到线程池中,由线程池中的线程来执行任务,从而避免了为每个任务创建线程的开销。同时,线程池可以限制同时执行的任务数量,避免资源被过度占用。
问题描述
提示:部分代码
某台服务器上配置了一个agent服务用来做命令执行,发现队列老是堆积。消费不过来明明用了线程池也发现任务队列没有满,奇怪。
项目日志:
Add task [com.timelinecapital.util.agent.commands.ops.CommandSHLoginExecuteRunner@7751c119] ThreadPool status: FixedThreadPool:ActiveCount:0,CompletedTaskCount:1680,TaskCount:1681,PoolSize:6,CorePoolSize:6,QueueSize:1
项目代码:
ThreadPoolExecutor service = null;
@Test
public void testPools() throws ExecutionException, InterruptedException, TimeoutException {
final int corePoolSize = Runtime.getRuntime().availableProcessors();
log.info("corePoolSize:{}", corePoolSize);
final int maximumPoolSize = corePoolSize * 2;
final long keepAliveTime = 20;
final TimeUnit unit = TimeUnit.SECONDS;
final BlockingQueue<Runnable> workQueue = new LinkedBlockingDeque<>();
final ThreadFactory threadFactory = new ThreadFactory() {
private final AtomicInteger mThreadNum = new AtomicInteger(1);
@Override
public Thread newThread(final Runnable r) {
return new Thread(r, "command-thread-" + mThreadNum.getAndIncrement());
}
};
service = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime,
unit, workQueue, threadFactory);
for (int i = 0; i < maximumPoolSize; i++) {
log.info("pool========{}", showStatus());
final Future<?> future = service.submit((Callable) () -> {
log.info("thread name start:{}========", Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(3);
log.info("thread name end:{}========", Thread.currentThread().getName());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return null;
});
Object o = future.get(10, TimeUnit.SECONDS);
}
log.info("complate");
try {
Thread.currentThread().join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
String showStatus() {
final ThreadPoolExecutor executor = (ThreadPoolExecutor) service;
final StringBuilder info = new StringBuilder();
info.append("FixedThreadPool:");
info.append("ActiveCount:").append(executor.getActiveCount()).append(",");
info.append("CompletedTaskCount:").append(executor.getCompletedTaskCount()).append(",");
info.append("TaskCount:").append(executor.getTaskCount()).append(",");
info.append("PoolSize:").append(executor.getPoolSize()).append(",");
info.append("CorePoolSize:").append(executor.getCorePoolSize()).append(",");
final BlockingQueue<Runnable> queue = executor.getQueue();
if (queue != null) {
info.append("QueueSize:").append(queue.size());
}
return info.toString();
}
原因分析:
提示:跑了一次看到日志按照单线程的方式执行,瞬间顿悟。
2023-08-04 11:10:25 INFO UtilsTest :105 - corePoolSize:12
2023-08-04 11:10:25 INFO UtilsTest :121 - pool========FixedThreadPool:ActiveCount:0,CompletedTaskCount:0,TaskCount:0,PoolSize:0,CorePoolSize:12,QueueSize:0
2023-08-04 11:10:25 INFO UtilsTest :123 - thread name start:command-thread-1========
2023-08-04 11:10:28 INFO UtilsTest :126 - thread name end:command-thread-1========
2023-08-04 11:10:28 INFO UtilsTest :121 - pool========FixedThreadPool:ActiveCount:0,CompletedTaskCount:1,TaskCount:1,PoolSize:1,CorePoolSize:12,QueueSize:0
2023-08-04 11:10:28 INFO UtilsTest :123 - thread name start:command-thread-2========
2023-08-04 11:10:31 INFO UtilsTest :126 - thread name end:command-thread-2========
2023-08-04 11:10:31 INFO UtilsTest :121 - pool========FixedThreadPool:ActiveCount:0,CompletedTaskCount:2,TaskCount:2,PoolSize:2,CorePoolSize:12,QueueSize:0
2023-08-04 11:10:31 INFO UtilsTest :123 - thread name start:command-thread-3========
2023-08-04 11:10:34 INFO UtilsTest :126 - thread name end:command-thread-3========
2023-08-04 11:10:34 INFO UtilsTest :121 - pool========FixedThreadPool:ActiveCount:0,CompletedTaskCount:3,TaskCount:3,PoolSize:3,CorePoolSize:12,QueueSize:0
2023-08-04 11:10:34 INFO UtilsTest :123 - thread name start:command-thread-4========
原来submit的方式用错了,不应该直接这么get的,这样就跟没有开线程池一样,因为future.get(10, TimeUnit.SECONDS)
会阻塞线程继续执行,线程池的最大使用效率没有返回出来,只用到一个单线程在执行,结果等于没有用。
从查看submit的源码来看,其实也是调用了java.util.concurrent.Executor#execute
方法,只是换了线程实现而已,又让我想起那句话,之前不懂代码的时候看代码是代码,后面懂代码了,看代码就是看方法,现在深入代码底层看代码还是代码,惯性是个恐怖的事情。
/**
* @throws NullPointerException if the task is null
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
*/
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
解决方案:
提示:取消立马获 future.get(10, TimeUnit.SECONDS)方式
最后只能修改业务逻辑,因为对执行结果不是特别需求,所有可以改成execute方式,当然如果逻辑对返回值的需求特别的可以解耦,使用生产者消费者模式,一边计算一边处理,实现逻辑可以这样,在submit返回的Future对象存储在一个集合里面,在另一边可以批次处理也可以单次处理,批次处理就判断所有的submit执行完之后处理,单次处理就使用队列集合,一次取一个值理论情况下不会阻塞太久。
总结
习惯了用execute就忘记了submit的正确使用方式,惯性是很恐怖的,还是得多多跑跑单元测试。