Java多线程与信号量探究
前言
最近在写一个文件批处理框架,用到了Java多线程池。业务需要:
- 提供一个在多线程场景下能够捕捉线程池内线程异常并且上抛到调用方的机制,以实现在多线程提交过程当中,出现异常快速失败FailFast的功能。
- 能够控制任务提交到线程池的速度
对于任务提交速度的控制,使用Semaphore
信号量进行控制。
对于捕获多线程执行异常的情况,方案有两种:
- 在提交到线程池的Runnable中,对业务逻辑做异常捕获,如果抛出异常,则将标志位写入Redis,任务提交的代码每隔N个任务检查一次Redis标记位,如果发现异常,则立即停止任务提交。
- 在线程池中对
submit()
方法返回的Future<?>
对象进行缓存,每隔N个任务遍历一次缓存的Future<?>
,通过调用其get()
方法,如果异常存在则直接抛出。
ThreadPoolExecutor
先回顾一下JDK提供的经典线程池ThreadPoolExecutor
构造方法:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// some basic verification code
}
corePoolSize
用于设置线程池核心线程数(除非用的AsynchronousQueue,否则核心线程意味着不会被线程池回收的线程)maximumPoolSize
用于设置线程池最大线程数(在keepAliveTime后,非核心线程会被线程池回收)keeyAliveTime
用于设置非核心线程存活时间unit
作为TimeUnit
类型用于设置存活时间单位(微秒,毫秒,秒,分,时,日)workQueue
提供一个任务队列给线程池使用,常用的有:LinkedBlockingQueue
链表实现,可以设置队列长度,不设置的话默认为无界队列(不推荐使用无界队列,容易资源耗尽)ArrayBlockingQueue
数组实现,必须设置队列长度,因为在线程池中要经常进行出入队操作,所以链表实现会比数组实现性能要高一些(纯个人猜想,没有做过验证)PriorityBlockingQueue
优先级队列,数组实现,二分查找实现排序,必须设置队列长度
threadFactory
线程工厂,线程池内部调用工厂获取线程,不设置的话就用DefaultThreadFactory
handler
线程池拒绝策略,线程池达到最大且等待队列满的情况下调用拒绝策略,JDK提供了四个默认的拒绝策略实现:AbortPolicy
直接拒绝,抛出RejectedExecutionException
给线程池调用方CallerRunsPolicy
调用者执行策略,即调用方来执行任务DiscardOldestPolicy
扔掉等待队列中的头节点,即等待时长最久的任务,不抛出异常,继续执行当前提交的任务(不推荐)DiscardPolicy
扔掉当前提交到线程池的任务,不抛出异常,线程池继续运行(不推荐)
上述就是线程池的全部构造参数,这里不做展开,内容很简单,可以直接看源代码就行。
添加线程的方法如下:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
JDK官方文档对于线程添加做了比较详细的注释:
- 如果当前线程数小于核心线程数,则添加核心线程运行当前提交到队列中的任务
- 如果当前线程数大于核心线程数,尝试往队列添加任务,如果能添加成功则添加非核心线程(这里做了一个双重检查,避免出现添加任务过程中线程池关闭的情况)
- 如果线程数已经达到线程池上限,且等待队列已满,则调用reject方法,根据构造函数提供的拒绝策略进行处理
以上就是线程池的基本使用规则。
Semaphore
在很多语言中,都有信号量的机制,Java也不例外,对于信号量的使用,本身是比较简单的一件事情,源码也不多,常用方法如下:
acuire()
获取一个信号量,如果当前无可用信号量,则阻塞当前线程javaprivate void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
release()
释放一个信号量,发出unpark
指令唤起等待队列
业务逻辑实现
通过对Semaphore
和ThreadPoolExecutor
的搭配,我们可以实现一个理论上可行的能够控制任务提交至线程池速度,并能够捕获线程池中线程异常的封装(注意为什么是理论上可行,后续会详细描述。。。这也是我写这篇文章的原因)
先来看我预先实现的代码:
AsyncThrowableExecutor.java
package com.zhaoyingjie;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.*;
public class AsyncThrowableExecutor {
private final Semaphore semaphore;
private final ExecutorService executorService;
private final List<Future<?>> futures;
private final int futureListSize;
public AsyncThrowableExecutor() {
executorService = new ThreadPoolExecutor(5, 10, 100, TimeUnit.SECONDS, new LinkedBlockingQueue<>(5), new LoggingCallerRunPolicy());
semaphore = new Semaphore(15);
futureListSize = 10;
futures = new LinkedList<>();
}
public ExecutorService executorService() {
return this.executorService;
}
public void execute(final Runnable runnable) throws InterruptedException {
checkFutures();
semaphore.acquire();
try {
Future<?> future = executorService.submit(() -> {
try {
runnable.run();
} finally {
semaphore.release();
}
});
futures.add(future);
} catch (Exception e) {
semaphore.release();
throw e;
}
}
private void checkFutures() {
if (futures.size() >= futureListSize) {
try {
for (Future<?> future : futures) {
future.get();
}
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
futures.clear();
}
}
}
Main.java
package com.zhaoyingjie;
import java.util.concurrent.atomic.AtomicLong;
public class Main {
public static void main(String[] args) throws InterruptedException {
AsyncThrowableExecutor executor = new AsyncThrowableExecutor();
AtomicLong atomicLong = new AtomicLong(0);
long begin = System.currentTimeMillis();
for (int i = 0; i < 20; i++) {
try {
executor.execute(() -> {
try {
Thread.sleep(100L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
} catch (Exception e) {
System.out.printf("Error occurred in task %d, %s\n", i, e.getMessage());
}
System.out.printf("[Submitted %d task] %s\n", atomicLong.incrementAndGet(), executor.executorService());
}
long end = System.currentTimeMillis();
Thread.sleep(5000L);
System.out.printf("Atomic integer %d, cost %d ms\n", atomicLong.get(), end - begin);
}
}
LoggingCallerRunPolicy.java
package com.zhaoyingjie;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;
public class LoggingCallerRunPolicy extends ThreadPoolExecutor.CallerRunsPolicy {
private final AtomicLong callerRunTimes = new AtomicLong(0);
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.printf("Caller runs %d, Executor info %s\n", callerRunTimes.incrementAndGet(), executor);
super.rejectedExecution(r, executor);
}
}
以上就是这个功能实现的全部设计代码。
问题发现与研究
原本我预想的是,控制住信号量大小,使其刚好等于maximumPoolSize
+linkedBlockingQueue
阻塞队列的大小,那么在主线程提交任务的时候,如果阻塞队列满了并且达到线程池最大线程数,就会在调用acquire()
方法的时候阻塞住,等到有线程执行完毕上一个任务,调用release()
方法后,主线程再继续进行任务的提交,然鹅理想很丰满,现实很骨感,我发现了一个神奇的现象,控制台输出如下:
[Submitted 1 task] [Running, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0]
[Submitted 2 task] [Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
[Submitted 3 task] [Running, pool size = 3, active threads = 3, queued tasks = 0, completed tasks = 0]
[Submitted 4 task] [Running, pool size = 4, active threads = 4, queued tasks = 0, completed tasks = 0]
[Submitted 5 task] [Running, pool size = 5, active threads = 5, queued tasks = 0, completed tasks = 0]
[Submitted 6 task] [Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
[Submitted 7 task] [Running, pool size = 5, active threads = 5, queued tasks = 2, completed tasks = 0]
[Submitted 8 task] [Running, pool size = 5, active threads = 5, queued tasks = 3, completed tasks = 0]
[Submitted 9 task] [Running, pool size = 5, active threads = 5, queued tasks = 4, completed tasks = 0]
[Submitted 10 task] [Running, pool size = 5, active threads = 5, queued tasks = 5, completed tasks = 0]
[Submitted 11 task] [Running, pool size = 6, active threads = 6, queued tasks = 5, completed tasks = 0]
[Submitted 12 task] [Running, pool size = 7, active threads = 7, queued tasks = 5, completed tasks = 0]
[Submitted 13 task] [Running, pool size = 8, active threads = 8, queued tasks = 5, completed tasks = 0]
[Submitted 14 task] [Running, pool size = 9, active threads = 9, queued tasks = 5, completed tasks = 0]
[Submitted 15 task] [Running, pool size = 10, active threads = 10, queued tasks = 5, completed tasks = 0]
[Caller runs 1 task] [Running, pool size = 10, active threads = 10, queued tasks = 4, completed tasks = 1]
[Submitted 16 task] [Running, pool size = 10, active threads = 4, queued tasks = 0, completed tasks = 11]
[Submitted 17 task] [Running, pool size = 10, active threads = 4, queued tasks = 1, completed tasks = 11]
[Submitted 18 task] [Running, pool size = 10, active threads = 4, queued tasks = 2, completed tasks = 11]
[Submitted 19 task] [Running, pool size = 10, active threads = 4, queued tasks = 3, completed tasks = 11]
[Submitted 20 task] [Running, pool size = 10, active threads = 4, queued tasks = 4, completed tasks = 11]
可以看到,随着任务提交,第1至第5任务,pool size每次+1,第6至第10任务,queued tasks每次+1,第11至第15任务,pool size每次+1,符合我们上面提到的线程池添加线程的规则。
但是注意看上述输出的第16行,线程池调用了我自己写的拒绝策略,打印了一行日志,也就是说,我刚才设置的信号量并没有完全控制住提交任务的线程,在任务提交的时候被线程池拒绝了,所以才能看到上述日志。发现这个问题之后,我看了信号量和线程池的源代码很久,也没看出什么问题,并且这个现象随着我调整信号量的大小,线程池大小,任务提交次数(外层for循环)以及任务子线程占用时间(Thread.sleep()
方法),都依旧可以复现。就在我百思不得其解的时候,我突然想到一个问题,我们仔细看上面第16行日志,此时线程池的completed tasks为1,也就是说,在那一瞬间,有一个线程完成了任务的执行,然后调用了release
方法,使得原本阻塞在acquire()
方法出的主线程被唤醒继续执行提交任务,也就是说:
信号量和线程池并不是用的同一个锁对象,意味着,线程池内的线程执行完任务后,先调用release()
信号量方法,然后再return,那么此时线程不一定能够立刻被回收,但是提交任务的主线程被唤醒了,开始执行submit()
方法,换句话说,信号量的释放和线程池的回收存在一定的延迟,这个延迟时间内,如果刚好有线程阻塞在了acquire()
方法那里,并且线程池内队列已满,那么就会触发线程池拒绝策略。
所以对于目前的设计,如果要保证没有任务被拒绝或者丢弃,最好的解决方案就是调整优化线程池、信号量、等待队列的各参数大小,譬如等待队列可以设置大一些,信号量在允许范围内调小一些,等等,于此同时,使用CallerRunsPolicy
, 即便出现线程池满了的情况下,依旧可以保证任务的完整执行。
我将线程池等待队列调整为30,核心线程数调整为availableProcessor*2
,最大线程数调整为abailableProcessor*4
,信号量设置为50之后,出现任务提交被拒绝的次数明显少了很多,但是值得注意的是,我们的线程执行的任务可能是和数据库进行交互的,线程执行时间也会对这个结果产生影响,例如,线程池内所有线程在等待相同时间后,同一时刻同时释放信号量锁,此时主线程进行任务提交产生的被线程池拒绝 的概率就会大大增加,线程执行任务这个耗时需要在自己的业务场景下进行估算,并依此对上述参数进行调整。
以上就是我在这次开发期间遇到的线程池和信号量问题的一点探究。如有错误,欢迎各位指正。
关于JDK线程池回收机制的讲解,可以参考这里,里面详细且通俗的描述了线程池内线程的完整生命周期以及一些线程池设计精巧之处的解析,向同行致敬。