Skip to content

Java多线程与信号量探究

前言

最近在写一个文件批处理框架,用到了Java多线程池。业务需要:

  1. 提供一个在多线程场景下能够捕捉线程池内线程异常并且上抛到调用方的机制,以实现在多线程提交过程当中,出现异常快速失败FailFast的功能。
  2. 能够控制任务提交到线程池的速度

对于任务提交速度的控制,使用Semaphore信号量进行控制。

对于捕获多线程执行异常的情况,方案有两种:

  1. 在提交到线程池的Runnable中,对业务逻辑做异常捕获,如果抛出异常,则将标志位写入Redis,任务提交的代码每隔N个任务检查一次Redis标记位,如果发现异常,则立即停止任务提交。
  2. 在线程池中对submit()方法返回的Future<?>对象进行缓存,每隔N个任务遍历一次缓存的Future<?>,通过调用其get()方法,如果异常存在则直接抛出。

ThreadPoolExecutor

先回顾一下JDK提供的经典线程池ThreadPoolExecutor 构造方法:

java
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        // some basic verification code
    }
  1. corePoolSize 用于设置线程池核心线程数(除非用的AsynchronousQueue,否则核心线程意味着不会被线程池回收的线程)
  2. maximumPoolSize 用于设置线程池最大线程数(在keepAliveTime后,非核心线程会被线程池回收)
  3. keeyAliveTime用于设置非核心线程存活时间
  4. unit作为TimeUnit类型用于设置存活时间单位(微秒,毫秒,秒,分,时,日)
  5. workQueue提供一个任务队列给线程池使用,常用的有:
    1. LinkedBlockingQueue链表实现,可以设置队列长度,不设置的话默认为无界队列(不推荐使用无界队列,容易资源耗尽)
    2. ArrayBlockingQueue数组实现,必须设置队列长度,因为在线程池中要经常进行出入队操作,所以链表实现会比数组实现性能要高一些(纯个人猜想,没有做过验证)
    3. PriorityBlockingQueue优先级队列,数组实现,二分查找实现排序,必须设置队列长度
  6. threadFactory线程工厂,线程池内部调用工厂获取线程,不设置的话就用DefaultThreadFactory
  7. handler线程池拒绝策略,线程池达到最大且等待队列满的情况下调用拒绝策略,JDK提供了四个默认的拒绝策略实现:
    1. AbortPolicy直接拒绝,抛出RejectedExecutionException给线程池调用方
    2. CallerRunsPolicy调用者执行策略,即调用方来执行任务
    3. DiscardOldestPolicy扔掉等待队列中的头节点,即等待时长最久的任务,不抛出异常,继续执行当前提交的任务(不推荐)
    4. DiscardPolicy扔掉当前提交到线程池的任务,不抛出异常,线程池继续运行(不推荐)

上述就是线程池的全部构造参数,这里不做展开,内容很简单,可以直接看源代码就行。

添加线程的方法如下:

java
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

JDK官方文档对于线程添加做了比较详细的注释:

  1. 如果当前线程数小于核心线程数,则添加核心线程运行当前提交到队列中的任务
  2. 如果当前线程数大于核心线程数,尝试往队列添加任务,如果能添加成功则添加非核心线程(这里做了一个双重检查,避免出现添加任务过程中线程池关闭的情况)
  3. 如果线程数已经达到线程池上限,且等待队列已满,则调用reject方法,根据构造函数提供的拒绝策略进行处理

以上就是线程池的基本使用规则。

Semaphore

在很多语言中,都有信号量的机制,Java也不例外,对于信号量的使用,本身是比较简单的一件事情,源码也不多,常用方法如下:

  1. acuire()获取一个信号量,如果当前无可用信号量,则阻塞当前线程

    java
    private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
  2. release()释放一个信号量,发出unpark指令唤起等待队列

业务逻辑实现

通过对SemaphoreThreadPoolExecutor的搭配,我们可以实现一个理论上可行的能够控制任务提交至线程池速度,并能够捕获线程池中线程异常的封装(注意为什么是理论上可行,后续会详细描述。。。这也是我写这篇文章的原因)

先来看我预先实现的代码:

AsyncThrowableExecutor.java

java
package com.zhaoyingjie;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.*;

public class AsyncThrowableExecutor {
    private final Semaphore semaphore;
    private final ExecutorService executorService;
    private final List<Future<?>> futures;
    private final int futureListSize;

