这一块其实是函数式编程的问题. 普通的流和函数式接口都已经熟悉了, 这里是特别学习CompletableFuture这个新的工具类.
CompletableFuture可以被称为组合式异步编程, 这个类继承了Future接口, 并扩展了很多易于函数式编程的异步方式.
这一节的内容来自于另外一本好书 Java 8 In Action, 中文是Java 8 实战, 我用的还是第一版也就是讲Java8的, 链接中已经是第二版, 增加了Java 8 9 10的内容了.
现在如果要最新, 就是要看JDK 11的内容了, 实际上我的电脑已经装了JDK 13了.
- CompletableFuture的基础
- CompletableFuture的新特点
- CompletableFuture的异常处理
CompletableFuture
之前使用Future对象已经可以知道, 搭配Callable使用, 可以进行异步计算, 直到去获取结果的时候, 才变成阻塞.
CompletableFuture位于java.util.concurrent包中, 同时实现了Future<T>
和CompletionStage<T>
接口, 而CompletionStage<T>
接口是一个非常大的工具类,专门就为了扩展各种函数式的流式调用而存在.
由于同时实现两个接口, 很显然CompletableFuture
可以当成Future来使用, 但如果仅仅是这样就太无趣了, CompletableFuture
实际上对Future的控制更加精细, 可以做到Future难以做到的事情.
CompletionStage
的文档在此.CompletableFuture
的文档在此.
CompletableFuture
看一下CompletableFuture的新特点.
可以直接设置完成结果
Future一旦提交之后, 只能通过get()来等待结果. 而CompletableFuture对象, 就算还没有算出来, 还可以直接设置结果, 一旦设置, 就导致从CompletableFuture对象可以获取该结果.
看一个例子, 主线程去给一个CompletableFuture
设置结果, 不设置之前, 一个线程会一直阻塞.
import java.util.Scanner;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class SetResultTest {
private static class Test extends Thread {
CompletableFuture<Integer> future;
public Test(CompletableFuture<Integer> future) {
this.future = future;
}
@Override
public void run() {
boolean got = false;
while (!got) {
try {
int result = future.get(2, TimeUnit.SECONDS);
System.out.println("成功获取结果: " + result);
got = true;
} catch (InterruptedException | ExecutionException | TimeoutException e) {
}
}
}
}
public static void main(String[] args) {
CompletableFuture<Integer> future = new CompletableFuture<>();
new Test(future).start();
Scanner scanner = new Scanner(System.in);
System.out.print("随便输入点什么: ");
int l = scanner.nextLine().length();
System.out.println("将结果设置为输入的长度: "+l);
future.complete(l);
}
}
这个程序创建了一个CompletableFuture
任务, 但其实没有任何计算, 因此线程会一直获取不到结果, 2秒钟一次反复去获取. 如果是Future对象, 就一直卡住了.
但是CompletableFuture
对象即使获取不了结果, 可以强行给其设置一个结果. 在这个程序里, 主线程一直等待输入, 然后将输入的长度设置给CompletableFuture
对象, 然后就会立刻结束副线程的阻塞状态.
简洁启动异步任务
如果使用Future对象的话, 需要使用Callable对象, 然后提交给线程池, 得到返回的Future对象再从其中获取结果.
CompletableFuture
提供了一些工厂方法, 可以快速提交异步任务然后得到CompletableFuture
对象, 再用get()获取结果即可.
工厂方法有如下几种:
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
static CompletableFuture<Void> runAsync(Runnable runnable)
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
有返回值的就直接返回泛型, 没有返回值的就返回泛型是Void的对象. 可见其中都不用传一个完整的线程对象, 只要传一个函数式接口就成.
写点代码试验一下:
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class AsyncQuest {
private static Random random = new Random();
//模拟耗时运算
private static int cal(int param) {
int sleepTime = param * 1000;
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
return sleepTime;
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
//简便启动异步任务
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> cal(2));
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> cal(4));
CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() -> cal(6));
System.out.println("到此不阻塞");
//阻塞等待结果
System.out.println(future1.get());
System.out.println(future2.get());
System.out.println(future3.get());
}
}
重载中的 Executor可以指定线程池, 如果不指定, 就是在系统的默认线程池中运行, 这个默认线程池在运行时来自于 ForkJoinPool.common
. 注意这个线程池的所有线程都是守护线程, 所以不能让主线程提前结束, 否则任务不会执行.
连续异步调用
如果想在Future中实现连续异步调用, 那基本上不可能, 除非将所有异步都放到同一个Future对象中进行计算.
而有了CompletableFuture后, 异步调用也可以进行分段, 类似于流式的调用, 可以认为CompletableFuture就是一个流式的Future.
前边的supplyAsync
和runAsync
相当于一个开始, 一个数据生产, 之后可以再组合 thenApply
, thenAccept
等.
文档中的thenApply签名:thenApply(Function<? super T,? extends U> fn)
, 就是接受上一个返回的结果, 返回最终需要的类型.
文档中的thenAccept签名:thenAccept(Consumer<? super T> action)
, 这就是直接消费结果了, 消费之后返回的就是Void了.
文档中的thenCompose签名:thenCompose(Function<? super T,? extends CompletionStage<U>> fn)
, 这个是通过上一个结果, 返回一个新的CompletionStage<U>对象
文档中的thenCombine签名:<U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn)
, 这个是将另外一个泛型为UCompletableFuture对象与当前类型为T的对象合并, 返回结果类型为V的对象. 这个也非常有用.
来将上一段代码修改一下试试:
public static void main(String[] args) throws ExecutionException, InterruptedException {
//把第一步计算得到的Integer转换成String类型, 注意最前边的结果类型就要同步将泛型修改成String类型
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> cal(2)).thenApply(String::valueOf);
//把第一步计算得到的Integer消费掉, 注意最前边的结果类型就要同步将泛型修改成String类型. 消费掉之后, 泛型更改成Void
CompletableFuture<Void> future2 = CompletableFuture.supplyAsync(() -> cal(4)).thenAccept(System.out::println);
//把第一步计算得到的Integer变成另外一个CompletableFuture对象
CompletableFuture<Double> future3 = CompletableFuture.supplyAsync(() -> cal(6)).thenCompose(x ->
CompletableFuture.supplyAsync(() -> (double) x * 1000));
//合并int泛型和double泛型的对象, 返回一个字符串泛型. 注意后边的BiFunction函数式接口的第一个参数是当前的类型也就是double, 第二个是另外一个CompletableFuture的泛型, 也就是int类型.
CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> 10);
CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> 10.00).thenCombine(future01, (x, y) -> String.valueOf(x) + "|" + String.valueOf(y));
System.out.println("到此不阻塞");
//阻塞等待结果
System.out.println(future1.get());
System.out.println(future2.get());
System.out.println(future3.get());
System.out.println(future02.get());
}
CompletableFuture的异常处理
CompletableFuture的异常处理也是采取函数式编程的方式, 提供了一个exceptionally()
的方法, 在其中可以使用异常对象来进行异常处理, 这样就不会将异常处理暴露在外边,非常方便.
类似于:
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> cal(2)).exceptionally(ex -> {
System.out.println(ex.toString());
return -1;
}).thenAccept(System.out::println);
future.get();
}
这里要注意的是, 在处理过程中, 除了捕获异常, 还必须返回一个默认值, 以便后续进行处理, 所以就可以返回一个特殊的值, 用于标记出现了错误, 而且这个也不影响最后从CompletableFuture
中获取特殊的值.
好, 到这里CompletableFuture的基本用法就看完了, Java的并发也暂时告一段落, 接下来准备看一下React, 还有乐理了. 五线谱看不懂是有点捉急的.