Java线程-Fork/Join框架学习(九)

一、前言

Fork/Join框架是Java 1.7之后引入的基于分治算法的并行框架,官网文档是这么介绍的:

  1. Fork/Join框架是ExecutorService接口的一种具体实现,可以更好的帮助您利用多个处理器;它是为那些可以递归地分割成小块的工作而设计的,该框架的目标是使用所有可用的处理能力来提高应用程序的性能。
  2. 与任何ExecutorService实现一样,Fork/Join框架也会将任务分发给线程池中的工作线程去执行,Fork/Join框架的独特之处在于它使用了一种工作窃取算法(work-stealing),也就是说完成自己的工作而处于空闲的工作线程能够从其他处于忙碌(busy)状态的工作线程处窃取等待执行的任务。
  3. Fork/Join框架的核心类是ForkJoinPool,ForkJoinPool实现了工作偷取算法,并可以执行ForkJoinTask任务,得到计算结果。

本文所使用的JDK版本是 JDK 8.0

二、Fork/Join框架介绍

1. Fork/Join简介

  Fork/Join的并行算法的原理是分治算法,通俗的来讲就是,将大任务分割成足够小的小任务,然后让线程池中不同的线程来执行这些分割出来的小任务,小任务完成之后再将小任务的结果合并成大任务的结果。也就是fork子任务,join返回结果。典型的用法如下:

Result solve(Problem problem) {
    if (problem is small) {
        directly solve problem
    } else {
        split problem into independent parts
        fork new subtasks to solve each part
        join all subtasks
        compose result from subresults
    }
}
2. Fork/Join框架中的类介绍

Fork/Join框架的几个核心的类如下:

最核心的类是ForkJoinPool,该类接受的任务对象是ForkJoinTask,ForkJoinTask是一个抽象类,它有两个常用的子类:RecursiveTask(有返回值)和RecursiveAction(无返回值),一般情况下,我们不需要直接使用ForkJoinTask,而是通过继承它的两个子类,并实现对应的抽象方法 —— compute来定义我们的任务。

ForkJoinTask类相关.png
ForkJoinPool类相关.png
3. ForkJoinPool类

我们先来看下ForkJoinPool这个类,来看下这个类的构造方法及常用的方法。

3.1 ForkJoinPool构造方法

ForkJoinPool的构造方法共有3个,但最终都会调用同一个构造方法。

public ForkJoinPool() {
    this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
         defaultForkJoinWorkerThreadFactory, null, false);
}

public ForkJoinPool(int parallelism) {
    this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
}

public ForkJoinPool(int parallelism,
                    ForkJoinWorkerThreadFactory factory,
                    UncaughtExceptionHandler handler,
                    boolean asyncMode) {
    this(checkParallelism(parallelism),
         checkFactory(factory),
         handler,
         asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
         "ForkJoinPool-" + nextPoolId() + "-worker-");
    checkPermission();
}

我们来看一下构造方法中涉及到的参数:

  1. parallelism,并行度,也可以说是工作线程数量,默认是系统可用处理器的数量,也就是逻辑CPU的个数,最小是1;
  2. ForkJoinWorkerThreadFactory,创建工作线程的工厂,工作线程的对象是ForkJoinWorkerThread
  3. UncaughtExceptionHandler,处理工作线程发生异常的异常处理类,默认是null;
  4. asyncMode,同步或异步模式,如果是true的话,那么在处理任务时工作线程的模式为FIFO 顺序,这种模式下的ForkJoinPool更像是一个队列的形式,并且任务不能被合并,默认是false;
3.2 ForkJoinPool公共池

  在很多情况下,如果没有特殊的应用需求,我们一般可以直接使用ForkJoinPool中的common池。ForkJoinPool提供了一种公共池,可以用来处理那些没有被显式提交到任何线程池的任务,并且可以通过指定系统参数的方式定义“并行度、线程工厂和异常处理类”,并且它指定了mode模式为LIFO_QUEUE,也就是说可以支持任务合并(join),来看一下它的主要代码:

private static ForkJoinPool makeCommonPool() {
    int parallelism = -1;
    ForkJoinWorkerThreadFactory factory = null;
    UncaughtExceptionHandler handler = null;
    try {  // ignore exceptions in accessing/parsing properties
        String pp = System.getProperty
            ("java.util.concurrent.ForkJoinPool.common.parallelism");
        String fp = System.getProperty
            ("java.util.concurrent.ForkJoinPool.common.threadFactory");
        String hp = System.getProperty
            ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
        if (pp != null)
            parallelism = Integer.parseInt(pp);
        if (fp != null)
            factory = ((ForkJoinWorkerThreadFactory)ClassLoader.
                       getSystemClassLoader().loadClass(fp).newInstance());
        if (hp != null)
            handler = ((UncaughtExceptionHandler)ClassLoader.
                       getSystemClassLoader().loadClass(hp).newInstance());
    } catch (Exception ignore) {
    }
    if (factory == null) {
        if (System.getSecurityManager() == null)
            factory = defaultForkJoinWorkerThreadFactory;
        else // use security-managed default
            factory = new InnocuousForkJoinWorkerThreadFactory();
    }
    if (parallelism < 0 && // default 1 less than #cores
        (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
        parallelism = 1;
    if (parallelism > MAX_CAP)
        parallelism = MAX_CAP;
    return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
                            "ForkJoinPool.commonPool-worker-");
}

首先,支持我们定义系统参数parallelismthreadFactoryexceptionHandler,其次,该方法最后也调用了ForkJoinPool的构造方法,并且指定了mode模式为LIFO_QUEUE

当然,ForkJoinPool提供了commonPool方法可以直接获取公共池:

public static ForkJoinPool commonPool() {
    // assert common != null : "static init error";
    return common;
}
3.3 执行ForkJoinTask

使用ForkJoinPool ,我们有三个方法来执行ForkJoinTask任务:invoke方法submit方法execute方法

public <T> T invoke(ForkJoinTask<T> task)
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)
public void execute(ForkJoinTask<?> task)
  1. invoke方法,用来执行有返回值的任务,并且该方法是阻塞的,直到任务执行完毕,该方法才会停止阻塞并返回任务的执行结果;
  2. execute方法,用来执行没有返回值的任务,该方法同样是阻塞的,并且除了从Executor接口中继承的execute方法外,ForkJoinPool 也定义了用来执行ForkJoinTask 的 execute方法;
  3. submit方法,用来执行有返回值的任务,该方法是非阻塞的,调用之后将任务提交给 ForkJoinPool 去执行便立即返回,返回已经提交到ForkJoinPool去执行的task,同样该方法除了从ExecutorService 接口继承的submit方法外,也重载了用来执行ForkJoinTask的方法;
4. 工作窃取算法

我们这里来简单介绍下work-stealing算法的基本调度策略:

  1. 线程池中的每一个工作线程维护自己的调度队列中的可运行任务;
  2. 队列是一个双端队列,既支持后进先出(LIFO的push和pop操作),还支持先进先出 (FIFO的take操作);
  3. 对于一个给定的工作线程来说,任务所产生的子任务将会被放入到工作者自己的双端队列中;
  4. 工作线程使用后进先出 (LIFO,最新的元素优先) 的顺序,通过弹出任务来处理队列中的任务;
  5. 当一个工作线程的本地没有任务去运行的时候,它将使用先进先出(FIFO)的规则尝试随机的从别的工作线程中拿(『窃取』)一个任务去运行;
  6. 当一个工作线程触及了join操作时,如果需要join的任务尚未完成,那会先处理其他任务,直到目标任务被告知已经结束(通过isDone方法);
  7. 当一个工作线程无任务可执行,并且无任务可窃取或者这中间发生了异常,获取任务和失败处理的时候,它就会退出(通过yield、sleep或者优先级调整)并经过一段时间之后再度尝试直到所有的工作线程都被告知他们都处于空闲的状态。在这种情况下,他们都会阻塞直到其他的任务再度被上层调用;

  使用后进先出 (LIFO) 用来处理每个工作线程的自己任务,但是使用先进先出 (FIFO) 规则用于获取别的任务,这是一种被广泛使用的进行递归Fork/Join设计的一种调优手段。让窃取任务的线程从队列拥有者相反的方向进行操作会减少线程竞争,同样体现了递归分治算法的大任务优先策略。

5. ForkJoinTask类

  我们再来简单说下ForkJoinTask类。前面我们也说过,该抽象类继承自ForkJoinTask接口,所以它可以有返回值。Fork/Join框架最主要的两个流程就是fork流程和join流程,所以我们主要来看下 ForkJoinTask 的fork方法join方法

5.1 fork方法

fork方法用于将大任务拆分为小任务,然后执行小任务,来简单看下代码:

