Java Reinforcement 12 收集器与并行流

Java Reinforcement 12 收集器与并行流

在流中一开始没有讲收集器, 而是学习了规约的一般操作. 这是因为收集器本质上就是一种归约, 但是更加抽象. 收集器 预定义收集器 广义的归约汇总 并行流与ForkJoin框架 收集器 与流搭配使用的收集器是新的java.util.stream.Collector接口, 这个接口本质上也是规约操作,

在流中一开始没有讲收集器, 而是学习了规约的一般操作. 这是因为收集器本质上就是一种归约, 但是更加抽象.
  1. 收集器
  2. 预定义收集器
  3. 广义的归约汇总
  4. 并行流与ForkJoin框架

收集器

与流搭配使用的收集器是新的java.util.stream.Collector接口, 这个接口本质上也是规约操作, 汇总成一个结果. 注意汇总成一个集合依然是汇总成一个结果. 传递给流的.collect()方法的参数, 是一个java.util.stream.collector接口实现, 也就是给Stream中元素如何进行汇总的方法. 最常用的是Collector.toList(), 用于将元素收集成一个List集合.

预定义收集器

预定义收集器就一些Collector接口中定义好的静态方法, 返回一个函数对象以当成collect()方法的参数. 有以下一些常用的:
  1. Collectors.counting(),无参数,返回流元素的个数, 还可以直接对stream()对象使用.count(), 这个方法本身就是终端操作
  2. Collectors.maxBy()和,Collectors.minBy(), 参数是是一个Comparator.
  3. Collectors.summingInt(), 参数是一个把流元素映射为int的函数, 用于汇总. 对于Long和Double类型也有类型特化的Collectors.summingLong和Collectors.summingDouble
  4. Collectors.averagingInt(),参数也是一个把流元素映射为int的函数, 求平均值. 同样有类型特化版本.
  5. Collectors.summarizingInt(),参数也是一个把流元素映射为int的函数, 这个方法会一次性求出最大, 最小, 平均值, 求和, 个数, 结果是一个IntSummaryStatistics对象, 可以取出上边的结果. 同样也有类型特化版本.
  6. Collectors.joining(), 无参数的版本, 是将流元素中每个对象的toString()的结果连接起来, 有一个重载的版本可以传入字符串参数当成分隔符.
  7. Collectors.joining(), 无参数的版本, 是将流元素中每个对象的toString()的结果连接起来, 有一个重载的版本可以传入字符串参数当成分隔符.
  8. Collectors.groupingBy, 下边详细讲述
  9. Collectors.partitioningBy, 本质上就是按照布尔值的分组, 下边详细讲述
  10. Collectors.toSet(), 收集成集合, 自动去重
  11. Collectors.toCollection(), 收集成指定的集合, 使用在collect()方法内部的时候, 第二个参数要传一个集合类型的构造器, 比如menuStream.collect(toCollection(), ArrayList::new);
除了上边这些之外, 还有一个分组静态方法Collectors.groupingBy 所谓分组, 就是将流按照分组的关键字, 收集成为一个Map对象. 内置的Collectors.groupingBy()接受一个用于分组的函数, 用来按分组的函数返回的类型作为键, 然后将对应的元素收集到这个键对应的值(一个列表)中, 看定义:
public static <T, K> Collector<T, ?, Map<K, List<T>>> groupingBy(Function<? super T, ? extends K> classifier) {
    return groupingBy(classifier, toList());
}
通过定义可以看出, Map<K, List<T>>就是要返回的类型, K类型是Function<? super T, ? extends K>的返回类型, T是流元素的类型. 分组还可以多级分组, groupingBy方法有一个重载, 第一个参数依然是Function<? super T, ? extends K>, 第二个参数又是一个collector类型的函数. 第二个收集器执行得到的结果, 作为第grouppingBy的值. 分区是分组的特殊情况, 就是将流元素映射成布尔值, 根据布尔值来分组, 这个方法接收的是一个Predicate谓词函数, 也已经很熟悉了. 同样也有重载版本, 可以传递第二个收集器.

广义的归约汇总

实际上所有的收集器, 都是一个reducing工厂方法定义的归约过程的常用情况抽取出来编写而成的代码, 只是为了方便使用. 本质上都是使用reducing工厂方法. reducing静态方法需要三个参数, 第一个是归约操作的起始值, 第二个是处理每一个元素的函数, 也就是普通传递给各个静态方法的函数, 第三个是一个BinaryOperator<T, T>(BiFunction<T,T,T>), 这是表示如何累计经过第二个参数转换之后的流中的每两个元素, 也就是处理叠加过程. reducing静态方法有一个单参数版本, 只需要传递三参数版本的第三个参数, 把流的第一个元素当成起点, 转换函数是恒等函数, 这样就等于从流的第一个元素开始不断按照行为参数处理每两个元素. 由于没有初始值, 所以单参数版本返回的是一个Optional对象. 所以即使不写代码, 也可以知道比如toList()的reducing版本大概是什么样子, 第一个参数是新建的List对象, 至于自定义收集器的方法, 就以后有余力再看吧. 学到现在给自己又定下了两个原则, 1 是现在遇到集合就尽量使用stream操作, 2 是文件IO要逐步用流处理.

并行流

流中间串一个.parallel()方法就可以将流改成并行的. 听上去很美好, 但不是这么简单. 调用.sequential()可以变成顺序流 ,但这只是做一个标记, 哪个方法最后调用, 就成为对应的设置. 此外, 要顺序流执行的代码, 还不能够访问共享数据, 否则就会出问题, 这与普通的多线程问题是一样的. 最方便并行的, 就是可以将问题分治, 然后再合并起来的问题, 而不是反复在同一个变量上操作的问题. 举一个并行流会出问题的例子:
public class TestTime {

    public static long measureSumPerf(Function<Long, Long> adder, long n) {
        long fastest = Long.MAX_VALUE;
        for (int i = 0; i < 10; i++) {
            long start = System.nanoTime();
            long sum = adder.apply(n);
            long duration = (System.nanoTime() - start) / 1_000_000;
            System.out.println("Result: " + sum);
            if (duration < fastest) fastest = duration;
        }
        return fastest;
    }

    public static long noSideEffectSum(long n) {
        Accumulator accumulator = new Accumulator();
        LongStream.rangeClosed(1, n).forEach(accumulator::add);
        return accumulator.total;
    }

    public static long sideEffectParallelSum(long n) {
        Accumulator accumulator = new Accumulator();
        LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add);
        return accumulator.total;
    }

    public static void main(String[] args) {
        //多线程竞争的情况
        System.out.println("SideEffect parallel sum done in: " + measureSumPerf(TestTime::sideEffectParallelSum, 10_000_000L) + "msecs");

        //单线程处理,不会有竞争的情况
        System.out.println("SideEffect parallel sum done in: " + measureSumPerf(TestTime::noSideEffectSum, 10_000_000L) + "msecs");
    }

}
sideEffectParallelSum方法将noSideEffectSum中的流运算改成了并行计算, 结果因为累加器在反复的读写同一个实例变量造成多线程竞争, 这说明流并不带有同步机制, 如果强制改成多线程, 就会得到错误的结果. 一般要注意集合操作的话, ArrayList, Intstream.range等方法很好, 而Stream.iterate和链表数据结构都不太适合使用并行, 因为始终会操作同一个对象. 提到并行的话, 就看一看Java 7 中新增的Fork/Join框架, 得益于看过CSAPP, fork在系统编程里用于开启一个新的进程, 然后可以join等待其他进程结束再来执行. 这个模式仿照了系统编程的模式, 只不过fork是开启一个新的线程, 而join可以等待线程执行结束. 在Thinking in Java 里提到了内部的线程池一般都使用ExecutorService的静态方法来获取, 然后给其中传入Runnable对象. 现在有了fork/join之后, 编写代码的方式有所改变, 可以在一个线程里开启任务, 然后指定一个任务fork到一个新的线程去. 这个框架的核心并不是先获取线程池, 而是获取一个ForkJoinPool, 这其实是一个ExecutorService类型. 使用方法是编写任务, 然后在ForkJoinPool里执行任务:
import java.util.concurrent.RecursiveTask;

//编写任务, 继承java.util.concurrent.RecursiveTask<Long>, 这是ForkJoin框架的任务, 是一个Future<V>接口的实现类
//这个接口只有一个抽象方法, 叫做compute(), 返回泛型类型

public class ForkJoinSumCalculator extends RecursiveTask<Long> {

    //要操作一个数组, 由于不能多线程竞争, 因此引入数组的开始与结束的变量, 以合理拆分数组给各个线程
    private final long[] numbers;
    private final int start;
    private final int end;

    //不拆分的最小值
    public static final long THRESHOLD = 10_000;

    //私有构造器, 由于仅仅是在内部创建线程, 因此不暴露给外界
    private ForkJoinSumCalculator(long[] numbers, int start, int end) {
        this.numbers = numbers;
        this.start = start;
        this.end = end;
    }

    //工具方法, 用于汇总numbers数组指定的区域
    private long computeSequentially() {
        long sum = 0;
        for (int i = start; i < end; i++) {
            sum += numbers[i];

        }
        return sum;
    }

    //核心的compute方法, 将当前的任务分解为两个任务, 分解方法是分解索引, 然后将分解后的索引和同一个数组对象分成两个任务
    @Override
    protected Long compute() {
        int length = end - start;
        if (length <= THRESHOLD) {
            return computeSequentially();
        }
        //左半边任务
        ForkJoinSumCalculator leftTask =
                new ForkJoinSumCalculator(numbers, start, start + length / 2);
        //左半边任务交给新的线程
        leftTask.fork();
        //右半边任务, 直接在当前线程内启动
        ForkJoinSumCalculator rightTask =
                new ForkJoinSumCalculator(numbers, start + length / 2, end);
        Long rightResult = rightTask.compute();
        //等待左半边任务的结束, 然后合并结果
        Long leftResult = leftTask.join();
        return leftResult + rightResult;
    }

    //静态方法用于启动任务
    public static long forkJoinSum(long n) {
        long[] numbers = LongStream.rangeClosed(1, n).toArray();
        //创建一个最初的任务, 然后交给ForkJoinPool()来执行
        ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers, 0, numbers.length);
        return new ForkJoinPool().invoke(task);
    }


    public static void main(String[] args) {
        long result = forkJoinSum(999);
        System.out.println(result);
    }
}
可见Fork/join并不是像系统编程的fork/join, 会在执行到的代码处就地分支, .fork()可以理解为将一个compute()函数对象传递给新的线程, 这样在线程中共享的数据, 可以通过类的构造器来进行传递, 这比使用Runnable和Callable要方便很多. 在使用的时候, 先获取ForkJoinPool, 然后调用invoke(任务对象)来启动任务. 至于自定义的Spliterator知道这个东西就行了, 以后在深入吧, 还有自定义的Collector也是一样. 目前能懂原理就好.
LICENSED UNDER CC BY-NC-SA 4.0
Comment