分支合并框架


分支合并框架ForkJoin

原理

Fork:把一个复杂任务进行分拆,大事化小
Join:把分拆任务的结果进行合并

fork/join 框架是 Java 7 中引入的 ,它是一个工具,通过 「 分而治之 」 的方法尝试将所有可用的处理器内核使用起来帮助加速并行处理。

在实际使用过程中,这种 「 分而治之 」的方法意味着框架首先要 fork ,递归地将任务分解为较小的独立子任务,直到它们足够简单以便异步执行。然后,join 部分开始工作,将所有子任务的结果递归地连接成单个结果,或者在返回 void 的任务的情况下,程序只是等待每个子任务执行完毕。

为了提供有效的并行执行,fork/join 框架使用了一个名为 ForkJoinPool 的线程池,用于管理 ForkJoinWorkerThread 类型的工作线程。

ForkJoinPool

ForkJoinPool 是 fork/join 框架的核心,是 ExecutorService 的一个实现,用于管理工作线程,并提供了一些工具来帮助获取有关线程池状态和性能的信息。

工作线程一次只能执行一个任务。

ForkJoinPool 线程池并不会为每个子任务创建一个单独的线程,相反,池中的每个线程都有自己的双端队列用于存储任务 ( double-ended queue )( 或 deque,发音 deck )。

这种架构使用了一种名为工作窃取( work-stealing )算法来平衡线程的工作负载。

fork/join 框架是 Java 7 中引入的 ,它是一个工具,通过 「 分而治之 」 的方法尝试将所有可用的处理器内核使用起来帮助加速并行处理。

ForkJoinTask< V >

ForkJoinTask 是 ForkJoinPool 线程之中执行的任务的基本类型。我们日常使用时,一般不直接使用 ForkJoinTask ,而是扩展它的两个子类中的任意一个

任务不返回结果 ( 返回 void ) 的 RecursiveAction
返回值的任务的 RecursiveTask
这两个类都有一个抽象方法 compute() ,用于定义任务的逻辑。

我们所要做的,就是继承任意一个类,然后实现 compute() 方法。

相关类

ForkJoinPool: 分支合并池 类比=> 线程池

ForkJoinTask: ForkJoinTask 类比=> FutureTask

RecursiveTask: 递归任务:继承后可以实现递归(自己调自己)调用的任务

示例

package cduck.cn;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

class MyTask extends RecursiveTask<Integer>{
    private  static final Integer ADJUST_VALUE=10;

    private int begin;
    private int end;
    private int result;

    public MyTask(int begin, int end) {
        this.begin = begin;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        if (end-begin<=ADJUST_VALUE){
            for (int i=begin;i<=end;i++){
                result=result+i;
            }

        }else{
            int middle=(end+begin)/2;
            MyTask task01=new MyTask(begin,middle);
            MyTask task02=new MyTask(middle+1,end);
            task01.fork();
            task02.fork();
            result =task01.join()+task02.join();
         }


        return result;
    }
}

/**
 * 分支合并框架
 *
 * ForkJoinPool
 * ForkJoinTask
 * RecursiveTask
 */
public class ForkJoinDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        MyTask myTask=new MyTask(0,100);

        ForkJoinPool forkJoinPool=new ForkJoinPool();
        ForkJoinTask<Integer> submit = forkJoinPool.submit(myTask);

        System.out.println(submit.get());

        forkJoinPool.shutdown();
    }
}

文章作者: fFee-ops
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 fFee-ops !
评论
  目录