并发 - Java并发工具 Fork/Join 模式

并发 - Java并发工具 Fork/Join 模式

Fork/Join是JDK7引入的新多线程工具, 其本质就是把任务分解成能够将结果合并的一系列任务, 然后将这些任务分配个不同的线程进行操作, 然后再把结果合并起来. 所以这个核心就是如何分解任务, 提交给线程执行, 以及如何合并任务. 来学习一下. Fork/Join的核心类 使用例子 Fork/

Fork/Join是JDK7引入的新多线程工具, 其本质就是把任务分解成能够将结果合并的一系列任务, 然后将这些任务分配个不同的线程进行操作, 然后再把结果合并起来. 所以这个核心就是如何分解任务, 提交给线程执行, 以及如何合并任务. 来学习一下.
  1. Fork/Join的核心类
  2. 使用例子

Fork/Join的核心类

回想Linux就知道, fork()用于启动一个新进程. 在java里的Fork/Join则是启动新线程, join是等待, 也就是等待返回结果, 然后组装. 涉及的主要类有:
  1. java.util.concurrent.ForkJoinPool, 这是在F/J中实际负责工作的线程池, 要向其中提交分解后的任务
  2. java.util.concurrent.ForkJoinTask<V>, 带泛型的任务类, 是抽象类, 向FJ线程池中提交的任务必须是这个类型
  3. java.util.concurrent.RecursiveTask<V>, 这个是ForkJoinTask的一个子类, 也是抽象类, 表示带有返回值的任务
  4. java.util.concurrent.RecursiveAction, 也是ForkJoinTask的一个子类, 也是抽象类, 表示不带有返回值的任务, 因此无泛型
ForkJoinPool的核心方法就一个, 就是public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task), 即向线程池中提交一个任务(可以是分解也可是不分解的)

使用例子

使用FJ的核心在于, 先要合理的分解任务, 然后将任务进行提交, 最后等待任务. 一般需要按照如下流程进行操作:
  1. 创建任务, 即对ForkJoinTask的两个子类进行继承, 并重写其中的compute()方法, 如果带有返回值, 则返回值应该与泛型一致.
  2. 在compute()方法中, 如果任务需要分解, 则创建新的任务对象, 同时调用 invokeAll()方法将所有的新任务作为参数传入, 表示启动新任务
  3. 启动新任务之后, 立刻调用新任务的.join()方法, 获取结果, 然后对结果进行必要的计算并且返回.
以二分法计算一个比较大的数组的FJ写法如下:
import java.util.concurrent.RecursiveTask;
//通过继承创建一个带返回值的任务类
public class ArraySumTask extends RecursiveTask<Long> {

    public long[] array;

    public int startIndex;

    public int endIndex;

    public ArraySumTask(long[] array, int startIndex, int endIndex) {
        this.array = array;
        this.startIndex = startIndex;
        this.endIndex = endIndex;
    }

    //核心, 必须覆盖compute()方法
    @Override
    protected Long compute() {
        //如果要计算的数组范围小于等于100个, 就直接计算
        if (endIndex - startIndex <= 99) {

            System.out.println("索引差100, start");

            long sum = 0;

            for (int i = startIndex; i <= endIndex; i++) {
                sum = sum + array[i];
            }
            return sum;
            //如果要计算的数字范围大于100个, 就从中间拆分
        } else {
            //像二分法一样拆出中间的索引, 然后用这个索引将数组分为两部分
            int middleIndex = (startIndex + endIndex) / 2;
            System.out.println("启动startIndex=" + startIndex + " endIndex=" + (middleIndex - 1) + "的新任务");
            ArraySumTask task1 = new ArraySumTask(array, startIndex, middleIndex - 1);
            System.out.println("启动startIndex=" + middleIndex + " endIndex=" + endIndex + "的新任务");
            ArraySumTask task2 = new ArraySumTask(array, middleIndex, endIndex);

            //启动分出来的两个任务
            invokeAll(task1, task2);

            //获取任务的返回值, 然后相加得到全部的结果
            Long result1 = task1.join();
            Long result2 = task2.join();

            return result1 + result2;
        }
    }
}
有了任务类后, 创建线程池然后提交任务:
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;

public class ForkAndJoinModeTest {

    public static Random random = new Random();

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        // 创建2000个随机数组成的数组:
        long[] array = new long[2000];
        long expectedSum = 0;
        for (int i = 0; i < array.length; i++) {
            array[i] = random.nextInt(100);
            expectedSum += array[i];
        }
        System.out.println("Expected sum: " + expectedSum);

        //创建池子
        ForkJoinPool pool = new ForkJoinPool();
        //创建最初的任务
        ArraySumTask task = new ArraySumTask(array, 0, 1999);
        //提交任务
        Long result = pool.submit(task).get();

        System.out.println(result);
    }
}
可以看到这个例子本质上有点像递归, 当然也可以按照线性来区分, 比如固定的步长, 都可以. 使用FJ的时候一定要注意线程数量, 不要分的过分多, 否则性能会严重下降. 这里注意的是《Java高并发程序设计》中使用了subTask.fork(), 这种方法是将任务交给另外一个线程, 但自己不干活. 所以这里的例子实际上来自于廖雪峰的网站上的例子, 使用invoke来提交, 这样会自动将n-1个线程交给其他线程, 留一个自己干活, 就充分利用了FJ模式. 关于这个问题的讲解可以看这里.
LICENSED UNDER CC BY-NC-SA 4.0
Comment