高并发编程/并行任务组件ForkJoinPool设计图分解(高手篇)

image.png

ForkJoinTask 是 Java 并发编程中的强大工具,专为大规模并行计算设计。它通过将大型任务分解成小块(fork),并在多个处理器上并行执行这些小块,然后将结果合并(join),实现了高效的并行处理。这种分治策略不仅简化了并行编程,还充分利用了多核处理器的能力,特别适用于计算密集型任务。如果你是并发编程的爱好者或需要处理复杂计算任务的开发者,ForkJoinTask 提供了一种优雅且高效的解决方案。

历史热点文章

0、ForkJoinPool 业务执行流程

f7c7b706c070a72e3bff1cc7f9ebca4d_2b7ddba198484bd28382d5a34c8f559d.png

图说明:

  1. 提交任务到 ForkJoinPool:将一个 ForkJoinTask 任务提交到 ForkJoinPool 中执行。
  2. 任务分解(Fork) :任务被分解成多个子任务,这是 ForkJoinTask 设计的核心,允许任务递归地分解成更小的子任务。
  3. 子任务1、子任务2、更多子任务... :表示分解出来的多个子任务。
  4. 子任务执行:每个子任务被执行。
  5. 检查是否还有子任务:在子任务执行完成后,检查是否还有更多的子任务需要分解和执行。
  6. 任务合并(Join) :所有子任务完成后,父任务会合并所有子任务的结果。
  7. 返回结果:合并后的结果被返回。
  8. 结束:流程的终点。

1、ForkJoinTask 设计目的

ForkJoinTask 是 Java 7 引入的一个并行计算框架的核心组件,特别适用于可以被自然地分解为子任务的工作负载,这些子任务可以独立执行并最终合并结果。以下是 ForkJoinTask 的设计因素:

  1. 分治算法的并行化ForkJoinTask 的设计初衷是为了支持分治算法(Divide and Conquer)的并行计算。这种算法将大问题分解成小问题,递归解决小问题,然后将结果合并以解决原始问题。
  2. 利用多核处理器:随着多核处理器的普及,ForkJoinTask 提供了一种高效的方式来利用多核处理器的并行能力,通过将任务分解并行执行,从而提高程序的执行效率。
  3. 工作窃取算法ForkJoinPool 实现了工作窃取算法,允许空闲线程从其他线程的任务队列中“窃取”任务来执行,减少线程空闲时间,提高资源利用率。
  4. 递归分解任务ForkJoinTask 允许任务在执行过程中动态地将自身分解成更小的子任务,这些子任务可以并行执行,最终合并结果。
  5. 线程池友好ForkJoinTask 专为 ForkJoinPool 设计,与标准的 ThreadPoolExecutor 相比,它提供了更细粒度的任务管理,适合于可以并行执行的任务。
  6. 任务合并ForkJoinTask 的子类通常包含一个 join 方法,用于获取任务的结果。当一个任务被分解成多个子任务时,可以通过调用父任务的 join 方法来等待所有子任务完成并合并结果。
  7. 灵活性和扩展性ForkJoinTask 提供了 forkjoin 方法,允许开发者手动控制任务的分解和结果的合并,提供了极高的灵活性和扩展性。
  8. 内置任务:Java 提供了一些内置的 ForkJoinTask 实现,如 RecursiveActionRecursiveTaskRecursiveAction 用于没有返回结果的任务,而 RecursiveTask 用于有返回结果的任务。

2、ForkJoinTask 相关组件概述

  1. ForkJoinPool
    • ForkJoinPoolForkJoinTask 执行的核心,它实现了 ExecutorService 接口,并通过工作窃取算法来平衡线程间的工作负载。每个工作线程都有一个自己的任务队列,当一个线程完成任务后,它会尝试从其他线程的任务队列中“窃取”任务来执行。这种机制有助于平衡负载,使得所有线程都能保持忙碌状态,从而提高 CPU 的利用率。
  2. ForkJoinTask
    • ForkJoinTask 是一个抽象类,代表在 ForkJoinPool 中执行的轻量级任务。它有两个重要的子类:RecursiveActionRecursiveTaskRecursiveAction 用于没有返回结果的任务,而 RecursiveTask 用于有返回结果的任务。这两个子类都需要实现 compute() 方法来定义任务的逻辑。
  3. 工作窃取算法
    • 工作窃取算法允许空闲线程从其他线程的任务队列中“窃取”任务来执行。默认情况下,工作线程从自己的任务队列头部获取任务。当队列为空时,线程会从其他忙碌线程的队列尾部“窃取”任务,或者从全局入口队列中获取任务,因为这些地方最有可能存在较大的工作量。
  4. 任务的分解与合并
    • ForkJoinTask 的实现中,任务通常被递归地分解成更小的子任务(Fork),直到它们足够小,可以直接异步执行。然后,这些子任务的结果被递归地合并(Join)成一个单一的结果。
  5. ForkJoinTask 的方法
    • fork():将任务放入队列并安排异步执行。
    • join():等待任务完成并返回结果。
    • invoke():结合 fork()join(),启动任务,等待其结束并返回结果。
  6. 提交任务到 ForkJoinPool
    • 可以通过 ForkJoinPoolinvoke()execute()submit() 方法提交 ForkJoinTask 任务。invokeAll() 方法可以同时提交多个任务,并返回一个 Future 列表。

3、ForkJoinTask 业务组件设计

c51cb9b290a8d0568c26211d031e1a01_ff0dac8c606748be8d4dc7075cba1f8d.png
  1. 核心组件
    • ForkJoinPool:整个框架的中心节点。
    • 工作窃取算法:包括空闲线程窃取任务和平衡工作负载。
    • 双端队列:每个线程维护的任务队列。
    • 任务分解(Fork/Join):任务被分解成子任务,并行执行后合并结果。
  2. 任务管理
    • 任务类型:包括 RecursiveActionRecursiveTask
    • 线程管理:动态管理线程数量。
    • 局部性优化:任务通常由提交它们的线程执行。
  3. 异常与控制
    • 异常处理:捕获并处理任务执行过程中的异常。
    • 任务提交:包括 executesubmitinvoke 方法。
    • 任务同步:包括 getjoininvoke 方法来同步任务执行和获取结果。
    • 取消和超时:支持任务取消和超时控制。
  4. 监控与配置
    • 管理界面:监控和控制线程池行为。
    • 公平性和优先级:默认不支持,但可以通过自定义实现。
    • 任务的不可中断性:任务默认是不可中断的。

4、ForkJoinTask 工作窃取算法流程

e902b76a776f0a92fea60a6da4aad465_1f40d5648b264b50b16ba41ad78d1d16.png

步骤:

  1. 开始:并行计算的起点。
  2. 线程X - 任务队列X:每个线程都有自己的任务队列。
  3. 线程X执行任务:线程从自己的任务队列中取出任务并执行。
  4. 线程X空闲? :检查线程是否空闲(即任务队列为空)。
  5. 窃取任务:如果线程空闲,它会尝试从其他线程的任务队列中窃取任务。

5、ForkJoinTask 常用方法

5.1. 任务执行和控制

  • fork() :异步执行任务,不等待结果。
  • join() :等待任务完成并获取结果。

5.2. 结果处理

  • getRawResult() :获取任务的原始结果。
  • setRawResult(T value) :设置任务的结果。

5.3. 异常和取消

  • quietlyJoin(ForkJoinTask<> task) :等待任务完成,不抛出中断异常。
  • cancel(boolean mayInterruptIfRunning) :尝试取消任务的执行。

5.4. 任务组合和依赖

  • invokeAll(ForkJoinTask<>... tasks) :启动多个任务并等待它们全部完成。
  • invoke(ForkJoinTask<> task) :启动一个任务并等待其完成,返回结果。

5.5. 辅助方法

  • helpQuiesce() :帮助处理池中的任务,直到池中没有更多的任务可以执行。
  • tryUnfork() :撤销一个已经 fork() 但尚未开始的任务。

5.6. 任务重用

  • reinitialize() :重置任务的状态,使其可以被重新使用。

6、ForkJoinTask 应用案例

6.1 入门案例

业务数据,表示一年中每一天的销售额(单位:元):

int[] dailySales = {
    10000, 15000, 12000, 18000, 16000, 20000, 21000, 19000, 17000, 15000,
    14000, 13000, 11000, 12000, 13000, 14000, 15000, 16000, 17000, 18000,
    19000, 20000, 21000, 22000, 23000, 24000, 25000, 26000, 27000, 28000,
    29000, 30000, 31000, 32000, 33000, 34000, 35000, 36000, 37000, 38000,
    39000, 40000, 41000, 42000, 43000, 44000, 45000, 46000, 47000, 48000,
    49000, 50000
};