    public AsyncThrowableExecutor() {
        executorService = new ThreadPoolExecutor(5, 10, 100, TimeUnit.SECONDS, new LinkedBlockingQueue<>(5), new LoggingCallerRunPolicy());
        semaphore = new Semaphore(15);
        futureListSize = 10;
        futures = new LinkedList<>();
    }

    public ExecutorService executorService() {
        return this.executorService;
    }

    public void execute(final Runnable runnable) throws InterruptedException {
        checkFutures();
        semaphore.acquire();
        try {
            Future<?> future = executorService.submit(() -> {
                try {
                    runnable.run();
                } finally {
                    semaphore.release();
                }
            });
            futures.add(future);
        } catch (Exception e) {
            semaphore.release();
            throw e;
        }
    }

    private void checkFutures() {
        if (futures.size() >= futureListSize) {
            try {
                for (Future<?> future : futures) {
                    future.get();
                }
            } catch (ExecutionException | InterruptedException e) {
                throw new RuntimeException(e);
            }
            futures.clear();
        }
    }
}

Main.java

java
package com.zhaoyingjie;

import java.util.concurrent.atomic.AtomicLong;

public class Main {

    public static void main(String[] args) throws InterruptedException {
        AsyncThrowableExecutor executor = new AsyncThrowableExecutor();
        AtomicLong atomicLong = new AtomicLong(0);
        long begin = System.currentTimeMillis();
        for (int i = 0; i < 20; i++) {
            try {
                executor.execute(() -> {
                    try {
                        Thread.sleep(100L);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                });
            } catch (Exception e) {
                System.out.printf("Error occurred in task %d, %s\n", i, e.getMessage());
            }
            System.out.printf("[Submitted %d task]     %s\n", atomicLong.incrementAndGet(), executor.executorService());
        }
        long end = System.currentTimeMillis();
        Thread.sleep(5000L);
        System.out.printf("Atomic integer %d, cost %d ms\n", atomicLong.get(), end - begin);
    }
}

LoggingCallerRunPolicy.java

java
package com.zhaoyingjie;

import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;

public class LoggingCallerRunPolicy extends ThreadPoolExecutor.CallerRunsPolicy {
    private final AtomicLong callerRunTimes = new AtomicLong(0);

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.printf("Caller runs %d, Executor info %s\n", callerRunTimes.incrementAndGet(), executor);
        super.rejectedExecution(r, executor);
    }
}

以上就是这个功能实现的全部设计代码。

问题发现与研究

原本我预想的是,控制住信号量大小,使其刚好等于maximumPoolSize+linkedBlockingQueue阻塞队列的大小,那么在主线程提交任务的时候,如果阻塞队列满了并且达到线程池最大线程数,就会在调用acquire()方法的时候阻塞住,等到有线程执行完毕上一个任务,调用release()方法后,主线程再继续进行任务的提交,然鹅理想很丰满,现实很骨感,我发现了一个神奇的现象,控制台输出如下:

bash
[Submitted 1 task] [Running, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0]
[Submitted 2 task] [Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
[Submitted 3 task] [Running, pool size = 3, active threads = 3, queued tasks = 0, completed tasks = 0]
[Submitted 4 task] [Running, pool size = 4, active threads = 4, queued tasks = 0, completed tasks = 0]
[Submitted 5 task] [Running, pool size = 5, active threads = 5, queued tasks = 0, completed tasks = 0]
[Submitted 6 task] [Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
[Submitted 7 task] [Running, pool size = 5, active threads = 5, queued tasks = 2, completed tasks = 0]
[Submitted 8 task] [Running, pool size = 5, active threads = 5, queued tasks = 3, completed tasks = 0]
[Submitted 9 task] [Running, pool size = 5, active threads = 5, queued tasks = 4, completed tasks = 0]
[Submitted 10 task] [Running, pool size = 5, active threads = 5, queued tasks = 5, completed tasks = 0]
[Submitted 11 task] [Running, pool size = 6, active threads = 6, queued tasks = 5, completed tasks = 0]
[Submitted 12 task] [Running, pool size = 7, active threads = 7, queued tasks = 5, completed tasks = 0]
[Submitted 13 task] [Running, pool size = 8, active threads = 8, queued tasks = 5, completed tasks = 0]
[Submitted 14 task] [Running, pool size = 9, active threads = 9, queued tasks = 5, completed tasks = 0]
[Submitted 15 task] [Running, pool size = 10, active threads = 10, queued tasks = 5, completed tasks = 0]
[Caller runs 1 task] [Running, pool size = 10, active threads = 10, queued tasks = 4, completed tasks = 1]
[Submitted 16 task] [Running, pool size = 10, active threads = 4, queued tasks = 0, completed tasks = 11]
[Submitted 17 task] [Running, pool size = 10, active threads = 4, queued tasks = 1, completed tasks = 11]
[Submitted 18 task] [Running, pool size = 10, active threads = 4, queued tasks = 2, completed tasks = 11]
[Submitted 19 task] [Running, pool size = 10, active threads = 4, queued tasks = 3, completed tasks = 11]
[Submitted 20 task] [Running, pool size = 10, active threads = 4, queued tasks = 4, completed tasks = 11]

可以看到,随着任务提交,第1至第5任务,pool size每次+1,第6至第10任务,queued tasks每次+1,第11至第15任务,pool size每次+1,符合我们上面提到的线程池添加线程的规则。

但是注意看上述输出的第16行,线程池调用了我自己写的拒绝策略,打印了一行日志,也就是说,我刚才设置的信号量并没有完全控制住提交任务的线程,在任务提交的时候被线程池拒绝了,所以才能看到上述日志。发现这个问题之后,我看了信号量和线程池的源代码很久,也没看出什么问题,并且这个现象随着我调整信号量的大小,线程池大小,任务提交次数(外层for循环)以及任务子线程占用时间(Thread.sleep()方法),都依旧可以复现。就在我百思不得其解的时候,我突然想到一个问题,我们仔细看上面第16行日志,此时线程池的completed tasks为1,也就是说,在那一瞬间,有一个线程完成了任务的执行,然后调用了release方法,使得原本阻塞在acquire()方法出的主线程被唤醒继续执行提交任务,也就是说:

信号量和线程池并不是用的同一个锁对象,意味着,线程池内的线程执行完任务后,先调用release()信号量方法,然后再return,那么此时线程不一定能够立刻被回收,但是提交任务的主线程被唤醒了,开始执行submit()方法,换句话说,信号量的释放和线程池的回收存在一定的延迟,这个延迟时间内,如果刚好有线程阻塞在了acquire()方法那里,并且线程池内队列已满,那么就会触发线程池拒绝策略。

所以对于目前的设计,如果要保证没有任务被拒绝或者丢弃,最好的解决方案就是调整优化线程池、信号量、等待队列的各参数大小,譬如等待队列可以设置大一些,信号量在允许范围内调小一些,等等,于此同时,使用CallerRunsPolicy, 即便出现线程池满了的情况下,依旧可以保证任务的完整执行。

我将线程池等待队列调整为30,核心线程数调整为availableProcessor*2,最大线程数调整为abailableProcessor*4,信号量设置为50之后,出现任务提交被拒绝的次数明显少了很多,但是值得注意的是,我们的线程执行的任务可能是和数据库进行交互的,线程执行时间也会对这个结果产生影响,例如,线程池内所有线程在等待相同时间后,同一时刻同时释放信号量锁,此时主线程进行任务提交产生的被线程池拒绝 的概率就会大大增加,线程执行任务这个耗时需要在自己的业务场景下进行估算,并依此对上述参数进行调整。

以上就是我在这次开发期间遇到的线程池和信号量问题的一点探究。如有错误,欢迎各位指正。

关于JDK线程池回收机制的讲解,可以参考这里,里面详细且通俗的描述了线程池内线程的完整生命周期以及一些线程池设计精巧之处的解析,向同行致敬。

许可协议

本文章采用 CC BY-NC-SA 4.0 许可协议进行发布。您可以自由地:

  • 共享 — 在任何媒介以任何形式复制、发行本作品
  • 演绎 — 修改、转换或以本作品为基础进行创作

惟须遵守下列条件:

  • 署名 — 您必须给出适当的署名,提供指向本许可协议的链接,同时标明是否(对原始作品)作了修改。您可以用任何合理的方式来署名,但是不得以任何方式暗示许可人为您或您的使用背书。
  • 非商业性使用 — 您不得将本作品用于商业目的。
  • 相同方式共享 — 如果您再混合、转换或者基于本作品进行创作,您必须基于与原先许可协议相同的许可协议分发您贡献的作品。

上次更新时间: