并发 - Java CompletableFuture

并发 - Java CompletableFuture

来看看CompletableFuture类

这一块其实是函数式编程的问题. 普通的流和函数式接口都已经熟悉了, 这里是特别学习CompletableFuture这个新的工具类. CompletableFuture可以被称为组合式异步编程, 这个类继承了Future接口, 并扩展了很多易于函数式编程的异步方式. 这一节的内容来自于另外一本好书 Java 8 In Action, 中文是Java 8 实战, 我用的还是第一版也就是讲Java8的, 链接中已经是第二版, 增加了Java 8 9 10的内容了. 现在如果要最新, 就是要看JDK 11的内容了, 实际上我的电脑已经装了JDK 13了.
  1. CompletableFuture的基础
  2. CompletableFuture的新特点
  3. CompletableFuture的异常处理

CompletableFuture

之前使用Future对象已经可以知道, 搭配Callable使用, 可以进行异步计算, 直到去获取结果的时候, 才变成阻塞. CompletableFuture位于java.util.concurrent包中, 同时实现了Future<T>CompletionStage<T>接口, 而CompletionStage<T>接口是一个非常大的工具类,专门就为了扩展各种函数式的流式调用而存在. 由于同时实现两个接口, 很显然CompletableFuture可以当成Future来使用, 但如果仅仅是这样就太无趣了, CompletableFuture实际上对Future的控制更加精细, 可以做到Future难以做到的事情. CompletionStage文档在此.CompletableFuture文档在此.

CompletableFuture

看一下CompletableFuture的新特点.

可以直接设置完成结果

Future一旦提交之后, 只能通过get()来等待结果. 而CompletableFuture对象, 就算还没有算出来, 还可以直接设置结果, 一旦设置, 就导致从CompletableFuture对象可以获取该结果. 看一个例子, 主线程去给一个CompletableFuture设置结果, 不设置之前, 一个线程会一直阻塞.
import java.util.Scanner;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SetResultTest {

    private static class Test extends Thread {

        CompletableFuture<Integer> future;

        public Test(CompletableFuture<Integer> future) {
            this.future = future;
        }

        @Override
        public void run() {
            boolean got = false;

            while (!got) {
                try {
                    int result = future.get(2, TimeUnit.SECONDS);
                    System.out.println("成功获取结果: " + result);
                    got = true;
                } catch (InterruptedException | ExecutionException | TimeoutException e) {
                }
            }
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> future = new CompletableFuture<>();
        new Test(future).start();
        Scanner scanner = new Scanner(System.in);
        System.out.print("随便输入点什么: ");
        int l = scanner.nextLine().length();
        System.out.println("将结果设置为输入的长度: "+l);
        future.complete(l);
    }
}
这个程序创建了一个CompletableFuture任务, 但其实没有任何计算, 因此线程会一直获取不到结果, 2秒钟一次反复去获取. 如果是Future对象, 就一直卡住了. 但是CompletableFuture对象即使获取不了结果, 可以强行给其设置一个结果. 在这个程序里, 主线程一直等待输入, 然后将输入的长度设置给CompletableFuture对象, 然后就会立刻结束副线程的阻塞状态.

简洁启动异步任务

如果使用Future对象的话, 需要使用Callable对象, 然后提交给线程池, 得到返回的Future对象再从其中获取结果. CompletableFuture提供了一些工厂方法, 可以快速提交异步任务然后得到CompletableFuture对象, 再用get()获取结果即可. 工厂方法有如下几种:
  1. static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
  2. static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
  3. static CompletableFuture<Void> runAsync(Runnable runnable)
  4. static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
有返回值的就直接返回泛型, 没有返回值的就返回泛型是Void的对象. 可见其中都不用传一个完整的线程对象, 只要传一个函数式接口就成. 写点代码试验一下:
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class AsyncQuest {

    private static Random random = new Random();

    //模拟耗时运算
    private static int cal(int param) {
        int sleepTime = param * 1000;
        try {
            Thread.sleep(sleepTime);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return sleepTime;

    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //简便启动异步任务
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> cal(2));
        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> cal(4));
        CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() -> cal(6));

        System.out.println("到此不阻塞");

        //阻塞等待结果
        System.out.println(future1.get());
        System.out.println(future2.get());
        System.out.println(future3.get());

    }
}
重载中的 Executor可以指定线程池, 如果不指定, 就是在系统的默认线程池中运行, 这个默认线程池在运行时来自于 ForkJoinPool.common. 注意这个线程池的所有线程都是守护线程, 所以不能让主线程提前结束, 否则任务不会执行.