代码实现

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// 继承 RecursiveTask 并实现 compute 方法,用于计算数组的和
class SumTask extends RecursiveTask<Long> {
    private static final long serialVersionUID = 1L;
    private final int[] array; // 要计算的数组
    private final int start;   // 数组的开始索引
    private final int end;     // 数组的结束索引

    // 构造函数,初始化数组和索引范围
    public SumTask(int[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }

    @Override
    // 实现 compute 方法,该方法将被并行执行
    protected Long compute() {
        long sum = 0; // 用于累积求和的结果
        int length = end - start; // 计算当前任务处理的数组长度

        // 如果任务足够小,直接计算
        if (length <= 10000) {
            for (int i = start; i < end; i++) {
                sum += array[i];
            }
        } else {
            // 如果任务较大,分成两个子任务
            int middle = (start + end) / 2;
            // 创建子任务1,处理数组的前半部分
            SumTask subTask1 = new SumTask(array, start, middle);
            // 创建子任务2,处理数组的后半部分
            SumTask subTask2 = new SumTask(array, middle, end);

            // 执行子任务,并行计算
            subTask1.fork();
            subTask2.fork();

            // 等待子任务完成并合并结果
            sum = subTask1.join() + subTask2.join();
        }
        return sum;
    }
}

public class ParallelSumCalculation {
    public static void main(String[] args) {
        // 拟一年的每天销售额数据
        int[] dailySales = {
            // ... (一年的每天销售额数据)
        };

        // 创建 ForkJoinPool 线程池
        ForkJoinPool pool = new ForkJoinPool();
        // 创建 SumTask 任务,处理整个销售额数组
        SumTask task = new SumTask(dailySales, 0, dailySales.length);
        // 执行任务并获取结果
        long totalSales = pool.invoke(task);

        System.out.println("一年的总销售额为: " + totalSales + "元");
    }
}

6.2 订单总销售额计算

一个电子商务平台需要计算所有订单的总销售额。这个平台有大量的订单数据,分布在不同的数据库分片中。为了快速得到总销售额,我们可以利用ForkJoinTask来并行处理每个数据库分片的数据,然后将结果合并。

计算总销售额代码

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.Arrays;

// 订单类,包含订单的ID和销售额
class Order {
    long id;
    double sales;

    public Order(long id, double sales) {
        this.id = id;
        this.sales = sales;
    }
}

// 计算订单总销售额的任务
class CalculateSalesTask extends RecursiveTask<Double> {
    private static final int THRESHOLD = 100; // 分解任务的阈值
    private Order[] orders;
    private int start;
    private int end;

    public CalculateSalesTask(Order[] orders, int start, int end) {
        this.orders = orders;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Double compute() {
        double totalSales = 0.0;
        if (end - start < THRESHOLD) {
            // 任务足够小,直接计算
            for (int i = start; i < end; i++) {
                totalSales += orders[i].sales;
            }
        } else {
            // 任务较大,分成两个子任务
            int middle = (start + end) / 2;
            CalculateSalesTask subTask1 = new CalculateSalesTask(orders, start, middle);
            CalculateSalesTask subTask2 = new CalculateSalesTask(orders, middle, end);

            // 执行子任务
            subTask1.fork();
            subTask2.fork();

            // 等待子任务完成并合并结果
            totalSales = subTask1.join() + subTask2.join();
        }
        return totalSales;
    }
}

public class ECommerceSalesCalculation {
    public static void main(String[] args) {
        // 订单数据
        Order[] orders = new Order[1000];
        for (int i = 0; i < orders.length; i++) {
            orders[i] = new Order(i, Math.random() * 10000); // 随机生成销售额
        }

        // 创建 ForkJoinPool 线程池
        ForkJoinPool pool = new ForkJoinPool();
        // 创建任务,处理整个订单数组
        CalculateSalesTask task = new CalculateSalesTask(orders, 0, orders.length);
        // 执行任务并获取结果
        double totalSales = pool.invoke(task);

        System.out.println("总销售额为: " + totalSales);
    }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,968评论 6 482
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,601评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,220评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,416评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,425评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,144评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,432评论 3 401
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,088评论 0 261
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,586评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,028评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,137评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,783评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,343评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,333评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,559评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,595评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,901评论 2 345

推荐阅读更多精彩内容