分支合并框架ForkJoin
原理
Fork:把一个复杂任务进行分拆,大事化小
Join:把分拆任务的结果进行合并
fork/join 框架是 Java 7 中引入的 ,它是一个工具,通过 「 分而治之 」 的方法尝试将所有可用的处理器内核使用起来帮助加速并行处理。
在实际使用过程中,这种 「 分而治之 」的方法意味着框架首先要 fork ,递归地将任务分解为较小的独立子任务,直到它们足够简单以便异步执行。然后,join 部分开始工作,将所有子任务的结果递归地连接成单个结果,或者在返回 void 的任务的情况下,程序只是等待每个子任务执行完毕。
为了提供有效的并行执行,fork/join 框架使用了一个名为 ForkJoinPool 的线程池,用于管理 ForkJoinWorkerThread 类型的工作线程。
ForkJoinPool
ForkJoinPool 是 fork/join 框架的核心,是 ExecutorService 的一个实现,用于管理工作线程,并提供了一些工具来帮助获取有关线程池状态和性能的信息。
工作线程一次只能执行一个任务。
ForkJoinPool 线程池并不会为每个子任务创建一个单独的线程,相反,池中的每个线程都有自己的双端队列用于存储任务 ( double-ended queue )( 或 deque,发音 deck )。
这种架构使用了一种名为工作窃取( work-stealing )算法来平衡线程的工作负载。
fork/join 框架是 Java 7 中引入的 ,它是一个工具,通过 「 分而治之 」 的方法尝试将所有可用的处理器内核使用起来帮助加速并行处理。
ForkJoinTask< V >
ForkJoinTask 是 ForkJoinPool 线程之中执行的任务的基本类型。我们日常使用时,一般不直接使用 ForkJoinTask ,而是扩展它的两个子类中的任意一个
任务不返回结果 ( 返回 void ) 的 RecursiveAction
返回值的任务的 RecursiveTask
这两个类都有一个抽象方法 compute() ,用于定义任务的逻辑。
我们所要做的,就是继承任意一个类,然后实现 compute() 方法。
相关类
ForkJoinPool: 分支合并池 类比=> 线程池
ForkJoinTask: ForkJoinTask 类比=> FutureTask
RecursiveTask: 递归任务:继承后可以实现递归(自己调自己)调用的任务
示例
package cduck.cn;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
class MyTask extends RecursiveTask<Integer>{
private static final Integer ADJUST_VALUE=10;
private int begin;
private int end;
private int result;
public MyTask(int begin, int end) {
this.begin = begin;
this.end = end;
}
@Override
protected Integer compute() {
if (end-begin<=ADJUST_VALUE){
for (int i=begin;i<=end;i++){
result=result+i;
}
}else{
int middle=(end+begin)/2;
MyTask task01=new MyTask(begin,middle);
MyTask task02=new MyTask(middle+1,end);
task01.fork();
task02.fork();
result =task01.join()+task02.join();
}
return result;
}
}
/**
* 分支合并框架
*
* ForkJoinPool
* ForkJoinTask
* RecursiveTask
*/
public class ForkJoinDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
MyTask myTask=new MyTask(0,100);
ForkJoinPool forkJoinPool=new ForkJoinPool();
ForkJoinTask<Integer> submit = forkJoinPool.submit(myTask);
System.out.println(submit.get());
forkJoinPool.shutdown();
}
}