ForkJoin 框架 是 jdk7 提供的一个用于执行并行任务的框架
简介
ForkJoin
是一个执行并行任务的框架 , 它是把一个大任务 分割成若干个 小任务 , 再将若干个小任务的计算结果汇总起来, 得到大任务的计算结果 . ForkJoin
采用的是 工作窃取算法 , 工作窃取算法是指某个线程从其他队列中窃取任务进行执行的过程, 其运行流程如图
对于常见的一个大型任务,我们可以把这个大的任务切割成很多个小任务,然后这些小任务会放在不同的队列中,每一个队列都有一个相应的的工作执行线程来执行,当一个线程所需要执行的队列中,任务执行完之后,这个线程就会被闲置,为了提高线程的利用率,这些空闲的线程可以从其他的任务队列中窃取一些任务,来避免使自身资源浪费. 看上面的图示 , 线程1 和线程2 会访问同一个队列, 为了减少窃取线程和被窃取线程之间的竞争 ,通常我们会使用双端队列, 被窃取任务的队列,永远从队列的头部获取任务 , 窃取任务的队列, 永远从双端队列的尾部获取任务.
优缺点
- 优点
充分利用线程进行并行计算 ,并减少了线程间的竞争 - 缺点
在某些情况下, 也会有竞争 , 比如双端队列里只有一个任务, 同时还消耗了很多的系统资源 , 比如创建了多个线程, 多个双端队列 .
ForkJoin 局限性
- 任务只能通过
Fork
和Join
对于ForkJoin
而言 , 当一个任务正在等待它使用join
创建的子任务结束时 ,执行这个任务的工作线程 , 查找其他未被执行的任务 , 并开始执行. 通过这种方式 , 线程重复利用他们的时间,来提高应用程序的性能, 但是也因此,ForkJoin
执行的任务有一些局限性 ,任务只能通过Fork
和Join
这两个同步机制,如同用其他同步机制, 那么工作线程在工作时, 就不能执行其他任务了 . -
ForkJoin
拆分的子任务不应该去执行 IO操作(读写数据文件) - 任务不能抛出检查异常
ForkJoin 核心
其核心是两个类, 前者负责实现,包括上面说到的工作窃取算法 ,它管理工作线程,提供关于任务的状态以及它们的执行信息 , 后者负责ForkJoinTask
主要负责在任务提供 fork
和join
的机制
代码示例
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
/**
* Created by Charles
* RecursiveTask 字面意思就是递归任务 把大的任务拆分成小任务
*
*/
@Slf4j
public class ForkJoinTaskExample extends RecursiveTask<Integer> {
public static final int threshold = 2;
private int start;
private int end;
public ForkJoinTaskExample(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
//如果任务足够小就计算任务
boolean canCompute = (end - start) <= threshold;
if (canCompute) {
for (int i = start; i <= end; i++) {
sum += i;
}
} else {
// 如果任务大于阈值,就分裂成两个子任务计算
int middle = (start + end) / 2;
ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);
ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);
// 执行子任务
leftTask.fork();
rightTask.fork();
// 等待任务执行结束合并其结果
int leftResult = leftTask.join();
int rightResult = rightTask.join();
// 合并子任务
sum = leftResult + rightResult;
}
return sum;
}
public static void main(String[] args) {
ForkJoinPool forkjoinPool = new ForkJoinPool();
//生成一个计算任务,计算1+2+3+4
ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100);
//执行一个任务
Future<Integer> result = forkjoinPool.submit(task);
try {
log.info("result:{}", result.get());
} catch (Exception e) {
log.error("exception", e);
}
}
}