亲宝软件园·资讯

展开

CompletableFuture 异步编排

fxtahe 人气:3

从Future聊起

Future是java 1.5引入的异步编程api,它表示一个异步计算结果,提供了获取异步结果的能力,解决了多线程场景下Runnable线程任务无法获取结果的问题。

但是其获取异步结果的方式并不够优雅,我们必须使用Future.get的方式阻塞调用线程,或者使用轮询方式判断 Future.isDone 任务是否结束,再获取结果。

public interface Future<V> {

    //任务是否完成
    boolean isDone();
    
    //阻塞调用线程获取异步结果
    V get() throws InterruptedException, ExecutionException;

   //在指定时间内阻塞线程获取异步结果
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

假如存在多个异步任务相互依赖,一个或多个异步线程任务需要依赖上一个异步线程任务结果,并且多个异步任务能够组合结果,显然这种阻塞线程的方式并不能优雅解决。

我们更希望能够提供一种异步回调的方式,组合各种异步任务,而无需开发者对多个异步任务结果的监听编排。

为了解决优化上述问题,java8 新增了CompletableFutureAPI ,其大大扩展了Future能力,并提供了异步任务编排能力。

CompletableFuture

CompletableFuture实现了新的接口CompletionStage,并扩展了Future接口。查看类图

创建异步任务

CompletableFuture 提供了四种方法去创建一个异步任务。

着几个方法本质上是有返回值和无返回值两种类型方法,supply方法可以获取异步结果,而run方法则无返回值,根据需要使用。

同时两种类型的方法均提供了指定线程池的重载,如果不指定线程池会默认使用ForkJoinPool.commonPool(),默认线程数为cpu核心数,建议使用自定义线程池的方式,避免线程资源竞争

一个简单样例

        CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> { System.out.println("无返回值任务"); });
        runAsync.get();
        CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> "hello completableFuture");
        String result = supplyAsync.get();
        System.out.println(result);

我们依然可以通过get()方法阻塞获取异步结果任务,但是CompletableFuture主要还是用于异步回调及异步任务编排使用。

异步回调

在任务执行结束后我们希望能够自动触发回调方法,CompletableFuture提供了两种方法实现。

whenComplete 与 handle 区别:两者均接受上一阶段任务结果或异常,但是whenComplete 回调中没有返回值,所以其结果是上一阶段任务,而handle 最终返回的是其回调方法方法,其主要是BiConsumerBiFunction的区别。

异步编排

CompletionStage表示异步计算的一个阶段,当一个计算处理完成后会触发其他依赖的阶段。当然一个阶段的触发也可以是由多个阶段的完成触发或者多个中的任意一个完成触发。该接口定义了异步任务编排的各种场景,CompletableFuture则实现了这些场景。

可以把这些场景大致分为三类:串行、AND和OR。下面会逐个分析各个场景,接口中定义的以Async结尾的方法,指下一阶段任务会被单独提交到线程池中执行,后面不在赘述。

串行

当上一阶段任务执行完毕后,继续提交执行其他任务

T:上一个任务返回结果的类型 U:当前任务的返回值类型

AND

组合多个异步任务,当多个任务执行完毕继续执行其他任务

T:上一个任务返回结果的类型 U:上一个other任务的返回值类型 V:当前任务返回值

OR

当多个任务中任意任务执行完成则继续执行其他任务。

Future 机制扩展

CompletableFuture不仅实现了Future接口,同时对其进行了扩展,提供了更加优雅的实现。

CompletableFuture 实践

我们通过CompletableFuture实现一个经典的烧水程序。

我们可以把这个流程分为三个异步任务。

任务1:洗水壶->烧水

任务2:洗水壶->洗茶杯->拿茶叶

任务3:泡茶,需要等待任务1与任务2结束。

通过代码模拟实现

        CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("洗水壶");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
            return "水壶";
        }).thenApply(e->{
            System.out.println("烧水");
            try {
                Thread.sleep(5000);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
            return "热水";
        });
        //洗水壶->洗水杯->拿茶叶
        CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("洗茶壶");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
            return "茶壶";
        }).thenApply(e->{
            try {
                Thread.sleep(2000);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
            System.out.println("洗水杯");
            return "水杯";
        }).thenApply(e->{
            System.out.println("拿茶叶");
            return "茶叶";
        });
        //泡茶
        CompletableFuture<String> task3 = task1.thenCombine(task2, (a, b) -> {
            System.out.println("泡茶");
            return "茶";
        });
        String tea = task3.join();
        System.out.println(tea);

加载全部内容

相关教程
猜你喜欢
用户评论