ThreadPool 之 线程池概览

[注] 本文中的源码基于 JDK1.8,源码中的注释为 JDK 中注释的翻译加上个人的理解。如有错误欢迎指正。

引言

因为进程的切换相当耗费资源,加上 CPU 的发展,操作系统中引入了线程的概念。相比于进程的上下文切换,线程的切换更轻量级,但是不代表没有开销,而且大部分多线程的生命周期都比较短,会发生频繁的线程创建、销毁动作,这也是相当消耗资源的,因此引入了线程池。

合理利用线程池能够带来三个好处。第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。第二:提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。第三:提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

线程池的结构

ThreadPool 的实现结构如下图所示:

ThreadPool 结构

Executor 接口

ThreadPoolExecutor 是最终的线程池实现类,顶层接口是 Executor,查看 Executor 源码,这个接口中只有一个方法:

public interface ExecutorSource {

        /**
         * 在未来某个时间执行参数中的命令,这个命令可能在一个新的线程、线程池中的线程或者一个调用线程(?)中被执行
         */
        void execute(Runnable command);
}

ExecutorService 接口

ExecutorService 接口继承了 Executor 接口,添加了一些对线程池的管理:

public interface ExecutorService extends Executor {

    /**
     * 有序地执行完之前提交的任务,但是不会接受新的任务。如果线程池已经被关闭,调用此方法没有额外的影响。
     */
    void shutdown();
    
    /**
     * 尝试停止所有正在运行的任务,停止等待中的线程,返回正在等待执行的任务列表
     */
    List<Runnable> shutdownNow();
    
    /**
     * 获取线程池是否已经被关闭
     */
    boolean isShutdown();

    /**
     * 如果所有的任务都已经被关闭了,返回 true,除非先调用 shutdown 或者 shutdownNow ,否则永远不会返回 true
     */
    boolean isTerminated();
    
    /**
     * 阻塞直到关闭请求后所有任务被完成,或者时间超时,或者线程被中断,不管哪一种情况先发生,根据先发生的情况返回值
     * 也就是说,获取线程池是否关闭,指定了一个时间,在这个时间之前被关闭的话,返回 true,如果超时还没有关闭,返回 false
     */
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * 提交一个任务,会返回一个 Future , Future 可以返回任务的结果,当任务被成功完成之后可以通过 get 方法获取结果
     */
    <T> Future<T> submit(Callable<T> task);

    /**
     * 提交一个实现了 Runnable 接口的任务以及返回的结果(???不懂),返回 Future
     */
    <T> Future<T> submit(Runnable task, T result);

    /**
     * 提交一个实现了 Runnable 接口的任务,返回 Future
     */
    Future<?> submit(Runnable task);

    /**
     * 执行给定的任务,当所有任务完成后返回带有任务状态和结果的 Future 列表。
     * Future.isDone对于返回的列表的每个元素都是正确的。 要注意的是,完成的任务可能会正常终止或抛出异常。
     * 如果在执行任务过程中修改了任务的集合,则这个方法的结果是未定义的。
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

    /**
     * 和 invokeAll(Collection<? extends Callable<T>> tasks) 类似
     * 只不过如果在截止时间之前没完成的任务都会被取消,不再执行
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * 执行给定的任务,直到一个任务完成或者抛出异常,其他未执行的任务将不再执行,返回被执行完的任务的结果
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

    /**
     * 和 invokeAny(Collection<? extends Callable<T>> tasks) 类似,加上了超时机制
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

AbstractExecutorService 抽象类

AbstractExecutorService 抽象类实现了 ExecutorService 接口,对一些方法实现了默认实现。

public abstract class AbstractExecutorService implements ExecutorService {

    /**
     * 根据传进来的 Runnable 和 value 构造一个 RunnableFuture
     * RunnableFuture 是继承了 Runnable 和 Future 接口的接口
     */
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

    /**
     * 根据 Runnable 构建 RunnableFuture
     */
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

    /**
     * 提交任务,从这里可以看到,内部还是调用了 execute 方法
     */
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    /**
     * 同 submit,只不过构建的是一个带有 result 的 FutureTask
     */
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    /**
     * 同上
     */
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

    /**
     * invokeAny 内部都调用这个方法
     * 三个参数,第一个表示要提交的任务,第二个表示是否是有时间限制的,第三个表示时间
     */
    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                              boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (tasks == null)
            throw new NullPointerException();
        int ntasks = tasks.size();
        if (ntasks == 0)
            throw new IllegalArgumentException();
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
        
