声明:原创文章,转载请注明出处。http://www.jianshu.com/u/e02df63eaa87
一、介绍
1、Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。
Fork:把一个大任务切分为若干子任务并行的执行。
Join:合并这些子任务的执行结果,最后得到这个大任务的结果。
上图中,上部的Task依赖于下部的Task执行,只有当各个子任务执行完成后,才能得到Task0的返回结果。
2、原理
分割任务。需要有一个fork类来将主任务分割成子任务,存在嵌套分隔的情况。
执行任务并合并结果。分割的子任务分别放在各自的双端队列中,之后启动几个线程分别从其所有的双端队列中获取任务执行。子任务执行完的结果都统一放在一个结果队列中,启动一个线程从此队列中获取并合并数据。
Fork/Join使用两个类来完成以上两件事情:
- ForkJoinTask:使用ForkJoin框架,必须首先创建一个ForkJoin任务。它提供在任务中执行fork()和join()操作,通常情况下我们不需要直接继承ForkJoinTask类,而只需要继承它的子类,Fork/Join框架提供了以下两个子类:
- RecursiveAction:用于没有返回结果的任务。
- RecursiveTask :用于有返回结果的任务。
- ForkJoinPool :ForkJoinTask需要通过ForkJoinPool来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。
二、例子
通过一个例子,简单介绍fork/join框架的使用,计算任务是:计算1 + 2 + ... + 100
1、首先是继承类,由于我们需要子任务返回结果,因此选择带有返回结果的RecursiveTask
。
2、继承RecursiveTask
类后,需要实现其父类的抽象方法compute()
。
3、任务分隔, 设定阈值为10,如果超过该阈值,则进行分隔。
public class CountTask extends RecursiveTask<Integer> {
private static final int THRESHOLD = 10; // 阈值
private int start;
private int end;
public CountTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
boolean isNeedSplit = (end - start) > THRESHOLD;
int sum = 0;
if (!isNeedSplit) {
for (int i = start; i <= end; ++i) {
sum += i;
}
return sum;
}
// 分隔任务
int mid = (start + end) >> 1;
CountTask lowTask = new CountTask(start, mid);
CountTask highTask = new CountTask(mid + 1, end);
// 子任务执行
lowTask.fork();
highTask.fork();
// 获取子任务计算结果
return lowTask.join() + highTask.join();
}
public static void main(String[] args) throws Exception {
ForkJoinPool forkJoinPool = new ForkJoinPool();
// 创建新任务
CountTask task = new CountTask(1, 100);
// 提交任务
Future<Integer> ret = forkJoinPool.submit(task);
// 输出结果
System.out.println(ret.get());
}
}