连续异步调用

如果想在Future中实现连续异步调用, 那基本上不可能, 除非将所有异步都放到同一个Future对象中进行计算. 而有了CompletableFuture后, 异步调用也可以进行分段, 类似于流式的调用, 可以认为CompletableFuture就是一个流式的Future. 前边的supplyAsyncrunAsync相当于一个开始, 一个数据生产, 之后可以再组合 thenApply, thenAccept等. 文档中的thenApply签名:thenApply(Function<? super T,? extends U> fn), 就是接受上一个返回的结果, 返回最终需要的类型. 文档中的thenAccept签名:thenAccept(Consumer<? super T> action), 这就是直接消费结果了, 消费之后返回的就是Void了. 文档中的thenCompose签名:thenCompose(Function<? super T,? extends CompletionStage<U>> fn), 这个是通过上一个结果, 返回一个新的CompletionStage<U>对象 文档中的thenCombine签名:<U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn), 这个是将另外一个泛型为UCompletableFuture对象与当前类型为T的对象合并, 返回结果类型为V的对象. 这个也非常有用. 来将上一段代码修改一下试试:
public static void main(String[] args) throws ExecutionException, InterruptedException {

    //把第一步计算得到的Integer转换成String类型, 注意最前边的结果类型就要同步将泛型修改成String类型
    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> cal(2)).thenApply(String::valueOf);

    //把第一步计算得到的Integer消费掉, 注意最前边的结果类型就要同步将泛型修改成String类型. 消费掉之后, 泛型更改成Void
    CompletableFuture<Void> future2 = CompletableFuture.supplyAsync(() -> cal(4)).thenAccept(System.out::println);

    //把第一步计算得到的Integer变成另外一个CompletableFuture对象
    CompletableFuture<Double> future3 = CompletableFuture.supplyAsync(() -> cal(6)).thenCompose(x ->
            CompletableFuture.supplyAsync(() -> (double) x * 1000));

    //合并int泛型和double泛型的对象, 返回一个字符串泛型. 注意后边的BiFunction函数式接口的第一个参数是当前的类型也就是double, 第二个是另外一个CompletableFuture的泛型, 也就是int类型.
    CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> 10);
    CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> 10.00).thenCombine(future01, (x, y) -> String.valueOf(x) + "|" + String.valueOf(y));

    System.out.println("到此不阻塞");

    //阻塞等待结果
    System.out.println(future1.get());
    System.out.println(future2.get());
    System.out.println(future3.get());
    System.out.println(future02.get());
}

CompletableFuture的异常处理

CompletableFuture的异常处理也是采取函数式编程的方式, 提供了一个exceptionally()的方法, 在其中可以使用异常对象来进行异常处理, 这样就不会将异常处理暴露在外边,非常方便. 类似于:
public static void main(String[] args) throws ExecutionException, InterruptedException {

    CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> cal(2)).exceptionally(ex -> {
        System.out.println(ex.toString());
        return -1;
    }).thenAccept(System.out::println);
    
    future.get();

}
这里要注意的是, 在处理过程中, 除了捕获异常, 还必须返回一个默认值, 以便后续进行处理, 所以就可以返回一个特殊的值, 用于标记出现了错误, 而且这个也不影响最后从CompletableFuture中获取特殊的值. 好, 到这里CompletableFuture的基本用法就看完了, Java的并发也暂时告一段落, 接下来准备看一下React, 还有乐理了. 五线谱看不懂是有点捉急的.
LICENSED UNDER CC BY-NC-SA 4.0
Comment