0318 guava并发工具
李福春 人气:3
![image.png](https://img2020.cnblogs.com/other/268922/202003/268922-20200319135319808-1800226777.png)
并发是一个难题,但是可以通过使用强力简单的抽象来显著的简化,为了简化问题,guava扩展了Future接口,即 ListenableFuture (可以监听的Future)。
我强烈建议你在你的所有代码里使用ListenableFuture去替代Future,原因如下: - 很多的Futures 类的方法需要它。(Futures工具类使用) - 它比后来改造为ListenableFutrue更简单。(早点使用比重构更简单) - 工具方法的提供者不需要提供Future和ListenableFuture方法的变体。(不需要兼容两套)
# 接口 一个传统的Futrue代表一个异步计算的结果:一个可能完成也可能没有完成输出结果的计算。
一个Future可以用在进度计算,或者说是 一个提供给我们结果的服务的承诺。
一个ListenableFuture允许注册当你在计算完成的时候的回调,或者计算已经完成了。
这个简单的增强让高效支持多种操作成为可能。而Future接口并不能支持。
ListenbleFuture中添加的基本操作是
addListener(Runnable , Executor ),
它指出了当未来计算完成时,指定的Runnable会在指定的Executor中运行。
# 增加回调 很多用户喜欢使用 Futures.addCallback(ListenableFuture,FutureCallback,Executor)方法。
FutureCallback实现了下面两个方法:
- onSuccess(v) 当未来成功执行的动作,基于计算结果 - onFailure(Throwable) 当未来失败执行的动作,基于失败 # 创建 相较于jdk提供的 ExecutorService.submit(Callable)方法来初始化一个异步计算。它返回一个常规的Future,
guava提供了ListeningExecutorService接口,它返回ListenableFuture。
把ExecutorService转换为ListenableExecutorService
使用:MoreExecutors.listeningDecorator(ExecutorService) 基础用法如下: ```java /** * 说明:使用例子代码 * @author carter * 创建时间: 2020年03月19日 9:54 上午 **/ @Slf4j public class ListenableFutureUtils { public static void main(String[] args) { ListeningExecutorService service = MoreExecutors.listeningDecorator( Executors.newFixedThreadPool(10)); final ListenableFuture listenableFuture = service.submit(() -> { try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } return new AResult(30, "male", 1); }); Futures.addCallback(listenableFuture, new FutureCallback() { @Override public void onSuccess(AResult aResult) { log.info("计算成功,{}",aResult); } @Override public void onFailure(Throwable throwable) { log.error("计算错误",throwable); } },service); } @Data @AllArgsConstructor public static class AResult{ private Integer age; private String sex; private Integer id; } } ``` 相对的,如果你想从基于FutureTask的API转换过来,
Guava提供了
ListenableFutureTask.create(Callable)
和
ListenableFutureTask.create(Runnable)
不同于jdk,ListenableFutureTask并不是直接扩展的。 如果你喜欢抽象的设置future的值,而不是实现一个方法然后计算值,可以考虑使用AbstractFuture或使用SettableFuture ; 如果你必须转换Future为ListenableFuture,你别无选择,必须使用 **JdkFutureAdapters.listenInPoolThread(Future)来转换Future为ListenableFuture**
任何时候只要可能,推荐你修改源码让它返回一个 ListenableFuture # 应用 使用ListenablFuture最重要的原因是可以使用链式异步操作。 代码如下: ```java package com.xxx.demo; import com.google.common.util.concurrent.AsyncFunction; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import lombok.AllArgsConstructor; import lombok.Data; /** * 说明:异步操作链 * @author carter * 创建时间: 2020年03月19日 10:11 上午 **/ public class ApplicationUtils { public static void main(String[] args) { Query query = new Query(30); ListenableFuture rowKeyFuture = lookUp(query);
AsyncFunction queryFun = rowKey -> readData(rowKey);
final ListenableFuture queryResultListenableFuture =
Futures.transformAsync(rowKeyFuture, queryFun);
}
private static ListenableFuture readData(RowKey rowKey) {
return null;
}
private static ListenableFuture lookUp(Query query) {
return null;
}
@Data
@AllArgsConstructor
public static class RowKey {
private String id;
}
@Data
@AllArgsConstructor
public static class Query {
private Integer age;
}
@Data
@AllArgsConstructor
public static class QueryResult {
private String id;
private String age;
}
}
```
很多其他高效支持的操作ListenableFuture提供,而Future不提供。
不同的操作可以被不同的线程池执行,一个简单的ListenableFuture可以有多个操作去等待。 只要一个操作开始,其他多个操作应该开始,fan-out, 千帆竞发。 ListenableFuture可以实现这样的操作:它触发了所有请求的回调。 通过少量的工作,我们可以 fan-in. 触发一个ListenableFuture 来获得计算结果,当其他的Future结束的时候。 Futures.allAsList是一个例子。 方法介绍: | 方法 | 描述 | | --- | --- | | transformAsync(ListenableFuture , AsyncFunction , Executor) | 返回一个新的ListenableFuture,它的结果是执行异步函数的返回,函数入参是ListenableFuture的返回结果; | | transform(ListenableFuture , Function , Executor) | 返回一个新的ListenableFuture,它的结果是执行函数的返回,函数入参是ListenableFuture的返回结果; | | allAsList(Iterable) | 返回一个ListenableFuture,它的结果是一个list,包含每一个列表中的ListenableFuture的执行结果,任何一个ListenableFuture执行失败或者取消,最后的返回结果取消 |
| successfullAsList(Iterable) | 返回一个ListenableFuture,它的结果是一个list,包含每一个列表中的ListenableFuture的执行结果,成功的是结果,失败或者取消的值使用null替代 |
AsyncFunction 提供了一个方法 , ListenableFuture apply(A inpunt),它可以用来异步的转换值。
代码如下:
```java
package com.xxx.demo;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
/**
* 说明:成功执行结果汇集
* @author carter
* 创建时间: 2020年03月19日 10:34 上午
**/
@Slf4j
public class Test3 {
public static void main(String[] args) {
List> querys = Lists.newLinkedList();
final ListenableFuture
并发是一个难题,但是可以通过使用强力简单的抽象来显著的简化,为了简化问题,guava扩展了Future接口,即 ListenableFuture (可以监听的Future)。
我强烈建议你在你的所有代码里使用ListenableFuture去替代Future,原因如下: - 很多的Futures 类的方法需要它。(Futures工具类使用) - 它比后来改造为ListenableFutrue更简单。(早点使用比重构更简单) - 工具方法的提供者不需要提供Future和ListenableFuture方法的变体。(不需要兼容两套)
# 接口 一个传统的Futrue代表一个异步计算的结果:一个可能完成也可能没有完成输出结果的计算。
一个Future可以用在进度计算,或者说是 一个提供给我们结果的服务的承诺。
一个ListenableFuture允许注册当你在计算完成的时候的回调,或者计算已经完成了。
这个简单的增强让高效支持多种操作成为可能。而Future接口并不能支持。
ListenbleFuture中添加的基本操作是
addListener(Runnable , Executor ),
它指出了当未来计算完成时,指定的Runnable会在指定的Executor中运行。
# 增加回调 很多用户喜欢使用 Futures.addCallback(ListenableFuture,FutureCallback,Executor)方法。
FutureCallback实现了下面两个方法:
- onSuccess(v) 当未来成功执行的动作,基于计算结果 - onFailure(Throwable) 当未来失败执行的动作,基于失败 # 创建 相较于jdk提供的 ExecutorService.submit(Callable)方法来初始化一个异步计算。它返回一个常规的Future,
guava提供了ListeningExecutorService接口,它返回ListenableFuture。
把ExecutorService转换为ListenableExecutorService
使用:MoreExecutors.listeningDecorator(ExecutorService) 基础用法如下: ```java /** * 说明:使用例子代码 * @author carter * 创建时间: 2020年03月19日 9:54 上午 **/ @Slf4j public class ListenableFutureUtils { public static void main(String[] args) { ListeningExecutorService service = MoreExecutors.listeningDecorator( Executors.newFixedThreadPool(10)); final ListenableFuture listenableFuture = service.submit(() -> { try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } return new AResult(30, "male", 1); }); Futures.addCallback(listenableFuture, new FutureCallback() { @Override public void onSuccess(AResult aResult) { log.info("计算成功,{}",aResult); } @Override public void onFailure(Throwable throwable) { log.error("计算错误",throwable); } },service); } @Data @AllArgsConstructor public static class AResult{ private Integer age; private String sex; private Integer id; } } ``` 相对的,如果你想从基于FutureTask的API转换过来,
Guava提供了
ListenableFutureTask.create(Callable)
和
ListenableFutureTask.create(Runnable)
不同于jdk,ListenableFutureTask并不是直接扩展的。 如果你喜欢抽象的设置future的值,而不是实现一个方法然后计算值,可以考虑使用AbstractFuture或使用SettableFuture ; 如果你必须转换Future为ListenableFuture,你别无选择,必须使用 **JdkFutureAdapters.listenInPoolThread(Future)来转换Future为ListenableFuture**
任何时候只要可能,推荐你修改源码让它返回一个 ListenableFuture # 应用 使用ListenablFuture最重要的原因是可以使用链式异步操作。 代码如下: ```java package com.xxx.demo; import com.google.common.util.concurrent.AsyncFunction; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import lombok.AllArgsConstructor; import lombok.Data; /** * 说明:异步操作链 * @author carter * 创建时间: 2020年03月19日 10:11 上午 **/ public class ApplicationUtils { public static void main(String[] args) { Query query = new Query(30); ListenableFuture
不同的操作可以被不同的线程池执行,一个简单的ListenableFuture可以有多个操作去等待。 只要一个操作开始,其他多个操作应该开始,fan-out, 千帆竞发。 ListenableFuture可以实现这样的操作:它触发了所有请求的回调。 通过少量的工作,我们可以 fan-in. 触发一个ListenableFuture 来获得计算结果,当其他的Future结束的时候。 Futures.allAsList是一个例子。 方法介绍: | 方法 | 描述 | | --- | --- | | transformAsync(ListenableFuture , AsyncFunction , Executor) | 返回一个新的ListenableFuture,它的结果是执行异步函数的返回,函数入参是ListenableFuture的返回结果; | | transform(ListenableFuture , Function , Executor) | 返回一个新的ListenableFuture,它的结果是执行函数的返回,函数入参是ListenableFuture的返回结果; | | allAsList(Iterable
- > successfulAsList =
Futures.successfulAsList(querys);
Futures.addCallback(successfulAsList, new FutureCallback
- >() {
@Override
public void onSuccess(List
这个代码不对,因为当外层的Future 取消的时候,无法传播到内层的Future,
这也是一个 使用get()检查别的Future或者Listnener的常规的错误, 但是,除非特别关注 否则 otherCallback抛出的异常会被压制。
为了避免这种情况,所有的guava的Future处理方法(有些从jdk来),有 *Async版本来安全的解开这个嵌套。 比如:transform,transformAsyn, submit, submitAsync方法。 # 深入研究 ![](https://img2020.cnblogs.com/other/268922/202003/268922-20200319135320473-1689402433.svg+xml) > 原创不易,转载请注明出处。
加载全部内容