CompletableFuture
旺财gg 人气:01.概述
1.0 创建 CompletableFuture 的对象的工厂方法
static CompletableFuture<Void> runAsync(Runnable runnable) static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
runAsync 的参数是 Runnable, 返回值是 CompletableFuture, 意思是工厂方法创建的 CompletableFuture 对象封装的任务没有返回值。
例如:
CompletableFuture<Void> run = CompletableFuture.runAsync(()-> System.out.println("hello"));
而 suppyAsync 参数类型是 Supplier,返回值是CompletableFuture<U>
, 意思是任务不接受参数,但是会返回结果。
CompletableFuture<String> supply = CompletableFuture.supplyAsync(() -> { System.out.println("Hello"); return "hello world!""; }); System.out.println(supply.get()); //hello world!"
所以如果任务需要返回结果,那么应该选择 suppyAsync;否则可以选择 runAsync。
1.1 non-async 和 async 区别
public CompletionStage<Void> thenRun(Runnable action); public CompletionStage<Void> thenRunAsync(Runnable action); public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);
CompletableFuture 中有众多类似这样的方法,那么 non-async 和 async 和版本的方法究竟有什么区别呢? 参考官方文档的描述:
Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.
翻译:传递给 non-async 方法作为参数的函数(action)可能会在完成当前的 CompletableFuture 的线程中执行,也可能会在方法的调用者线程中执行。
All async methods without an explicit Executor argument are performed using the
ForkJoinPool.commonPool()
(unless it does not support a parallelism level of at least two, in which case, a new Thread is created to run each task). To simplify monitoring, debugging, and tracking, all generated asynchronous tasks are instances of the marker interfaceCompletableFuture.AsynchronousCompletionTask
.
翻译:所有没有Executor 参数的 async 方法都在 ForkJoinPool.commonPool()
线程池中执行(除非不支持最小并发度为2,这种情况下,每个任务会创建一个新线程去执行)。为了简化监控、调试和追踪,所有生成的异步任务都是接口 CompletableFuture.AsynchronousCompletionTask
的实例。
从上面这两段官方描述看。async 类方法比较容易理解,就是 CompletableFuture 实例的任务执行完成后,会将 action 提交到缺省的异步线程池 ForkJoinPool.commonPool()
,或者 async 类方法参数Executor executor 指定的线程池中执行。
而对于 non-async 的描述则有点不明确。action 可能会在完成 CompletableFuture 实例任务的线程中执行,也可能会在调用 thenRun(编排任务完成后执行 action 的系列方法) 方法的线程中执行。这个主要是看调用 thenRun 的时候,CompletableFuture 实例的任务是否已经完成。如果没有完成,那么action 会在完成任务的线程中执行。如果任务已经完成,则 action 会在调用thenAccept 等注册方法的线程中执行。
1.1.1 non-async 示例:注册 action 的时候任务可能已经结束
@Test void testThenRunWithTaskCompleted() throws Exception{ CompletableFuture<Void> future = CompletableFuture.supplyAsync(new Supplier<Integer>() { @Override public Integer get() { System.out.println("[" + Thread.currentThread().getName() + "] " + " in task" ); return 1; } }).thenRun(() -> { System.out.println("[" + Thread.currentThread().getName() + "] " + " in action" ); }); future.get(); }
运行结果:
[ForkJoinPool.commonPool-worker-1] in task
[main] in action
分析: 任务通过 CompletableFuture.supplyAsync 提交后,会以异步的方式在 ForkJoinPool.commonPool() 线程池中运行。这时候有两个线程,一个是[ForkJoinPool.commonPool-worker-1] 执行 Supplier.get 方法;一个是[main] 主线程提交完异步任务后,继续调用 thenRun 编排任务完成后执行 action。由于Supplier.get 非常简单,几乎立刻返回。所以很大概率在主线程调用 thenRun 编排任务完成后执行 action的时候,异步任务已经完成,所以 action 在主线程中执行了。注:在笔者的电脑上几乎100% 是这样的调度方式。
1.1.2 non-async 示例:注册 action 的时候任务未完成
@Test void testThenRunWithTaskUncompleted() throws Exception{ CompletableFuture<Void> future = CompletableFuture.supplyAsync(new Supplier<Integer>() { @Override public Integer get() { System.out.println("[" + Thread.currentThread().getName() + "] " + " in task" ); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return new Random().nextInt(10); } }).thenRun(() -> { System.out.println("[" + Thread.currentThread().getName() + "] " + " in action" ); }); future.get(); }
运行结果:
[ForkJoinPool.commonPool-worker-1] in task
[ForkJoinPool.commonPool-worker-1] in action
分析:在 Supplier.get 加入 sleep,延迟任务结束。主线程提交完异步任务后,继续调用 thenRun 编排任务完成后执行 action 的时候,任务没有结束。所以这个action 在完成任务的线程中执行。
1.2 Run 类方法
Run 类方法指 thenRun、runAfterBoth、runAfterEither 等。Run 类方法的动作参数都是Runable action。例如 thenRun(Runnable action)。这就意味着这个 action 不关心任务的结果,action 本身也没有返回值。只是为了实现在完成任务后执行一个动作的目的。
1.3 Accept 类方法
Accept 类方法指 thenAccept、runAcceptBoth、runAcceptEither 等。Accept 类方法的动作参数都是Consumer<? super T> action。例如 thenAccept(Consumer<? super T> action)。如方法名和参数所示,action 是接受并消费任务结果的消费者,action 没有返回值。
1.4 Apply 类方法
Apply 类方法指 thenApply、applyToEither 等。Apply 类方法的动作参数都是Function<? super T,? extends U> fn。例如 thenApply(Function<? super T,? extends U> fn)。如方法名和参数所示,action 将任务结果应用函数参数 fn 做转换,返回转换后的结果,类似于 stream 中的 map。
2 单个任务执行完成后执行一个动作(action)
方法 | 说明 |
---|---|
public CompletableFuture<Void> thenRun(Runnable action) thenRunAsync(Runnable action) thenRunAsync(Runnable action, Executor executor) | 任务完成后执行 action,action 中不关心任务的结果,action 也没有返回值 |
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) thenAcceptAsync(Consumer<? super T> action) thenAcceptAsync(Consumer<? super T> action) | 任务完成后执行 action。如方法名和参数所示,action 是接受并消费任务结果的消费者,action 没有返回值 |
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) thenApplyAsync(Function<? super T,? extends U> fn) thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) | 任务完成后执行 action。action 将任务结果应用 action 的函数参数做转换,返回转换后的结果,类似于 stream 中的 map |
public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action) whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor) | 任务完成后或者异常时运行action。action 是接受并消费任务结果的消费者,并且有返回值。可以认为是支持异常处理的 thenAccept 版本。 |
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) handleAsync(BiFunction<? super T, Throwable, ? extends U> fn) handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) | 任务完成后或者异常时运行action。可以认为是支持异常处理的 thenApply 版本。 |
public CompletableFuture exceptionally(Function<Throwable, ? extends T> fn) | 如果任务或者 action 发生异常,则会触发exceptionally的调用相当于 try...catch |
2.0 示例 exceptionally
@Test void testCompletableFutureExceptionally(){ CompletableFuture<Integer> first = CompletableFuture .supplyAsync(() -> { if (true) { throw new RuntimeException("exception in task"); } return "hello world"; }) .thenApply(data -> { if (true) { throw new RuntimeException("exception in action"); } return 1; }) .exceptionally(e -> { System.out.println("[" + Thread.currentThread().getName() + "] " + " print exception stack trace" ); e.printStackTrace(); // 异常捕捉处理,前面两个处理环节的日常都能捕获 return 0; }); }
3 两个任务执行编排
下面表格列出的每个方法都有对应两个版本的 async 方法,一个有executor 参数,一个没有。为了表格尽量简洁,表格中就不再列出async 方法了
方法 | 说明 |
---|---|
public <U> CompletableFuture<U> thenCompose(Function<? super T,? extends CompletionStage> fn) | 串行组合两个 CompletableFuture 任务,后一个任务依赖前一个任务的结果,后一个任务可以返回与第一个任务不同类型的返回值。执行后一个任务的线程与前面讨论 action 执行的线程类似。 |
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action) | 这里有两个并行任务,一个是runAfterBoth 调用本身所属的CompletableFuture 实例A,一个是参数 other 引用的任务B。两个并行任务都完成后执行 action,action 中不关心任务的结果,action 也没有返回值 |
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action) | 与runAfterBoth 类似,也有两个并行任务 A 和 B。两个并行任务都完成后执行 action。如方法名和参数所示,action 是接受并消费两个任务结果的消费者,action 没有返回值 |
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) | 与runAfterBoth 类似,也有两个并行任务 A 和 B。两个并行任务都完成后执行 action,action 依赖两个任务的结果,并对结果做转换,返回一个新值 |
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action) | 与runAfterBoth 类似,也有两个并行任务 A 和 B。两个并行任务任意一个完成后执行 action,action 中不关心任务的结果,action 也没有返回值 |
public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action) | 与runAfterBoth 类似,也有两个并行任务 A 和 B。两个并行任务任意一个完成后执行 action。如方法名和参数所示,action 是接受并消费两个任务结果的消费者,action 没有返回值 |
public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn) | 与runAfterBoth 类似,也有两个并行任务 A 和 B。两个并行任务任意一个完成后执行 action。action 将任务结果应用 action 的函数参数做转换,返回转换后的结果,类似于 stream 中的 map |
4 多个任务执行编排
下面表格列出的每个方法都有对应两个版本的 async 方法,一个有executor 参数,一个没有。为了表格尽量简洁,表格中就不再列出async 方法了
方法 | 说明 |
---|---|
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) | 所有参数引用的任务都完成后,才触发执行当前的 CompletableFuture 实例任务。 |
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) | 参数引用的任务中任何一个完成后,就会触发执行当前的 CompletableFuture 实例任务。 |
5 CompletableFuture 其他方法
方法 | 说明 |
---|---|
public boolean cancel(boolean mayInterruptIfRunning) | 如果任务未完成,以 CancellationException 异常结束任务。 |
public boolean isCancelled() | 判断任务是否取消。 |
public T join() | 阻塞等待 获取返回值 |
public T get() throws InterruptedException, ExecutionException public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException | 阻塞等待(有超时重载版本)获取返回值。get 与 join区别,get 会抛出 checked 异常,调用代码需要处理异常。join 没有超时重载版本。 |
public T getNow(T valueIfAbsent) | 获取返回值,如果任务未完成则返回valueIfAbsent 参数指定value |
public boolean isDone() | 任务是否执行完成 |
加载全部内容