public final ForkJoinTask<V> fork() {
    Thread t;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        ((ForkJoinWorkerThread)t).workQueue.push(this);
    else
        ForkJoinPool.common.externalPush(this);
    return this;
}
  • 首先取到当前线程,然后判断线程类型是否是ForkJoinPool中的工作线程;
  • 如果是,说明是fork分割的子任务,然后将任务添加到这个线程对应的任务队列中,等待被执行;
  • 如果当前线程不是ForkJoinWorkerThread类型的线程,那么就会将该任务提交到公共池的随机的队列中去;
5.2 join方法

join方法会获取所有子任务的执行结果,然后递归的合并结果:

public final V join() {
    int s;
    if ((s = doJoin() & DONE_MASK) != NORMAL)
        reportException(s);
    return getRawResult();
}

这里源码就部多说了,有点复杂,只简单说下大致的流程:

  1. 获取当前线程,然后判断线程类型是否是ForkJoinPool中的工作线程;
  2. 如果不是,阻塞当前线程(awaitJoin),等待任务执行完成;
  3. 如果是,检查任务的执行状态,如果任务已经完成直接返回结果;如果没有完成,并且在自己的任务队列内,则执行该任务;
  4. 而如果任务被其他工作线程窃取,则窃取这个偷取者队列内的任务(FIFO),然后帮助这个窃取者执行它的任务。基本思想是:偷取者帮助我执行任务,我去帮助偷取者执行它的任务;
  5. 在帮助偷取者执行任务后,如果调用者发现自己队列已经有任务,则依次弹出自己的任务(LIFO)并执行;
  6. 如果窃取者已经把自己的任务做完,正在等待着需要join的任务时,则找到偷取者的偷取者,帮助它完成它的任务;
  7. 循环执行上面的操作;

子任务执行完的结果会统一放在一个队列里,然后启动一个线程从队列里拿数据,最后合并这些数据。
至于源码的解读,可详细参考地址:JUC源码分析-线程池篇(五):ForkJoinPool - 2,博主分析的特别详细。

6. 代码示例

下面通过两个简单的例子来看一下ForkJoinPool和ForkJoinTask的使用。

6.1 RecursiveAction无返回值

第一个例子来看一下RecursiveAction的使用,无返回值,打印一些随机数:

package task;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;

/**
 * 打印数值,无返回值
 */
class RecursiveActionTest extends RecursiveAction {
    /**
     * 阈值,每个"小任务"最多只打印5个数
     */
    private static final int MAX = 5;

    private int start;
    private int end;

    private RecursiveActionTest(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected void compute() {
        // 当end-start的值小于MAX时候,开始打印
        if ((end - start) < MAX) {
            for (int i = start; i < end; i++) {
                System.out.println(Thread.currentThread().getName() + "的i值:" + i);
            }
        } else {

            // 将大任务分解成两个小任务
            int middle = (start + end) / 2;
            RecursiveActionTest left = new RecursiveActionTest(start, middle);
            RecursiveActionTest right = new RecursiveActionTest(middle, end);

            // 并行执行两个小任务
            left.fork();
            right.fork();
        }
    }

    public static void main(String[] args) throws Exception {
        // 默认线程数 逻辑CPU的个数
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        // 提交可分解的PrintTask任务
        forkJoinPool.submit(new RecursiveActionTest(0, 20));
        // 阻塞当前线程直到 ForkJoinPool 中所有的任务都执行结束
        forkJoinPool.awaitTermination(2, TimeUnit.SECONDS);
        // 关闭线程池
        forkJoinPool.shutdown();
    }
}

打印结果:

ForkJoinPool-1-worker-2的i值:5
ForkJoinPool-1-worker-3的i值:13
ForkJoinPool-1-worker-1的i值:0
ForkJoinPool-1-worker-0的i值:3
ForkJoinPool-1-worker-1的i值:1
ForkJoinPool-1-worker-3的i值:14
ForkJoinPool-1-worker-2的i值:6

这里只列举了部分值,可以看到,ForkJoinPool启动了4个线程来执行这个任务,因为我电脑的逻辑CPU个数是4个,并且可以看到分解后的任务是并行执行的,并不是顺序执行的。

6.1 RecursiveTask有返回值

下面来看一下RecursiveTask,计算一个整数数组的和:

package task;

import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

/**
 * 计算一个大的整数数组的和
 *
 * @author zhangkeke
 * @since 2018/9/27 15:09
 */
public class RecursiveTaskTest extends RecursiveTask<Integer> {
    /**
     * 阈值,每个小任务处理的数量
     */
    private static final int THRESHOLD = 5;
    private int[] array;
    private int low;
    private int high;

    private RecursiveTaskTest(int[] array, int low, int high) {
        this.array = array;
        this.low = low;
        this.high = high;
    }