        // 将自己本身作为参数传入 ExecutorCompletionService 的构造函数
        // 在这个类内部只用了 AbstractExecutorService 的 submit 方法
        ExecutorCompletionService<T> ecs = new ExecutorCompletionService<T>(this);

        try {
            // 记录异常,如果最终没有获得任何结果,就抛出这个异常
            ExecutionException ee = null;
            // 记录终止操作的时间
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            Iterator<? extends Callable<T>> it = tasks.iterator();

            // 提交一个任务,其他的等下看情况再提交
            futures.add(ecs.submit(it.next()));
            --ntasks;
            int active = 1;

            for (;;) {
                Future<T> f = ecs.poll(); // 从队列中出队一个任务的结果
                if (f == null) { // 如果没有结果,那么就继续从提交的任务中选取下一个执行
                    if (ntasks > 0) {
                        --ntasks;
                        futures.add(ecs.submit(it.next()));
                        ++active;
                    }
                    else if (active == 0)
                        break;
                    else if (timed) {
                        f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                        if (f == null)
                            throw new TimeoutException();
                        nanos = deadline - System.nanoTime();
                    }
                    else
                        f = ecs.take();
                }
                if (f != null) { // 如果已经有任务执行完毕,那么就返回结果
                    --active;
                    try {
                        return f.get();
                    } catch (ExecutionException eex) {
                        ee = eex;
                    } catch (RuntimeException rex) {
                        ee = new ExecutionException(rex);
                    }
                }
            }
            // 执行到这里还没有返回,那么就抛出异常
            if (ee == null)
                ee = new ExecutionException();
            throw ee;

        } finally { // 最后将没有执行的任务取消
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);
        }
    }

    // 其他的 invokeAny 都是基于 doInvokeAny 的,就不贴源码了
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        // code......
    }

    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
        // code......
    }

    /**
     * 执行所有任务,直到所有任务完成或者出现异常才返回
     */
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            // 执行所有的任务
            for (Callable<T> t : tasks) {
                RunnableFuture<T> f = newTaskFor(t);
                futures.add(f);
                execute(f);
            }
            for (int i = 0, size = futures.size(); i < size; i++) {
                Future<T> f = futures.get(i);
                if (!f.isDone()) { // 如果任务还没有完成,就等待任务完成
                    try {
                        f.get();
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    }
                }
            }
            done = true;
            return futures;
        } finally { // 如果发生了异常,取消没有执行的任务
            if (!done)
                for (int i = 0, size = futures.size(); i < size; i++)
                    futures.get(i).cancel(true);
        }
    }

    /**
     * 和一般的 invokeAll 类似,但是加上了判断是否在指定时间内执行完毕
     * 只要到指定时间,不管任务执行完没有,都直接返回结果
     */
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            for (Callable<T> t : tasks)
                futures.add(newTaskFor(t));

            final long deadline = System.nanoTime() + nanos;
            final int size = futures.size();

            // Interleave time checks and calls to execute in case
            // executor doesn't have any/much parallelism.
            for (int i = 0; i < size; i++) {
                execute((Runnable)futures.get(i));
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L)
                    return futures;
            }

            for (int i = 0; i < size; i++) {
                Future<T> f = futures.get(i);
                if (!f.isDone()) {
                    if (nanos <= 0L)
                        return futures;
                    try {
                        f.get(nanos, TimeUnit.NANOSECONDS);
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    } catch (TimeoutException toe) {
                        return futures;
                    }
                    nanos = deadline - System.nanoTime();
                }
            }
            done = true;
            return futures;
        } finally { // 如果最后有任务没有执行完,取消没执行的任务
            if (!done)
                for (int i = 0, size = futures.size(); i < size; i++)
                    futures.get(i).cancel(true);
        }
    }

}

由于贴的代码太多,如果所有内容都写到一篇文章里太过冗长,因此分开写了,下篇 ThreadPool 之线程池实现类 ThreadPoolExecutor 将研究线程池的最终实现类 ThreadPoolExecutor

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,602评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,442评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,878评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,306评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,330评论 5 373
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,071评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,382评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,006评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,512评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,965评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,094评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,732评论 4 323
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,283评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,286评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,512评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,536评论 2 354
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,828评论 2 345

推荐阅读更多精彩内容