Java异步编程
凯哥的Java技术活 人气:0异步编程(Twitter Future)
为啥要异步
异步编程有点难以理解,这东西感觉不符合常理,因为我们思考都是按照串行的逻辑,事都是一件一件办。但在异步计算的情况下,回调往往分散在代码片段中,需要理解其中的意义。
最难搞的就是组合,嵌套。如果再加上递归,派发等逻辑,能写的极其复杂,又难以理解。当我们需要处理其中一个步骤中可能发生的错误时,情况会变得更糟。
java在核心库中引入了CompletableFuture,同时也是一个异步框架,有大约50种不同的方法用于组合、组合和执行异步计算步骤以及处理错误。
基本用法
1、封装计算逻辑,异步返回。
CompletableFuture的静态方法runAsync
和supplySync
允许我们相应地使用Runnable和SupplySync函数类型创建一个完整的future实例。如下就是一个简单的示例。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3 * 1000); } catch (InterruptedException e) { throw new RuntimeException(e); } return "Hello"; }); System.out.println("Main goes on..."); String result = future.get(); System.out.println(result);
如上代码片段,打印后的结果是Main goes on 先执行,异步任务在future.get() 阻塞结果返回。
2、异步计算结果串联异步处理
如果想在一个future完毕后,接上另一个异步任务,则用法如下:
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> { try { System.out.println("task1: " + Thread.currentThread().getName()); Thread.sleep(2 * 1000); } catch (InterruptedException e) { throw new RuntimeException(e); } return "Hello"; }); CompletableFuture<String> future = completableFuture.thenApply(s -> { try { System.out.println("task2: " + Thread.currentThread().getName()); Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } return s + " World"; }); System.out.println(future.get());
3、并行多个异步任务,统一等待结果
当我们需要并行执行多个Future时,我们通常希望等待所有Futrue都能够执行,然后处理它们的全部统一的返回结果。
CompletableFuture 的 allOf
静态方法允许等待所有的future完成:
如下面的代码片段:
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "my"); CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "World"); CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(future1, future2, future3); combinedFuture.get(); System.out.println(future1.isDone()); System.out.println(future2.isDone()); System.out.println(future3.isDone());
4、异步错误处理
CompletableFuture类不需要捕获语法块中的异常,而是允许我们用一种特殊的回调方法来处理。此方法接收两个参数:计算结果
(如果成功完成)和异常结果
(如果某些计算步骤有异常)。
String name = "fengkai CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> { if ("fengkai".equals(name)) { throw new RuntimeException("Computation error!"); } return "Hello, " + name; }).handle((s, t) -> s != null ? s : "Hello, Stranger!"); System.out.println(completableFuture.get());
Twitter包装
对于以上的代码,twitter工具包有自己的小包装,可以提升一点编程的逼格。
以下是用法:
pom依赖
首先引入maven坐标,因为是用scala编写的工具包,所以要引入scala的依赖。
<dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency> <dependency> <groupId>com.twitter</groupId> <artifactId>util-core_2.12</artifactId> <version>${twitter.util.version}</version> </dependency>
1、封装计算逻辑,异步返回
注意这里的FuturePool,可以用ExecutorService去包装。
Future<String> future = futurePool.apply(() -> { try { Thread.sleep(3 * 1000); } catch (InterruptedException e) { throw new RuntimeException(e); } return "Hello"; })
2、异步计算结果串联异步处理
和CompletableFuture
相似的,有以下用法,不过是用的map方法
Future<String> future = futurePool.apply(() -> { try { System.out.println("task2: " + Thread.currentThread().getName()); Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } return "Hello"; }); Future<Object> mappedFuture = future.map(new Function1<String, Object>() { @Override public Object apply(String v1) { try { System.out.println("task2: " + Thread.currentThread().getName()); Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } return "World"; } }); Await.result(mappedFuture);
3、并行多个异步任务
这个相对看起来就简洁的多了,用List
添加所有的异步结果,然后collect
收集起来,调用get()
或者其他方法阻塞等待。
List<Future> futures = new ArrayList<>(); Future<String> future1 = futurePool.apply(() -> "hello"); Future<String> future2 = futurePool.apply(() -> "my"); Future<String> future3 = futurePool.apply(() -> "world"); futures.add(future1); futures.add(future2); futures.add(future3); Future<List<String>> collect = Futures.collect(futureList);
4、错误处理
这部分处理也比较简洁,注意这里返回的是BoxedUnit.UNIT
,其实这是scala的语法,可以理解成void
的return
。
future.onFailure(new Function1<Throwable, BoxedUnit>() { @Override public BoxedUnit apply(Throwable v1) { System.out.println("Error"); return BoxedUnit.UNIT; } );
其他用法
除了以上的用法。其实还有很多用法。
例如:collectToTry
,会返回一个Try对象,Try代表了一个成功返回的结果,或者错误返回的异常.
可以使用try.isReturn()来判断是否是正常返回的。这在多个Future异步结果的处理中用着很不错。
Future<List<Try<String>>> futures = Futures.collectToTry(futureList);
flattern()
,该方法类似scala的扁平方法,可以将嵌套的异步对象拍平。
flatMap()
,和flatMap的用法一致,不过是异步的结果。
当你用不好twitter future的时候,随时随地可以转成javaFuture。 toJavaFuture()
。所以,放心用。
其他更有趣的方法,可以自己研究下,还是有点骚东西的。
其他工具
twitter的这个工具包出了异步编程外,还有其他的很实用的工具。 包括:
codec
编解码cahce
缓存hasing
哈希相关jackson
mock
thirft
validator
自行发掘吧。 地址是: github.com/twitter/uti…
加载全部内容