Java Multi-threading and Semaphore Investigation
Introduction
Recently I was developing a batch file-processing framework on top of Java thread pools. The business requirements were:
- Provide a mechanism to capture exceptions thrown by threads in the pool and propagate them to the caller, so that submission fails fast as soon as a task throws.
- Control the rate at which tasks are submitted to the thread pool.
To control the submission rate, we use the Semaphore mechanism.
For capturing exceptions thrown during multi-threaded execution, there are two approaches:
- Catch exceptions in the Runnable submitted to the thread pool. If an exception occurs, write a flag to Redis. The task submission code checks the Redis flag every N tasks and, if an exception is detected, immediately stops submitting.
- Cache the Future<?> objects returned by the pool's submit() method, check the cached Future<?> objects every N tasks by calling their get() method, and rethrow any exception found.
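The second approach can be sketched in a few lines. This is a minimal illustration rather than the framework's actual code: the class name FailFastDemo, the fixed pool of 4 threads, and the parameters failAt and checkEvery are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; names and sizes are illustrative assumptions.
final class FailFastDemo {

    /**
     * Submits up to `total` tasks; the task with index `failAt` throws.
     * Every `checkEvery` submissions the cached futures are drained with get().
     * Returns how many tasks were submitted before submission stopped.
     */
    static int submitUntilFailure(int total, int failAt, int checkEvery) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<Future<?>> pending = new ArrayList<>();
        int submitted = 0;
        try {
            for (int i = 0; i < total; i++) {
                final int id = i;
                pending.add(pool.submit(() -> {
                    if (id == failAt) {
                        throw new IllegalStateException("task " + id + " failed");
                    }
                }));
                submitted++;
                if (pending.size() >= checkEvery) {
                    for (Future<?> f : pending) {
                        f.get(); // a task failure surfaces here as ExecutionException
                    }
                    pending.clear();
                }
            }
        } catch (ExecutionException e) {
            System.out.println("stopping early: " + e.getCause().getMessage());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow();
        }
        return submitted;
    }

    public static void main(String[] args) {
        // Task 7 fails; the failure is noticed at the first check, after 10 submissions.
        System.out.println("submitted " + submitUntilFailure(100, 7, 10) + " of 100 tasks");
    }
}
```

Note that the failure is only noticed at the next check, so up to checkEvery tasks may be submitted after the failing one. That is the price of not calling get() after every submission.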
ThreadPoolExecutor
Let's first review the classic JDK ThreadPoolExecutor constructor:
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    // some basic verification code
}
- corePoolSize: the number of core threads. Idle core threads are not reclaimed by the pool unless allowCoreThreadTimeOut(true) has been set.
- maximumPoolSize: the maximum number of threads in the pool. Non-core threads are reclaimed after being idle for keepAliveTime.
- keepAliveTime: how long an idle non-core thread survives.
- unit: the TimeUnit of keepAliveTime (nanoseconds, microseconds, milliseconds, seconds, minutes, hours, days).
- workQueue: the task queue used by the pool. Common choices include:
  - LinkedBlockingQueue: linked-list implementation. The capacity is optional; if it is not set, the queue is effectively unbounded (capacity Integer.MAX_VALUE), which is not recommended because it can exhaust resources.
  - ArrayBlockingQueue: array implementation; the capacity must be set. The linked-list implementation likely performs better than the array implementation under frequent enqueue/dequeue operations (personal speculation, unverified).
  - PriorityBlockingQueue: priority queue backed by an array-based binary heap. It is unbounded; the constructor argument is only an initial capacity.
- threadFactory: the factory the pool calls to create threads. Executors.defaultThreadFactory() is used if none is set.
- handler: the rejection policy, called when the pool has reached maximumPoolSize and the queue is full, or when the pool has been shut down. The JDK provides four implementations:
  - AbortPolicy: rejects the task by throwing RejectedExecutionException to the caller.
  - CallerRunsPolicy: the thread that submitted the task runs it itself.
  - DiscardOldestPolicy: discards the head of the queue (the longest-waiting task) without throwing, then retries the current submission (not recommended).
  - DiscardPolicy: silently discards the submitted task; no exception is thrown and the pool keeps running (not recommended).
These are all the constructor parameters for the thread pool. The content is simple and can be understood directly from the source code.
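To make the parameter list concrete, here is a small sketch that passes all seven arguments explicitly. The class name PoolFactoryDemo, the thread name prefix "batch-worker-", and the sizes are illustrative assumptions, not values taken from the framework.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; all sizes and names are illustrative assumptions.
final class PoolFactoryDemo {

    static ThreadPoolExecutor newBatchPool() {
        AtomicInteger seq = new AtomicInteger();
        // Custom factory so that worker threads are easy to spot in logs and thread dumps.
        ThreadFactory named = r -> new Thread(r, "batch-worker-" + seq.incrementAndGet());
        return new ThreadPoolExecutor(
                2,                                          // corePoolSize
                4,                                          // maximumPoolSize
                30L, TimeUnit.SECONDS,                      // keepAliveTime + unit
                new ArrayBlockingQueue<>(8),                // bounded workQueue
                named,                                      // threadFactory
                new ThreadPoolExecutor.CallerRunsPolicy()); // handler
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newBatchPool();
        pool.execute(() -> System.out.println("running on " + Thread.currentThread().getName()));
        pool.shutdown();
    }
}
```

A bounded queue is chosen deliberately: with an unbounded queue the pool would never grow past corePoolSize and the rejection policy would never be invoked while the pool is running.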
The method that accepts a task and, when necessary, adds a worker thread is as follows:
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}
The comment in the JDK source describes in detail how a submitted task is handled:
- If the current thread count is less than corePoolSize, a new core thread is started with the submitted task as its first task.
- Otherwise, the pool tries to put the task into the queue. If that succeeds, it rechecks the pool state: if the pool was shut down in the meantime, the task is removed from the queue and rejected; if no worker threads are left, a new one is started.
- If the task cannot be queued because the queue is full, the pool tries to start a non-core thread. If that also fails, because the thread count has reached maximumPoolSize or the pool is shut down, reject() is called and the task is handled by the rejection policy passed to the constructor.
These are the basic usage rules for thread pools.
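The three steps can be observed directly with a deliberately tiny pool. The following is a minimal sketch; the class name GrowthOrderDemo and the sizes (core 1, maximum 2, queue capacity 1) are assumptions chosen so that each step happens exactly once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; sizes are chosen only to make each step visible.
final class GrowthOrderDemo {

    static List<String> observe() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 10L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.AbortPolicy());
        CountDownLatch release = new CountDownLatch(1);
        // Every task blocks until the latch opens, so no worker becomes free during the demo.
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<String> log = new ArrayList<>();
        for (int i = 1; i <= 4; i++) {
            try {
                pool.execute(blocker);
                log.add("task " + i + ": pool=" + pool.getPoolSize()
                        + " queued=" + pool.getQueue().size());
            } catch (RejectedExecutionException e) {
                log.add("task " + i + ": rejected");
            }
        }
        release.countDown(); // let the blocked tasks finish
        pool.shutdown();
        return log;
    }

    public static void main(String[] args) {
        observe().forEach(System.out::println);
        // task 1: pool=1 queued=0   (step 1: core thread started)
        // task 2: pool=1 queued=1   (step 2: task queued)
        // task 3: pool=2 queued=1   (step 3: queue full, non-core thread started)
        // task 4: rejected          (step 3: pool saturated)
    }
}
```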
Semaphore
Many languages provide semaphores, and Java is no exception. Semaphore is relatively simple to use and its source code is short. The most common methods are:
- acquire(): acquires a permit, blocking the current thread if none is available. When the caller has to wait, it ends up in the following AbstractQueuedSynchronizer method (as it appears in the JDK 8 sources):
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
- release(): releases a permit and unparks a thread waiting in the queue.
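A minimal sketch of the two methods in action, assuming a semaphore with 2 permits; the class name SemaphoreDemo and the 100 ms delay are illustrative. The non-blocking tryAcquire() is included only for contrast with acquire().

```java
import java.util.concurrent.Semaphore;

// Hypothetical demo class, not part of the framework code.
final class SemaphoreDemo {

    static String trace() {
        Semaphore permits = new Semaphore(2);
        StringBuilder out = new StringBuilder();
        try {
            permits.acquire();
            permits.acquire();
            out.append("available=").append(permits.availablePermits());
            // tryAcquire() never blocks; it simply reports that no permit is left.
            out.append(" tryAcquire=").append(permits.tryAcquire());

            Thread releaser = new Thread(() -> {
                try {
                    Thread.sleep(100L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                permits.release(); // wakes the thread parked in acquire()
            });
            releaser.start();

            permits.acquire(); // parks here until the releaser thread calls release()
            releaser.join();
            out.append(" afterWait=").append(permits.availablePermits());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(trace()); // available=0 tryAcquire=false afterWait=0
    }
}
```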
Business Logic Implementation
By combining Semaphore and ThreadPoolExecutor, we can build a wrapper that, in theory, both controls the rate at which tasks are submitted to the thread pool and captures exceptions thrown by its threads. (Why only "in theory" is described in detail later, and it is the reason I am writing this article.)
Let's look at my initial implementation:
AsyncThrowableExecutor.java
package com.zhaoyingjie;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.*;

public class AsyncThrowableExecutor {
    private final Semaphore semaphore;
    private final ExecutorService executorService;
    private final List<Future<?>> futures;
    private final int futureListSize;

    public AsyncThrowableExecutor() {
        // core = 5, max = 10, queue capacity = 5
        executorService = new ThreadPoolExecutor(5, 10, 100, TimeUnit.SECONDS, new LinkedBlockingQueue<>(5), new LoggingCallerRunPolicy());
        // permits = maximumPoolSize (10) + queue capacity (5)
        semaphore = new Semaphore(15);
        futureListSize = 10;
        futures = new LinkedList<>();
    }

    public ExecutorService executorService() {
        return this.executorService;
    }

    public void execute(final Runnable runnable) throws InterruptedException {
        // Fail fast: rethrow any exception from previously submitted tasks.
        checkFutures();
        // Blocks while 15 tasks are already in flight.
        semaphore.acquire();
        try {
            Future<?> future = executorService.submit(() -> {
                try {
                    runnable.run();
                } finally {
                    // Released inside the task, before the worker returns to the pool.
                    semaphore.release();
                }
            });
            futures.add(future);
        } catch (Exception e) {
            // submit() itself failed, so the task will never release the permit.
            semaphore.release();
            throw e;
        }
    }

    // Every futureListSize submissions, wait for the cached futures and surface failures.
    private void checkFutures() {
        if (futures.size() >= futureListSize) {
            try {
                for (Future<?> future : futures) {
                    future.get();
                }
            } catch (ExecutionException | InterruptedException e) {
                throw new RuntimeException(e);
            }
            futures.clear();
        }
    }
}
Main.java
package com.zhaoyingjie;

import java.util.concurrent.atomic.AtomicLong;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        AsyncThrowableExecutor executor = new AsyncThrowableExecutor();
        AtomicLong atomicLong = new AtomicLong(0);
        long begin = System.currentTimeMillis();
        for (int i = 0; i < 20; i++) {
            try {
                executor.execute(() -> {
                    try {
                        Thread.sleep(100L);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                });
            } catch (Exception e) {
                System.out.printf("Error occurred in task %d, %s\n", i, e.getMessage());
            }
            System.out.printf("[Submitted %d task] %s\n", atomicLong.incrementAndGet(), executor.executorService());
        }
        long end = System.currentTimeMillis();
        Thread.sleep(5000L);
        System.out.printf("Atomic integer %d, cost %d ms\n", atomicLong.get(), end - begin);
    }
}
LoggingCallerRunPolicy.java
package com.zhaoyingjie;

import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;

public class LoggingCallerRunPolicy extends ThreadPoolExecutor.CallerRunsPolicy {
    private final AtomicLong callerRunTimes = new AtomicLong(0);

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.printf("Caller runs %d, Executor info %s\n", callerRunTimes.incrementAndGet(), executor);
        super.rejectedExecution(r, executor);
    }
}
This is the complete design code for this functionality.
Problem Discovery and Investigation
I expected that setting the number of semaphore permits to maximumPoolSize plus the capacity of the LinkedBlockingQueue (10 + 5 = 15 here) would be enough: once the queue is full and the maximum thread count is reached, the main thread would block in acquire() until a worker finished its task and called release(), and only then continue submitting. The theory looked perfect, but reality proved different. The console output shows an interesting phenomenon:
[Submitted 1 task] [Running, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0]
[Submitted 2 task] [Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
[Submitted 3 task] [Running, pool size = 3, active threads = 3, queued tasks = 0, completed tasks = 0]
[Submitted 4 task] [Running, pool size = 4, active threads = 4, queued tasks = 0, completed tasks = 0]
[Submitted 5 task] [Running, pool size = 5, active threads = 5, queued tasks = 0, completed tasks = 0]
[Submitted 6 task] [Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
[Submitted 7 task] [Running, pool size = 5, active threads = 5, queued tasks = 2, completed tasks = 0]
[Submitted 8 task] [Running, pool size = 5, active threads = 5, queued tasks = 3, completed tasks = 0]
[Submitted 9 task] [Running, pool size = 5, active threads = 5, queued tasks = 4, completed tasks = 0]
[Submitted 10 task] [Running, pool size = 5, active threads = 5, queued tasks = 5, completed tasks = 0]
[Submitted 11 task] [Running, pool size = 6, active threads = 6, queued tasks = 5, completed tasks = 0]
[Submitted 12 task] [Running, pool size = 7, active threads = 7, queued tasks = 5, completed tasks = 0]
[Submitted 13 task] [Running, pool size = 8, active threads = 8, queued tasks = 5, completed tasks = 0]
[Submitted 14 task] [Running, pool size = 9, active threads = 9, queued tasks = 5, completed tasks = 0]
[Submitted 15 task] [Running, pool size = 10, active threads = 10, queued tasks = 5, completed tasks = 0]
[Caller runs 1 task] [Running, pool size = 10, active threads = 10, queued tasks = 4, completed tasks = 1]
[Submitted 16 task] [Running, pool size = 10, active threads = 4, queued tasks = 0, completed tasks = 11]
[Submitted 17 task] [Running, pool size = 10, active threads = 4, queued tasks = 1, completed tasks = 11]
[Submitted 18 task] [Running, pool size = 10, active threads = 4, queued tasks = 2, completed tasks = 11]
[Submitted 19 task] [Running, pool size = 10, active threads = 4, queued tasks = 3, completed tasks = 11]
[Submitted 20 task] [Running, pool size = 10, active threads = 4, queued tasks = 4, completed tasks = 11]
As we can see, as tasks are submitted, tasks 1-5 increase pool size by 1 each time, tasks 6-10 increase queued tasks by 1 each time, and tasks 11-15 increase pool size by 1 each time, conforming to the thread pool thread addition rules we mentioned above.
However, look at line 16 of the output above: the thread pool called my custom rejection policy, which printed a log line. In other words, the semaphore did not fully throttle the submitting thread, and a task was rejected by the pool. After discovering this, I spent a long time reading the semaphore and thread pool source code without spotting a problem. The phenomenon persisted no matter how I adjusted the semaphore size, the pool size, the number of submitted tasks (the outer for loop), or the task duration (the Thread.sleep() call). Just when I was completely puzzled, I noticed something. On line 16 of the log, the pool's completed tasks count is 1, meaning that at that instant a worker had just finished a task and called release(), waking the main thread blocked in acquire() so that it could submit the next task. In other words:
The semaphore and the thread pool are not guarded by the same lock. When a worker finishes a task, the task's finally block calls the semaphore's release() first, and only afterwards does the task return control to the pool. At that moment the worker has not yet been freed up to take the next task from the queue, but the main thread has already been woken up and is calling submit(). In other words, there is a window between the semaphore release and the worker becoming available again. If a thread is blocked in acquire() during that window and the pool's queue is still full, its next submission triggers the rejection policy.
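The window can be reproduced deterministically by stretching it on purpose. The sketch below is not the framework code: ReleaseRaceDemo is a hypothetical class, the pool is shrunk to one thread and one queue slot, and the afterExecute hook is overridden only to hold the worker artificially between "task returned" and "worker polls the queue".

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; the latches exist only to force the interleaving.
final class ReleaseRaceDemo {

    static int countRejections() {
        AtomicInteger rejections = new AtomicInteger();
        CountDownLatch firstTaskMayFinish = new CountDownLatch(1);
        CountDownLatch thirdTaskSubmitted = new CountDownLatch(1);
        Semaphore permits = new Semaphore(2); // maximumPoolSize (1) + queue capacity (1)

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1),
                (r, executor) -> {            // caller-runs style handler that also counts
                    rejections.incrementAndGet();
                    r.run();
                }) {
            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                // Runs on the worker after the task has returned (and released its
                // permit) but before the worker takes the next task from the queue.
                awaitQuietly(thirdTaskSubmitted);
            }
        };

        Runnable first = () -> {
            try {
                awaitQuietly(firstTaskMayFinish);
            } finally {
                permits.release();
            }
        };
        Runnable other = permits::release;

        try {
            permits.acquire();
            pool.execute(first);            // runs on the only worker
            permits.acquire();
            pool.execute(other);            // worker busy, so this one is queued
            firstTaskMayFinish.countDown();
            permits.acquire();              // returns once `first` has released its permit
            pool.execute(other);            // queue still full, pool at max: rejected
            thirdTaskSubmitted.countDown(); // let the worker proceed
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return rejections.get();
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("rejections = " + countRejections()); // rejections = 1
    }
}
```

In a real pool the window is only a few instructions wide, which is why the rejection shows up intermittently rather than on every run.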
Therefore, with the current design, if we want no task to be rejected or discarded, the best solution is to tune the parameters of the thread pool, the semaphore, and the waiting queue. For example, the queue can be made larger and the semaphore smaller, within what the workload allows. In addition, using CallerRunsPolicy ensures that every task still runs even when the pool is saturated.
After changing the queue capacity to 30, the core thread count to availableProcessor*2, the maximum thread count to availableProcessor*4, and the semaphore to 50, rejections became much less frequent. Note, however, that the tasks may talk to a database, and their execution time also affects the result. For example, if all workers finish at about the same time and release their permits simultaneously, the probability that the main thread's submissions are rejected increases significantly. Estimate the task execution time for your business scenario and adjust these parameters accordingly.
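One tuning that appears to rule out rejections entirely, rather than only making them rarer, is to keep the number of permits no larger than the queue capacity. The reasoning is my own, not taken from the JDK documentation: every queued task still holds a permit, and the submitting thread holds one more, so the queue can hold at most permits - 1 tasks when offer() is called. The sketch below assumes every task goes through the same semaphore; BoundedPermitsDemo and its sizes are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; assumes all submissions share this one semaphore.
final class BoundedPermitsDemo {

    /** Returns {rejections, completedTasks} after submitting `tasks` short tasks. */
    static int[] run(int tasks) {
        AtomicInteger rejections = new AtomicInteger();
        AtomicInteger completed = new AtomicInteger();
        int queueCapacity = 4;
        Semaphore permits = new Semaphore(queueCapacity); // permits <= queue capacity

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 10L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                (r, executor) -> {          // counts rejections, then runs in the caller
                    rejections.incrementAndGet();
                    r.run();
                });
        try {
            for (int i = 0; i < tasks; i++) {
                permits.acquire();
                pool.execute(() -> {
                    try {
                        completed.incrementAndGet();
                    } finally {
                        permits.release();
                    }
                });
            }
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {rejections.get(), completed.get()};
    }

    public static void main(String[] args) {
        int[] result = run(500);
        System.out.println("rejections=" + result[0] + " completed=" + result[1]);
    }
}
```

The trade-off is that the queue never fills up, so the pool never grows beyond corePoolSize and maximumPoolSize is effectively unused. Whether that is acceptable depends on the workload.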
These are my findings from investigating thread pool and semaphore issues during this development period. Corrections are welcome if there are any errors.
For an explanation of JDK thread pool recycling mechanism, you can refer to here, which provides a detailed and accessible description of the complete lifecycle of threads in the thread pool and analysis of some ingenious thread pool design aspects. Respect to fellow developers.