    @Override
    protected Integer compute() {
        int sum = 0;
        if (high - low <= THRESHOLD) {
            // 小于阈值则直接计算
            for (int i = low; i < high; i++) {
                sum += array[i];
            }
        } else {
            // 一个大任务分割成两个子任务
            int mid = (low + high) >>> 1;
            RecursiveTaskTest left = new RecursiveTaskTest(array, low, mid);
            RecursiveTaskTest right = new RecursiveTaskTest(array, mid + 1, high);

            // 异步执行
            left.fork();
            right.fork();
            // 以上两行也可以使用 invokeAll(left,right);
            // invokeAll方法会执行很多任务,并且会阻塞,直到这些任务都执行完成

            // 结果合并
            sum = left.join() + right.join();
        }
        return sum;
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        int[] array = new int[20];
        for (int i = 0; i < array.length; i++) {
            array[i] = new Random().nextInt(100);
        }
        System.out.println(Arrays.toString(array));

        // 开始创建任务
        RecursiveTaskTest sumTask = new RecursiveTaskTest(array, 0, array.length - 1);
        // 创建ForkJoinPool线程池
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        // 提交任务到线程池
        forkJoinPool.submit(sumTask);
        // 获取结果,get方法会阻塞
        Integer result = sumTask.get();
        System.out.println("计算结果:" + result);
    }
}

计算结果为:

[38, 98, 45, 49, 76, 74, 43, 18, 11, 73, 56, 0, 91, 7, 80, 2, 85, 88, 96, 87]
计算结果:801

该部分代码来自:https://blog.csdn.net/ouyang_peng/article/details/46491217

6.3 fork方法问题

在网上看到有人说在进行计算的时候,fork方法有点问题,代码是:

left.fork();
right.fork();

应该使用invokeAll的方式,这样可以避免线程的浪费,实现线程的重用:

invokeAll(left, right);

记不清这是哪里的问题了,有时间再研究下这块。

三、总结

  到这里基本上就大致介绍完了Fork/Join框架的流程了,不过我们需要注意下该框架的适用场景,其实也就是分治算法的适用场景,毕竟Fork/Join框架也可以看作是分治算法的并发版本了。

  1. Fork/Join框架的适用场景,简单来说就是密集型计算,也就是我们的任务可以拆分成足够小的任务,并且可用根据小任务的结果来组装大任务的结果;比如求一个大数组中的最大值/最小值,求和,排序等类似操作。
  2. 需要注意的是,选取划分子任务的粒度(分割任务的阈值,也就是临界值)对ForkJoinPool执行任务的效率有很大影响,使用Fork/Join框架并不一定比顺序执行任务的效率高。
    • 如果阈值选取过大,任务分割的不够细,则不能充分利用CPU资源;
    • 阈值太小,则可能会产生过多的子任务,那么子任务的调度开销可能会大于并行计算的性能开销,并且我们还需要考虑创建子任务、fork()子任务、线程调度以及合并子任务处理结果的耗时以及相应的内存消耗;
  3. 官方文档给出的粗略经验是:任务应该执行100~10000个基本的计算步骤。决定子任务的粒度的最好办法是实践,通过实际测试结果来确定这个阈值才是最明智的做法。

再简单说下 ForkJoinPoolThreadPoolExecutor 的区别:

  1. ForkJoinPool 和 ThreadPoolExecutor 都是 ExecutorService(线程池)的实现,但ForkJoinPool 的独特点在于:ThreadPoolExecutor 只能执行 Runnable 和 Callable 任务,而 ForkJoinPool 不仅可以执行 Runnable 和 Callable 任务,还可以执行 Fork/Join 型任务ForkJoinTask,从而满足并行地实现分治算法的需要;
  2. ThreadPoolExecutor 中任务的执行顺序是按照其在共享队列中的顺序来执行的,所以后面的任务需要等待前面任务执行完毕后才能执行,而 ForkJoinPool 每个线程有自己的任务队列,并在此基础上实现了 Work-Stealing 的功能,使得在某些情况下 ForkJoinPool 能更大程度的提高并发效率。

除了JDK的文档,本文还参考自:
《Java并发编程实战》
并发编程网 - Fork and Join: Java也可以轻松地编写并发程序
并发编程网 - 聊聊并发(八)——Fork/Join框架介绍
Java 多线程(5):Fork/Join 型线程池与 Work-Stealing 算法

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,491评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,856评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,745评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,196评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,073评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,112评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,531评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,215评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,485评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,578评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,356评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,215评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,583评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,898评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,174评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,497评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,697评论 2 335