ForkJoinTask
是 Java 并发编程中的强大工具,专为大规模并行计算设计。它通过将大型任务分解成小块(fork),并在多个处理器上并行执行这些小块,然后将结果合并(join),实现了高效的并行处理。这种分治策略不仅简化了并行编程,还充分利用了多核处理器的能力,特别适用于计算密集型任务。如果你是并发编程的爱好者或需要处理复杂计算任务的开发者,ForkJoinTask
提供了一种优雅且高效的解决方案。
历史热点文章
- 解锁大语言模型参数:零基础掌握大型语言模型参数奥秘与实践指南
- 高性能连接池之HikariCP框架分析:高性能逐条分解(架构师篇)
- 缓存雪崩/穿透/击穿/失效原理图/14种缓存数据特征+10种数据一致性方案
- Java 8函数式编程全攻略:43种函数式业务代码实战案例解析(收藏版)
- 一个项目代码讲清楚DO/PO/BO/AO/E/DTO/DAO/ POJO/VO
- 17个Mybatis Plugs注解:Mybatis Plugs插件架构设计与注解案例(必须收藏)
0、ForkJoinPool 业务执行流程
图说明:
-
提交任务到 ForkJoinPool:将一个
ForkJoinTask
任务提交到ForkJoinPool
中执行。 -
任务分解(Fork) :任务被分解成多个子任务,这是
ForkJoinTask
设计的核心,允许任务递归地分解成更小的子任务。 - 子任务1、子任务2、更多子任务... :表示分解出来的多个子任务。
- 子任务执行:每个子任务被执行。
- 检查是否还有子任务:在子任务执行完成后,检查是否还有更多的子任务需要分解和执行。
- 任务合并(Join) :所有子任务完成后,父任务会合并所有子任务的结果。
- 返回结果:合并后的结果被返回。
- 结束:流程的终点。
1、ForkJoinTask 设计目的
ForkJoinTask
是 Java 7 引入的一个并行计算框架的核心组件,特别适用于可以被自然地分解为子任务的工作负载,这些子任务可以独立执行并最终合并结果。以下是 ForkJoinTask
的设计因素:
-
分治算法的并行化:
ForkJoinTask
的设计初衷是为了支持分治算法(Divide and Conquer)的并行计算。这种算法将大问题分解成小问题,递归解决小问题,然后将结果合并以解决原始问题。 -
利用多核处理器:随着多核处理器的普及,
ForkJoinTask
提供了一种高效的方式来利用多核处理器的并行能力,通过将任务分解并行执行,从而提高程序的执行效率。 -
工作窃取算法:
ForkJoinPool
实现了工作窃取算法,允许空闲线程从其他线程的任务队列中“窃取”任务来执行,减少线程空闲时间,提高资源利用率。 -
递归分解任务:
ForkJoinTask
允许任务在执行过程中动态地将自身分解成更小的子任务,这些子任务可以并行执行,最终合并结果。 -
线程池友好:
ForkJoinTask
专为ForkJoinPool
设计,与标准的ThreadPoolExecutor
相比,它提供了更细粒度的任务管理,适合于可以并行执行的任务。 -
任务合并:
ForkJoinTask
的子类通常包含一个join
方法,用于获取任务的结果。当一个任务被分解成多个子任务时,可以通过调用父任务的join
方法来等待所有子任务完成并合并结果。 -
灵活性和扩展性:
ForkJoinTask
提供了fork
和join
方法,允许开发者手动控制任务的分解和结果的合并,提供了极高的灵活性和扩展性。 -
内置任务:Java 提供了一些内置的
ForkJoinTask
实现,如RecursiveAction
和RecursiveTask
。RecursiveAction
用于没有返回结果的任务,而RecursiveTask
用于有返回结果的任务。
2、ForkJoinTask 相关组件概述
-
ForkJoinPool:
-
ForkJoinPool
是ForkJoinTask
执行的核心,它实现了ExecutorService
接口,并通过工作窃取算法来平衡线程间的工作负载。每个工作线程都有一个自己的任务队列,当一个线程完成任务后,它会尝试从其他线程的任务队列中“窃取”任务来执行。这种机制有助于平衡负载,使得所有线程都能保持忙碌状态,从而提高 CPU 的利用率。
-
-
ForkJoinTask:
-
ForkJoinTask
是一个抽象类,代表在ForkJoinPool
中执行的轻量级任务。它有两个重要的子类:RecursiveAction
和RecursiveTask
。RecursiveAction
用于没有返回结果的任务,而RecursiveTask
用于有返回结果的任务。这两个子类都需要实现compute()
方法来定义任务的逻辑。
-
-
工作窃取算法:
- 工作窃取算法允许空闲线程从其他线程的任务队列中“窃取”任务来执行。默认情况下,工作线程从自己的任务队列头部获取任务。当队列为空时,线程会从其他忙碌线程的队列尾部“窃取”任务,或者从全局入口队列中获取任务,因为这些地方最有可能存在较大的工作量。
-
任务的分解与合并:
- 在
ForkJoinTask
的实现中,任务通常被递归地分解成更小的子任务(Fork),直到它们足够小,可以直接异步执行。然后,这些子任务的结果被递归地合并(Join)成一个单一的结果。
- 在
-
ForkJoinTask 的方法:
-
fork()
:将任务放入队列并安排异步执行。 -
join()
:等待任务完成并返回结果。 -
invoke()
:结合fork()
和join()
,启动任务,等待其结束并返回结果。
-
-
提交任务到 ForkJoinPool:
- 可以通过
ForkJoinPool
的invoke()
、execute()
或submit()
方法提交ForkJoinTask
任务。invokeAll()
方法可以同时提交多个任务,并返回一个Future
列表。
- 可以通过
3、ForkJoinTask 业务组件设计
-
核心组件:
-
ForkJoinPool
:整个框架的中心节点。 -
工作窃取算法
:包括空闲线程窃取任务和平衡工作负载。 -
双端队列
:每个线程维护的任务队列。 -
任务分解(Fork/Join)
:任务被分解成子任务,并行执行后合并结果。
-
-
任务管理:
-
任务类型
:包括RecursiveAction
和RecursiveTask
。 -
线程管理
:动态管理线程数量。 -
局部性优化
:任务通常由提交它们的线程执行。
-
-
异常与控制:
-
异常处理
:捕获并处理任务执行过程中的异常。 -
任务提交
:包括execute
、submit
、invoke
方法。 -
任务同步
:包括get
、join
、invoke
方法来同步任务执行和获取结果。 -
取消和超时
:支持任务取消和超时控制。
-
-
监控与配置:
-
管理界面
:监控和控制线程池行为。 -
公平性和优先级
:默认不支持,但可以通过自定义实现。 -
任务的不可中断性
:任务默认是不可中断的。
-
4、ForkJoinTask 工作窃取算法流程
步骤:
- 开始:并行计算的起点。
- 线程X - 任务队列X:每个线程都有自己的任务队列。
- 线程X执行任务:线程从自己的任务队列中取出任务并执行。
- 线程X空闲? :检查线程是否空闲(即任务队列为空)。
- 窃取任务:如果线程空闲,它会尝试从其他线程的任务队列中窃取任务。
5、ForkJoinTask 常用方法
5.1. 任务执行和控制
- fork() :异步执行任务,不等待结果。
- join() :等待任务完成并获取结果。
5.2. 结果处理
- getRawResult() :获取任务的原始结果。
- setRawResult(T value) :设置任务的结果。
5.3. 异常和取消
- quietlyJoin(ForkJoinTask<> task) :等待任务完成,不抛出中断异常。
- cancel(boolean mayInterruptIfRunning) :尝试取消任务的执行。
5.4. 任务组合和依赖
- invokeAll(ForkJoinTask<>... tasks) :启动多个任务并等待它们全部完成。
- invoke(ForkJoinTask<> task) :启动一个任务并等待其完成,返回结果。
5.5. 辅助方法
- helpQuiesce() :帮助处理池中的任务,直到池中没有更多的任务可以执行。
-
tryUnfork() :撤销一个已经
fork()
但尚未开始的任务。
5.6. 任务重用
- reinitialize() :重置任务的状态,使其可以被重新使用。
6、ForkJoinTask 应用案例
6.1 入门案例
业务数据,表示一年中每一天的销售额(单位:元):
int[] dailySales = {
10000, 15000, 12000, 18000, 16000, 20000, 21000, 19000, 17000, 15000,
14000, 13000, 11000, 12000, 13000, 14000, 15000, 16000, 17000, 18000,
19000, 20000, 21000, 22000, 23000, 24000, 25000, 26000, 27000, 28000,
29000, 30000, 31000, 32000, 33000, 34000, 35000, 36000, 37000, 38000,
39000, 40000, 41000, 42000, 43000, 44000, 45000, 46000, 47000, 48000,
49000, 50000
};
代码实现
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
// 继承 RecursiveTask 并实现 compute 方法,用于计算数组的和
class SumTask extends RecursiveTask<Long> {
private static final long serialVersionUID = 1L;
private final int[] array; // 要计算的数组
private final int start; // 数组的开始索引
private final int end; // 数组的结束索引
// 构造函数,初始化数组和索引范围
public SumTask(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
// 实现 compute 方法,该方法将被并行执行
protected Long compute() {
long sum = 0; // 用于累积求和的结果
int length = end - start; // 计算当前任务处理的数组长度
// 如果任务足够小,直接计算
if (length <= 10000) {
for (int i = start; i < end; i++) {
sum += array[i];
}
} else {
// 如果任务较大,分成两个子任务
int middle = (start + end) / 2;
// 创建子任务1,处理数组的前半部分
SumTask subTask1 = new SumTask(array, start, middle);
// 创建子任务2,处理数组的后半部分
SumTask subTask2 = new SumTask(array, middle, end);
// 执行子任务,并行计算
subTask1.fork();
subTask2.fork();
// 等待子任务完成并合并结果
sum = subTask1.join() + subTask2.join();
}
return sum;
}
}
public class ParallelSumCalculation {
public static void main(String[] args) {
// 拟一年的每天销售额数据
int[] dailySales = {
// ... (一年的每天销售额数据)
};
// 创建 ForkJoinPool 线程池
ForkJoinPool pool = new ForkJoinPool();
// 创建 SumTask 任务,处理整个销售额数组
SumTask task = new SumTask(dailySales, 0, dailySales.length);
// 执行任务并获取结果
long totalSales = pool.invoke(task);
System.out.println("一年的总销售额为: " + totalSales + "元");
}
}
6.2 订单总销售额计算
一个电子商务平台需要计算所有订单的总销售额。这个平台有大量的订单数据,分布在不同的数据库分片中。为了快速得到总销售额,我们可以利用ForkJoinTask
来并行处理每个数据库分片的数据,然后将结果合并。
计算总销售额代码
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.Arrays;
// 订单类,包含订单的ID和销售额
class Order {
long id;
double sales;
public Order(long id, double sales) {
this.id = id;
this.sales = sales;
}
}
// 计算订单总销售额的任务
class CalculateSalesTask extends RecursiveTask<Double> {
private static final int THRESHOLD = 100; // 分解任务的阈值
private Order[] orders;
private int start;
private int end;
public CalculateSalesTask(Order[] orders, int start, int end) {
this.orders = orders;
this.start = start;
this.end = end;
}
@Override
protected Double compute() {
double totalSales = 0.0;
if (end - start < THRESHOLD) {
// 任务足够小,直接计算
for (int i = start; i < end; i++) {
totalSales += orders[i].sales;
}
} else {
// 任务较大,分成两个子任务
int middle = (start + end) / 2;
CalculateSalesTask subTask1 = new CalculateSalesTask(orders, start, middle);
CalculateSalesTask subTask2 = new CalculateSalesTask(orders, middle, end);
// 执行子任务
subTask1.fork();
subTask2.fork();
// 等待子任务完成并合并结果
totalSales = subTask1.join() + subTask2.join();
}
return totalSales;
}
}
public class ECommerceSalesCalculation {
public static void main(String[] args) {
// 订单数据
Order[] orders = new Order[1000];
for (int i = 0; i < orders.length; i++) {
orders[i] = new Order(i, Math.random() * 10000); // 随机生成销售额
}
// 创建 ForkJoinPool 线程池
ForkJoinPool pool = new ForkJoinPool();
// 创建任务,处理整个订单数组
CalculateSalesTask task = new CalculateSalesTask(orders, 0, orders.length);
// 执行任务并获取结果
double totalSales = pool.invoke(task);
System.out.println("总销售额为: " + totalSales);
}
}