Fork/Join是JDK7引入的新多线程工具, 其本质就是把任务分解成能够将结果合并的一系列任务, 然后将这些任务分配个不同的线程进行操作, 然后再把结果合并起来.
所以这个核心就是如何分解任务, 提交给线程执行, 以及如何合并任务. 来学习一下.
- Fork/Join的核心类
- 使用例子
Fork/Join的核心类
回想Linux就知道, fork()用于启动一个新进程. 在java里的Fork/Join则是启动新线程, join是等待, 也就是等待返回结果, 然后组装.
涉及的主要类有:
java.util.concurrent.ForkJoinPool
, 这是在F/J中实际负责工作的线程池, 要向其中提交分解后的任务
java.util.concurrent.ForkJoinTask<V>
, 带泛型的任务类, 是抽象类, 向FJ线程池中提交的任务必须是这个类型
java.util.concurrent.RecursiveTask<V>
, 这个是ForkJoinTask的一个子类, 也是抽象类, 表示带有返回值的任务
java.util.concurrent.RecursiveAction
, 也是ForkJoinTask的一个子类, 也是抽象类, 表示不带有返回值的任务, 因此无泛型
ForkJoinPool的核心方法就一个, 就是public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)
, 即向线程池中提交一个任务(可以是分解也可是不分解的)
使用例子
使用FJ的核心在于, 先要合理的分解任务, 然后将任务进行提交, 最后等待任务.
一般需要按照如下流程进行操作:
- 创建任务, 即对ForkJoinTask的两个子类进行继承, 并重写其中的compute()方法, 如果带有返回值, 则返回值应该与泛型一致.
- 在compute()方法中, 如果任务需要分解, 则创建新的任务对象, 同时调用 invokeAll()方法将所有的新任务作为参数传入, 表示启动新任务
- 启动新任务之后, 立刻调用新任务的.join()方法, 获取结果, 然后对结果进行必要的计算并且返回.
以二分法计算一个比较大的数组的FJ写法如下:
import java.util.concurrent.RecursiveTask;
//通过继承创建一个带返回值的任务类
public class ArraySumTask extends RecursiveTask<Long> {
public long[] array;
public int startIndex;
public int endIndex;
public ArraySumTask(long[] array, int startIndex, int endIndex) {
this.array = array;
this.startIndex = startIndex;
this.endIndex = endIndex;
}
//核心, 必须覆盖compute()方法
@Override
protected Long compute() {
//如果要计算的数组范围小于等于100个, 就直接计算
if (endIndex - startIndex <= 99) {
System.out.println("索引差100, start");
long sum = 0;
for (int i = startIndex; i <= endIndex; i++) {
sum = sum + array[i];
}
return sum;
//如果要计算的数字范围大于100个, 就从中间拆分
} else {
//像二分法一样拆出中间的索引, 然后用这个索引将数组分为两部分
int middleIndex = (startIndex + endIndex) / 2;
System.out.println("启动startIndex=" + startIndex + " endIndex=" + (middleIndex - 1) + "的新任务");
ArraySumTask task1 = new ArraySumTask(array, startIndex, middleIndex - 1);
System.out.println("启动startIndex=" + middleIndex + " endIndex=" + endIndex + "的新任务");
ArraySumTask task2 = new ArraySumTask(array, middleIndex, endIndex);
//启动分出来的两个任务
invokeAll(task1, task2);
//获取任务的返回值, 然后相加得到全部的结果
Long result1 = task1.join();
Long result2 = task2.join();
return result1 + result2;
}
}
}
有了任务类后, 创建线程池然后提交任务:
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
public class ForkAndJoinModeTest {
public static Random random = new Random();
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建2000个随机数组成的数组:
long[] array = new long[2000];
long expectedSum = 0;
for (int i = 0; i < array.length; i++) {
array[i] = random.nextInt(100);
expectedSum += array[i];
}
System.out.println("Expected sum: " + expectedSum);
//创建池子
ForkJoinPool pool = new ForkJoinPool();
//创建最初的任务
ArraySumTask task = new ArraySumTask(array, 0, 1999);
//提交任务
Long result = pool.submit(task).get();
System.out.println(result);
}
}
可以看到这个例子本质上有点像递归, 当然也可以按照线性来区分, 比如固定的步长, 都可以.
使用FJ的时候一定要注意线程数量, 不要分的过分多, 否则性能会严重下降.
这里注意的是《Java高并发程序设计》中使用了subTask.fork(), 这种方法是将任务交给另外一个线程, 但自己不干活.
所以这里的例子实际上来自于廖雪峰的网站上的例子, 使用invoke来提交, 这样会自动将n-1个线程交给其他线程, 留一个自己干活, 就充分利用了FJ模式. 关于这个问题的讲解可以看这里.