从源码上理解Netty并发工具-Promise
throwable 人气:2前提
最近一直在看Netty
相关的内容,也在编写一个轻量级的RPC
框架来练手,途中发现了Netty
的源码有很多亮点,某些实现甚至可以用苛刻来形容。另外,Netty
提供的工具类也是相当优秀,可以开箱即用。这里分析一下个人比较喜欢的领域,并发方面的一个Netty
工具模块 - Promise
。
环境版本:
Netty:4.1.44.Final
JDK1.8
Promise简介
Promise,中文翻译为承诺或者许诺,含义是人与人之间,一个人对另一个人所说的具有一定憧憬的话,一般是可以实现的。
io.netty.util.concurrent.Promise
在注释中只有一句话:特殊的可写的io.netty.util.concurrent.Future
(Promise
接口是io.netty.util.concurrent.Future
的子接口)。而io.netty.util.concurrent.Future
是java.util.concurrent.Future
的扩展,表示一个异步操作的结果。我们知道,JDK
并发包中的Future
是不可写,也没有提供可监听的入口(没有应用观察者模式),而Promise
很好地弥补了这两个问题。另一方面从继承关系来看,DefaultPromise
是这些接口的最终实现类,所以分析源码的时候需要把重心放在DefaultPromise
类。一般一个模块提供的功能都由接口定义,这里分析一下两个接口的功能列表:
io.netty.util.concurrent.Promise
io.netty.util.concurrent.Future
先看io.netty.util.concurrent.Future
接口:
public interface Future<V> extends java.util.concurrent.Future<V> {
// I/O操作是否执行成功
boolean isSuccess();
// 标记是否可以通过下面的cancel(boolean mayInterruptIfRunning)取消I/O操作
boolean isCancellable();
// 返回I/O操作的异常实例 - 如果I/O操作本身是成功的,此方法返回null
Throwable cause();
// 为当前Future实例添加监听Future操作完成的监听器 - isDone()方法激活之后所有监听器实例会得到回调
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
// 为当前Future移除监听Future操作完成的监听器
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
// 同步等待Future完成得到最终结果(成功)或者抛出异常(失败),响应中断
Future<V> sync() throws InterruptedException;
// 同步等待Future完成得到最终结果(成功)或者抛出异常(失败),不响应中断
Future<V> syncUninterruptibly();
// 等待Future完成,响应中断
Future<V> await() throws InterruptedException;
// 等待Future完成,不响应中断
Future<V> awaitUninterruptibly();
// 带超时时限的等待Future完成,响应中断
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
boolean await(long timeoutMillis) throws InterruptedException;
// 带超时时限的等待Future完成,不响应中断
boolean awaitUninterruptibly(long timeout, TimeUnit unit);
boolean awaitUninterruptibly(long timeoutMillis);
// 非阻塞马上返回Future的结果,如果Future未完成,此方法一定返回null;有些场景下如果Future成功获取到的结果是null则需要二次检查isDone()方法是否为true
V getNow();
// 取消当前Future实例的执行,如果取消成功会抛出CancellationException异常
@Override
boolean cancel(boolean mayInterruptIfRunning);
}
sync()
和await()
方法类似,只是sync()
会检查异常执行的情况,一旦发现执行异常马上把异常实例包装抛出,而await()
方法对异常无感知。
接着看io.netty.util.concurrent.Promise
接口:
public interface Promise<V> extends Future<V> {
// 标记当前Future成功,设置结果,如果设置成功,则通知所有的监听器,如果Future已经成功或者失败,则抛出IllegalStateException
Promise<V> setSuccess(V result);
// 标记当前Future成功,设置结果,如果设置成功,则通知所有的监听器并且返回true,否则返回false
boolean trySuccess(V result);
// 标记当前Future失败,设置结果为异常实例,如果设置成功,则通知所有的监听器,如果Future已经成功或者失败,则抛出IllegalStateException
Promise<V> setFailure(Throwable cause);
// 标记当前Future失败,设置结果为异常实例,如果设置成功,则通知所有的监听器并且返回true,否则返回false
boolean tryFailure(Throwable cause);
// 标记当前的Promise实例为不可取消,设置成功返回true,否则返回false
boolean setUncancellable();
// 下面的方法和io.netty.util.concurrent.Future中的方法基本一致,只是修改了返回类型为Promise
@Override
Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
@Override
Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
@Override
Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
@Override
Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
@Override
Promise<V> await() throws InterruptedException;
@Override
Promise<V> awaitUninterruptibly();
@Override
Promise<V> sync() throws InterruptedException;
@Override
Promise<V> syncUninterruptibly();
}
到此,Promise
接口的所有功能都分析完毕,接下来从源码角度详细分析Promise
的实现。
Promise源码实现
Promise
的实现类为io.netty.util.concurrent.DefaultPromise
(其实DefaultPromise
还有很多子类,某些实现是为了定制特定的场景做了扩展),而DefaultPromise
继承自io.netty.util.concurrent.AbstractFuture
:
public abstract class AbstractFuture<V> implements Future<V> {
// 永久阻塞等待获取结果的方法
@Override
public V get() throws InterruptedException, ExecutionException {
// 调用响应中断的永久等待方法进行阻塞
await();
// 从永久阻塞中唤醒后,先判断Future是否执行异常
Throwable cause = cause();
if (cause == null) {
// 异常为空说明执行成功,调用getNow()方法返回结果
return getNow();
}
// 异常为空不为空,这里区分特定的取消异常则转换为CancellationException抛出
if (cause instanceof CancellationException) {
throw (CancellationException) cause;
}
// 非取消异常的其他所有异常都被包装为执行异常ExecutionException抛出
throw new ExecutionException(cause);
}
// 带超时阻塞等待获取结果的方法
@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
// 调用响应中断的带超时时限等待方法进行阻塞
if (await(timeout, unit)) {
// 从带超时时限阻塞中唤醒后,先判断Future是否执行异常
Throwable cause = cause();
if (cause == null) {
// 异常为空说明执行成功,调用getNow()方法返回结果
return getNow();
}
// 异常为空不为空,这里区分特定的取消异常则转换为CancellationException抛出
if (cause instanceof CancellationException) {
throw (CancellationException) cause;
}
// 在非等待超时的前提下,非取消异常的其他所有异常都被包装为执行异常ExecutionException抛出
throw new ExecutionException(cause);
}
// 方法步入此处说明等待超时,则抛出超时异常TimeoutException
throw new TimeoutException();
}
}
AbstractFuture
仅仅对get()
和get(long timeout, TimeUnit unit)
两个方法进行了实现,其实这两处的实现和java.util.concurrent.FutureTask
中的实现方式十分相似。
DefaultPromise
的源码比较多,这里分开多个部分去阅读,先看它的属性和构造函数:
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
// 正常日志的日志句柄,InternalLogger是Netty内部封装的日志接口
private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultPromise.class);
// 任务拒绝执行时候的日志句柄 - Promise需要作为一个任务提交到线程中执行,如果任务拒绝则使用此日志句柄打印日志
private static final InternalLogger rejectedExecutionLogger =
InternalLoggerFactory.getInstance(DefaultPromise.class.getName() + ".rejectedExecution");
// 监听器的最大栈深度,默认值为8,这个值是防止嵌套回调调用的时候栈深度过大导致内存溢出,后面会举个例子说明它的用法
private static final int MAX_LISTENER_STACK_DEPTH = Math.min(8,
SystemPropertyUtil.getInt("io.netty.defaultPromise.maxListenerStackDepth", 8));
// 结果更新器,用于CAS更新结果result的值
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
// 用于填充result的值,当设置结果result传入null,Promise执行成功,用这个值去表示成功的结果
private static final Object SUCCESS = new Object();
// 用于填充result的值,表示Promise不能被取消
private static final Object UNCANCELLABLE = new Object();
// CancellationException实例的持有器,用于判断Promise取消状态和抛出CancellationException
private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(ThrowableUtil.unknownStackTrace(
new CancellationException(), DefaultPromise.class, "cancel(...)"));
// CANCELLATION_CAUSE_HOLDER的异常栈信息元素数组
private static final StackTraceElement[] CANCELLATION_STACK = CANCELLATION_CAUSE_HOLDER.cause.getStackTrace();
// 真正的结果对象,使用Object类型,最终有可能为null、真正的结果实例、SUCCESS、UNCANCELLABLE或者CANCELLATION_CAUSE_HOLDER等等
private volatile Object result;
// 事件执行器,这里暂时不做展开,可以理解为单个调度线程
private final EventExecutor executor;
// 监听器集合,可能是单个GenericFutureListener实例或者DefaultFutureListeners(监听器集合)实例
private Object listeners;
// 等待获取结果的线程数量
private short waiters;
// 标记是否正在回调监听器
private boolean notifyingListeners;
// 构造函数依赖于EventExecutor
public DefaultPromise(EventExecutor executor) {
this.executor = checkNotNull(executor, "executor");
}
protected DefaultPromise() {
// only for subclasses - 这个构造函数预留给子类
executor = null;
}
// ... 省略其他代码 ...
// 私有静态内部类,用于存放Throwable实例,也就是持有异常的原因实例
private static final class CauseHolder {
final Throwable cause;
CauseHolder(Throwable cause) {
this.cause = cause;
}
}
// 私有静态内部类,用于覆盖CancellationException的栈信息为前面定义的CANCELLATION_STACK,同时覆盖了toString()返回CancellationException的全类名
private static final class LeanCancellationException extends CancellationException {
private static final long serialVersionUID = 2794674970981187807L;
@Override
public Throwable fillInStackTrace() {
setStackTrace(CANCELLATION_STACK);
return this;
}
@Override
public String toString() {
return CancellationException.class.getName();
}
}
// ... 省略其他代码 ...
}
Promise
目前支持两种类型的监听器:
GenericFutureListener
:支持泛型的Future
监听器。GenericProgressiveFutureListener
:它是GenericFutureListener
的子类,支持进度表示和支持泛型的Future
监听器(有些场景需要多个步骤实现,类似于进度条那样)。
// GenericFutureListener
public interface GenericFutureListener<F extends Future<?>> extends EventListener {
void operationComplete(F future) throws Exception;
}
// GenericProgressiveFutureListener
public interface GenericProgressiveFutureListener<F extends ProgressiveFuture<?>> extends GenericFutureListener<F> {
void operationProgressed(F future, long progress, long total) throws Exception;
}
为了让Promise
支持多个监听器,Netty
添加了一个默认修饰符修饰的DefaultFutureListeners
类用于保存监听器实例数组:
// DefaultFutureListeners
final class DefaultFutureListeners {
private GenericFutureListener<? extends Future<?>>[] listeners;
private int size;
private int progressiveSize; // the number of progressive listeners
// 这个构造相对特别,是为了让Promise中的listeners(Object类型)实例由单个GenericFutureListener实例转换为DefaultFutureListeners类型
@SuppressWarnings("unchecked")
DefaultFutureListeners(GenericFutureListener<? extends Future<?>> first, GenericFutureListener<? extends Future<?>> second) {
listeners = new GenericFutureListener[2];
listeners[0] = first;
listeners[1] = second;
size = 2;
if (first instanceof GenericProgressiveFutureListener) {
progressiveSize ++;
}
if (second instanceof GenericProgressiveFutureListener) {
progressiveSize ++;
}
}
public void add(GenericFutureListener<? extends Future<?>> l) {
GenericFutureListener<? extends Future<?>>[] listeners = this.listeners;
final int size = this.size;
// 注意这里,每次扩容数组长度是原来的2倍
if (size == listeners.length) {
this.listeners = listeners = Arrays.copyOf(listeners, size << 1);
}
// 把当前的GenericFutureListener加入数组中
listeners[size] = l;
// 监听器总数量加1
this.size = size + 1;
// 如果为GenericProgressiveFutureListener,则带进度指示的监听器总数量加1
if (l instanceof GenericProgressiveFutureListener) {
progressiveSize ++;
}
}
public void remove(GenericFutureListener<? extends Future<?>> l) {
final GenericFutureListener<? extends Future<?>>[] listeners = this.listeners;
int size = this.size;
for (int i = 0; i < size; i ++) {
if (listeners[i] == l) {
// 计算需要需要移动的监听器的下标
int listenersToMove = size - i - 1;
if (listenersToMove > 0) {
// listenersToMove后面的元素全部移动到数组的前端
System.arraycopy(listeners, i + 1, listeners, i, listenersToMove);
}
// 当前监听器总量的最后一个位置设置为null,数量减1
listeners[-- size] = null;
this.size = size;
// 如果监听器是GenericProgressiveFutureListener,则带进度指示的监听器总数量减1
if (l instanceof GenericProgressiveFutureListener) {
progressiveSize --;
}
return;
}
}
}
// 返回监听器实例数组
public GenericFutureListener<? extends Future<?>>[] listeners() {
return listeners;
}
// 返回监听器总数量
public int size() {
return size;
}
// 返回带进度指示的监听器总数量
public int progressiveSize() {
return progressiveSize;
}
}
接下来看DefaultPromise
的剩余方法实现,笔者觉得DefaultPromise
方法实现在代码顺序上是有一定的艺术的。先看几个判断Promise
执行状态的方法:
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
// ... 省略其他代码 ...
@Override
public boolean setUncancellable() {
// 通过结果更新器CAS更新result为UNCANCELLABLE,期望旧值为null,更新值为UNCANCELLABLE属性,如果成功则返回true
if (RESULT_UPDATER.compareAndSet(this, null, UNCANCELLABLE)) {
return true;
}
Object result = this.result;
// 步入这里说明result当前值不为null,isDone0()和isCancelled0()都是终态,这里如果命中终态就返回false
//(笔者注:其实可以这样认为,这里result不能为null,如果不为终态,它只能是UNCANCELLABLE属性实例)
return !isDone0(result) || !isCancelled0(result);
}
@Override
public boolean isSuccess() {
Object result = this.result;
// 如果执行成功,则结果不为null,同时不为UNCANCELLABLE,同时不为CauseHolder类型
//(笔者注:其实可以这样认为,Promise为成功,则result只能是一个开发者定义的实例或者SUCCESS属性实例)
return result != null && result != UNCANCELLABLE && !(result instanceof CauseHolder);
}
@Override
public boolean isCancellable() {
// 是否可取消的,result为null说明Promise处于初始化状态尚未执行,则认为可以取消
return result == null;
}
@Override
public Throwable cause() {
// 通过当前result获取Throwable实例
return cause0(result);
}
private Throwable cause0(Object result) {
// result非CauseHolder类型,则直接返回null
if (!(result instanceof CauseHolder)) {
return null;
}
// 如果result为CANCELLATION_CAUSE_HOLDER(静态CancellationException的持有)
if (result == CANCELLATION_CAUSE_HOLDER) {
// 则新建一个自定义LeanCancellationException实例
CancellationException ce = new LeanCancellationException();
// 如果CAS更新结果result为LeanCancellationException新实例则返回
if (RESULT_UPDATER.compareAndSet(this, CANCELLATION_CAUSE_HOLDER, new CauseHolder(ce))) {
return ce;
}
// 走到这里说明了result是非CANCELLATION_CAUSE_HOLDER的自定义CauseHolder实例
result = this.result;
}
// 兜底返回CauseHolder持有的cause
return ((CauseHolder) result).cause;
}
// 静态方法,判断Promise是否为取消,依据是result必须是CauseHolder类型,同时CauseHolder中的cause必须为CancellationException类型或者其子类
private static boolean isCancelled0(Object result) {
return result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException;
}
// 静态方法,判断Promise是否完成,依据是result不为null同时不为UNCANCELLABLE属性实例
private static boolean isDone0(Object result) {
return result != null && result != UNCANCELLABLE;
}
// 判断Promise实例是否取消
@Override
public boolean isCancelled() {
return isCancelled0(result);
}
// 判断Promise实例是否完成
@Override
public boolean isDone() {
return isDone0(result);
}
// ... 省略其他代码 ...
}
接着看监听器的添加和移除方法(这其中也包含了通知监听器的逻辑):
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
// ... 省略其他代码 ...
@Override
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
// 入参非空校验
checkNotNull(listener, "listener");
// 加锁,锁定的对象是Promise实例自身
synchronized (this) {
// 添加监听器
addListener0(listener);
}
// 如果Promise实例已经执行完毕,则通知监听器进行回调
if (isDone()) {
notifyListeners();
}
return this;
}
@Override
public Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {
// 入参非空校验
checkNotNull(listeners, "listeners");
// 加锁,锁定的对象是Promise实例自身
synchronized (this) {
// 遍历入参数组添加监听器,有空元素直接跳出
for (GenericFutureListener<? extends Future<? super V>> listener : listeners) {
if (listener == null) {
break;
}
addListener0(listener);
}
}
// 如果Promise实例已经执行完毕,则通知监听器进行回调
if (isDone()) {
notifyListeners();
}
return this;
}
@Override
public Promise<V> removeListener(final GenericFutureListener<? extends Future<? super V>> listener) {
// 入参非空校验
checkNotNull(listener, "listener");
// 加锁,锁定的对象是Promise实例自身
synchronized (this) {
// 移除监听器
removeListener0(listener);
}
return this;
}
@Override
public Promise<V> removeListeners(final GenericFutureListener<? extends Future<? super V>>... listeners) {
// 入参非空校验
checkNotNull(listeners, "listeners");
// 加锁,锁定的对象是Promise实例自身
synchronized (this) {
// 遍历入参数组移除监听器,有空元素直接跳出
for (GenericFutureListener<? extends Future<? super V>> listener : listeners) {
if (listener == null) {
break;
}
removeListener0(listener);
}
}
return this;
}
private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
// 如果Promise实例持有listeners为null,则直接设置为入参listener
if (listeners == null) {
listeners = listener;
} else if (listeners instanceof DefaultFutureListeners) {
// 如果当前Promise实例持有listeners的是DefaultFutureListeners类型,则调用它的add()方法进行添加
((DefaultFutureListeners) listeners).add(listener);
} else {
// 步入这里说明当前Promise实例持有listeners为单个GenericFutureListener实例,需要转换为DefaultFutureListeners实例
listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener);
}
}
private void removeListener0(GenericFutureListener<? extends Future<? super V>> listener) {
// 如果当前Promise实例持有listeners的是DefaultFutureListeners类型,则调用它的remove()方法进行移除
if (listeners instanceof DefaultFutureListeners) {
((DefaultFutureListeners) listeners).remove(listener);
} else if (listeners == listener) {
// 如果当前Promise实例持有listeners不为DefaultFutureListeners类型,也就是单个GenericFutureListener并且和传入的listener相同,
// 则Promise实例持有listeners置为null
listeners = null;
}
}
private void notifyListeners() {
EventExecutor executor = executor();
// 当前执行线程是事件循环线程,那么直接同步调用,简单来说就是调用notifyListeners()方法的线程和EventExecutor是同一个线程
if (executor.inEventLoop()) {
// 下面的ThreadLocal和listenerStackDepth是调用栈深度保护相关,博文会另起一个章节专门讲解这个问题,这里可以暂时忽略
final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
final int stackDepth = threadLocals.futureListenerStackDepth();
if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
threadLocals.setFutureListenerStackDepth(stackDepth + 1);
try {
notifyListenersNow();
} finally {
threadLocals.setFutureListenerStackDepth(stackDepth);
}
return;
}
}
// 当前执行线程不是事件循环线程,则把notifyListenersNow()包装为Runnable实例放到EventExecutor中执行
safeExecute(executor, new Runnable() {
@Override
public void run() {
notifyListenersNow();
}
});
}
// 使用EventExecutor进行任务执行,execute()方法抛出的异常会使用rejectedExecutionLogger句柄打印
private static void safeExecute(EventExecutor executor, Runnable task) {
try {
executor.execute(task);
} catch (Throwable t) {
rejectedExecutionLogger.error("Failed to submit a listener notification task. Event loop shut down?", t);
}
}
// 马上通知所有监听器进行回调
private void notifyListenersNow() {
Object listeners;
// 这里加锁,在锁的保护下设置notifyingListeners的值,如果多个线程调用同一个Promise实例的notifyListenersNow()方法
// 命中notifyingListeners的线程可以直接返回
synchronized (this) {
// Only proceed if there are listeners to notify and we are not already notifying listeners.
if (notifyingListeners || this.listeners == null) {
return;
}
notifyingListeners = true;
// 临时变量listeners存放瞬时的监听器实例,方便下一步设置Promise实例的listeners为null
listeners = this.listeners;
// 重置当前Promise实例的listeners为null
this.listeners = null;
}
for (;;) {
if (listeners instanceof DefaultFutureListeners) {
// 多个监听器情况下的通知
notifyListeners0((DefaultFutureListeners) listeners);
} else {
// 单个监听器情况下的通知
notifyListener0(this, (GenericFutureListener<?>) listeners);
}
synchronized (this) {
if (this.listeners == null) {
// 这里因为没有异常抛出的可能,不用在finally块中编写,重置notifyingListeners为false并且返回跳出循环
notifyingListeners = false;
return;
}
// 临时变量listeners存放瞬时的监听器实例,回调操作判断是基于临时实例去做 - 这里可能由另一个线程更新了listeners的值
listeners = this.listeners;
// 重置当前Promise实例的listeners为null,确保监听器只会被回调一次,下一次跳出for死循环
this.listeners = null;
}
}
}
// 遍历DefaultFutureListeners中的listeners数组,调用静态方法notifyListener0()
private void notifyListeners0(DefaultFutureListeners listeners) {
GenericFutureListener<?>[] a = listeners.listeners();
int size = listeners.size();
for (int i = 0; i < size; i ++) {
notifyListener0(this, a[i]);
}
}
// 这个静态方法是最终监听器回调的方法,也就是简单调用GenericFutureListener#operationComplete()传入的是当前的Promise实例,捕获一切异常打印warn日志
@SuppressWarnings({ "unchecked", "rawtypes" })
private static void notifyListener0(Future future, GenericFutureListener l) {
try {
l.operationComplete(future);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
}
}
}
}
然后看wait()
和sync()
方法体系:
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
// ... 省略其他代码 ...
@Override
public Promise<V> await() throws InterruptedException {
// 如果Promise执行完毕,直接返回
if (isDone()) {
return this;
}
// 如果当前线程中断则直接抛出InterruptedException
if (Thread.interrupted()) {
throw new InterruptedException(toString());
}
// 死锁检测
checkDeadLock();
// 加锁,加锁对象是当前Promise实例
synchronized (this) {
// 这里设置一个死循环,终止条件是isDone()为true
while (!isDone()) {
// 等待线程数加1
incWaiters();
try {
// 这里调用的是Object#wait()方法进行阻塞,如果线程被中断会抛出InterruptedException
wait();
} finally {
// 解除阻塞后等待线程数减1
decWaiters();
}
}
}
return this;
}
@Override
public Promise<V> awaitUninterruptibly() {
// 如果Promise执行完毕,直接返回
if (isDone()) {
return this;
}
// 死锁检测
checkDeadLock();
boolean interrupted = false;
// 加锁,加锁对象是当前Promise实例
synchronized (this) {
// 这里设置一个死循环,终止条件是isDone()为true
while (!isDone()) {
// 等待线程数加1
incWaiters();
try {
// 这里调用的是Object#wait()方法进行阻塞,捕获了InterruptedException异常,如果抛出InterruptedException记录线程的中断状态到interrupted
wait();
} catch (InterruptedException e) {
// Interrupted while waiting.
interrupted = true;
} finally {
// 解除阻塞后等待线程数减1
decWaiters();
}
}
}
// 如果线程被中断跳出等待阻塞,则清除线程的中断标志位
if (interrupted) {
Thread.currentThread().interrupt();
}
return this;
}
// 后面的几个带超时时限的wait()方法都是调用await0()
@Override
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
return await0(unit.toNanos(timeout), true);
}
@Override
public boolean await(long timeoutMillis) throws InterruptedException {
return await0(MILLISECONDS.toNanos(timeoutMillis), true);
}
@Override
public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
try {
return await0(unit.toNanos(timeout), false);
} catch (InterruptedException e) {
// Should not be raised at all.
throw new InternalError();
}
}
@Override
public boolean awaitUninterruptibly(long timeoutMillis) {
try {
return await0(MILLISECONDS.toNanos(timeoutMillis), false);
} catch (InterruptedException e) {
// Should not be raised at all.
throw new InternalError();
}
}
// 检查死锁,这里判断了等待线程是事件循环线程则直接抛出BlockingOperationException异常
// 简单来说就是:Promise的执行线程和等待结果的线程,不能是同一个线程,否则依赖会成环
protected void checkDeadLock() {
EventExecutor e = executor();
if (e != null && e.inEventLoop()) {
throw new BlockingOperationException(toString());
}
}
@Override
public Promise<V> sync() throws InterruptedException {
// 同步永久阻塞等待
await();
// 阻塞等待解除,如果执行存在异常,则直接抛出
rethrowIfFailed();
return this;
}
@Override
public Promise<V> syncUninterruptibly() {
// 同步永久阻塞等待 - 响应中断
awaitUninterruptibly();
// 塞等待解除,如果执行存在异常,则直接抛出
rethrowIfFailed();
return this;
}
// waiters加1,如果超过Short.MAX_VALUE则抛出IllegalStateException
private void incWaiters() {
if (waiters == Short.MAX_VALUE) {
throw new IllegalStateException("too many waiters: " + this);
}
++waiters;
}
// waiters减1
private void decWaiters() {
--waiters;
}
// cause不为null则抛出
private void rethrowIfFailed() {
Throwable cause = cause();
if (cause == null) {
return;
}
PlatformDependent.throwException(cause);
}
private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
// 如果Promise执行完毕,直接返回
if (isDone()) {
return true;
}
// 如果超时时限小于0那么返回isDone()的结果
if (timeoutNanos <= 0) {
return isDone();
}
// 如果允许中断,当前线程的中断标志位为true,则抛出InterruptedException
if (interruptable && Thread.interrupted()) {
throw new InterruptedException(toString());
}
// 死锁检测
checkDeadLock();
// 记录当前的纳秒时间戳
long startTime = System.nanoTime();
// 等待时间的长度 - 单位为纳秒
long waitTime = timeoutNanos;
// 记录线程是否被中断
boolean interrupted = false;
try {
// 死循环
for (;;) {
synchronized (this) {
// 如果Promise执行完毕,直接返回true - 这一步是先验判断,命中了就不需要阻塞等待
if (isDone()) {
return true;
}
// 等待线程数加1
incWaiters();
try {
// 这里调用的是带超时时限的Object#wait()方法进行阻塞
wait(waitTime / 1000000, (int) (waitTime % 1000000));
} catch (InterruptedException e) {
// 线程被中断并且外部允许中断,那么直接抛出InterruptedException
if (interruptable) {
throw e;
} else {
// 否则只记录中断过的状态
interrupted = true;
}
} finally {
// 解除阻塞后等待线程数减1
decWaiters();
}
}
// 解除阻塞后,如果Promise执行完毕,直接返回true
if (isDone()) {
return true;
} else {
// 步入这里说明Promise尚未执行完毕,则重新计算等待时间间隔的长度数量(修正),如果大于0则进入下一轮循环
waitTime = timeoutNanos - (System.nanoTime() - startTime);
if (waitTime <= 0) {
return isDone();
}
}
}
} finally {
// 如果线程被中断跳出等待阻塞,则清除线程的中断标志位
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}
// ... 省略其他代码 ...
}
最后是几个设置结果和获取结果的方法:
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
// ... 省略其他代码 ...
@Override
public Promise<V> setSuccess(V result) {
// 设置成功结果,如果设置成功则返回当前Promise实例
if (setSuccess0(result)) {
return this;
}
// 设置失败说明了多次设置,Promise已经执行完毕,则抛出异常
throw new IllegalStateException("complete already: " + this);
}
@Override
public boolean trySuccess(V result) {
// 设置成功结果,返回的布尔值表示成功或失败
return setSuccess0(result);
}
@Override
public Promise<V> setFailure(Throwable cause) {
// 设置失败结果,如果设置成功则返回当前Promise实例
if (setFailure0(cause)) {
return this;
}
// 设置失败说明了多次设置,Promise已经执行完毕,则抛出异常
throw new IllegalStateException("complete already: " + this, cause);
}
@Override
public boolean tryFailure(Throwable cause) {
// 设置失败结果,返回的布尔值表示成功或失败
return setFailure0(cause);
}
@SuppressWarnings("unchecked")
@Override
public V getNow() {
// 非阻塞获取结果,如果result是CauseHolder类型、SUCCESS属性实例或者UNCANCELLABLE实行实例则返回null,否则返回转换类型后的result值
// 对异常无感知,如果CauseHolder包裹了异常,此方法依然返回null
Object result = this.result;
if (result instanceof CauseHolder || result == SUCCESS || result == UNCANCELLABLE) {
return null;
}
return (V) result;
}
@SuppressWarnings("unchecked")
@Override
public V get() throws InterruptedException, ExecutionException {
// 永久阻塞获取结果
Object result = this.result;
// 如果Promise未执行完毕则进行永久阻塞等待
if (!isDone0(result)) {
await();
// 更新结果临时变量
result = this.result;
}
// result为SUCCESS属性实例或者UNCANCELLABLE属性实例的时候直接返回null
if (result == SUCCESS || result == UNCANCELLABLE) {
return null;
}
// 如果result为CauseHolder类型,则获取其中持有的cause属性,也有可能为null
Throwable cause = cause0(result);
if (cause == null) {
// 执行成功的前提下转换类型后的result值返回
return (V) result;
}
// 取消的情况,抛出CancellationException
if (cause instanceof CancellationException) {
throw (CancellationException) cause;
}
// 剩余的情况一律封装为ExecutionException异常
throw new ExecutionException(cause);
}
@SuppressWarnings("unchecked")
@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
// 带超时时限的阻塞获取结果
Object result = this.result;
// 如果Promise未执行完毕则进行带超时时限的阻塞等待
if (!isDone0(result)) {
if (!await(timeout, unit)) {
// 等待超时直接抛出TimeoutException
throw new TimeoutException();
}
// 更新结果临时变量
result = this.result;
}
// result为SUCCESS属性实例或者UNCANCELLABLE属性实例的时候直接返回null
if (result == SUCCESS || result == UNCANCELLABLE) {
return null;
}
// 如果result为CauseHolder类型,则获取其中持有的cause属性,也有可能为null
Throwable cause = cause0(result);
if (cause == null) {
// 执行成功的前提下转换类型后的result值返回
return (V) result;
}
// 取消的情况,抛出CancellationException
if (cause instanceof CancellationException) {
throw (CancellationException) cause;
}
// 剩余的情况一律封装为ExecutionException异常
throw new ExecutionException(cause);
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
// CAS更新result为CANCELLATION_CAUSE_HOLDER,result的期望值必须为null
if (RESULT_UPDATER.compareAndSet(this, null, CANCELLATION_CAUSE_HOLDER)) {
// 判断是否需要进行等待线程的通知
if (checkNotifyWaiters()) {
// 通知监听器进行回调
notifyListeners();
}
return true;
}
return false;
}
private boolean setSuccess0(V result) {
// 设置执行成功的结果,如果入参result为null,则选用SUCCESS属性,否则使用result
return setValue0(result == null ? SUCCESS : result);
}
private boolean setFailure0(Throwable cause) {
// 设置执行失败的结果,入参是Throwable类型,封装为CauseHolder,存放在CauseHolder实例的cause属性
return setValue0(new CauseHolder(checkNotNull(cause, "cause")));
}
private boolean setValue0(Object objResult) {
// CAS更新result为入参objResult,result的期望值必须为null或者UNCANCELLABLE才能更新成功
if (RESULT_UPDATER.compareAndSet(this, null, objResult) || RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
// 判断是否需要进行等待线程的通知
if (checkNotifyWaiters()) {
// 通知监听器进行回调
notifyListeners();
}
return true;
}
return false;
}
// 判断是否需要进行等待线程的通知 - 其实是判断是否需要通知监听器回调
private synchronized boolean checkNotifyWaiters() {
// 如果等待线程数量大于0则调用Object#notifyAll()唤醒所有等待线程
if (waiters > 0) {
notifyAll();
}
// 如果listeners不为空(也就是存在监听器)的时候才返回true
return listeners != null;
}
// ... 省略其他代码 ...
}
Promise的基本使用
要使用Netty
的Promise
模块,并不需要引入Netty
的所有依赖,这里只需要引入netty-common
:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
<version>4.1.44.Final</version>
<https://img.qb5200.com/download-x/dependency>
EventExecutor
选取方面,Netty
已经准备了一个GlobalEventExecutor
用于全局事件处理,这里可以直接选用(当然也可以自行实现EventExecutor
或者用EventExecutor
的其他实现类):
EventExecutor executor = GlobalEventExecutor.INSTANCE;
Promise<String> promise = new DefaultPromise<>(executor);
这里设计一个场景:异步下载一个链接的资源到磁盘上,下载完成之后需要异步通知下载完的磁盘文件路径,得到通知之后打印下载结果到控制台中。
public class PromiseMain {
public static void main(String[] args) throws Exception {
String url = "http://xxx.yyy.zzz";
EventExecutor executor = GlobalEventExecutor.INSTANCE;
Promise<DownloadResult> promise = new DefaultPromise<>(executor);
promise.addListener(new DownloadResultListener());
Thread thread = new Thread(() -> {
try {
System.out.println("开始下载资源,url:" + url);
long start = System.currentTimeMillis();
// 模拟下载耗时
Thread.sleep(2000);
String location = "C:\\xxx\\yyy\\z.md";
long cost = System.currentTimeMillis() - start;
System.out.println(String.format("下载资源成功,url:%s,保存到:%s,耗时:%d ms", url, location, cost));
DownloadResult result = new DownloadResult();
result.setUrl(url);
result.setFileDiskLocation(location);
result.setCost(cost);
// 通知结果
promise.setSuccess(result);
} catch (Exception ignore) {
}
}, "Download-Thread");
thread.start();
Thread.sleep(Long.MAX_VALUE);
}
@Data
private static class DownloadResult {
private String url;
private String fileDiskLocation;
private long cost;
}
private static class DownloadResultListener implements GenericFutureListener<Future<DownloadResult>> {
@Override
public void operationComplete(Future<DownloadResult> future) throws Exception {
if (future.isSuccess()) {
DownloadResult downloadResult = future.getNow();
System.out.println(String.format("下载完成通知,url:%s,文件磁盘路径:%s,耗时:%d ms", downloadResult.getUrl(),
downloadResult.getFileDiskLocation(), downloadResult.getCost()));
}
}
}
}
执行后控制台输出:
开始下载资源,url:http://xxx.yyy.zzz
下载资源成功,url:http://xxx.yyy.zzz,保存到:C:\xxx\yyy\z.md,耗时:2000 ms
下载完成通知,url:http://xxx.yyy.zzz,文件磁盘路径:C:\xxx\yyy\z.md,耗时:2000 ms
Promise
适用的场景很多,除了异步通知的场景也能用于同步调用,它在设计上比JUC
的Future
灵活很多,基于Future
扩展出很多新的特性,有需要的可以单独引入此依赖直接使用。
Promise监听器栈深度的问题
有些时候,由于封装或者人为编码异常等原因,监听器的回调可能出现基于多个Promise
形成的链(参考Issue-5302,a promise listener chain
),这样子有可能出现递归调用深度过大而导致栈溢出,因此需要设置一个阈值,限制递归调用的最大栈深度,这个深度阈值暂且称为栈深度保护阈值,默认值是8,可以通过系统参数io.netty.defaultPromise.maxListenerStackDepth
覆盖设置。这里贴出前面提到过的代码块:
private void notifyListeners() {
EventExecutor executor = executor();
// 事件执行器必须是事件循环类型,也就是executor.inEventLoop()为true的时候才启用递归栈深度保护
if (executor.inEventLoop()) {
// 获取当前线程绑定的InternalThreadLocalMap实例,这里类似于ThreadLocal
final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
// 获取当前线程的监听器调用栈深度
final int stackDepth = threadLocals.futureListenerStackDepth();
// 监听器调用栈深度如果不超过阈值MAX_LISTENER_STACK_DEPTH
if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
// 调用notifyListenersNow()前先设置监听器调用栈深度 + 1
threadLocals.setFutureListenerStackDepth(stackDepth + 1);
try {
notifyListenersNow();
} finally {
// 调用notifyListenersNow()完毕后设置监听器调用栈深度为调用前的数值,也就是恢复线程的监听器调用栈深度
threadLocals.setFutureListenerStackDepth(stackDepth);
}
return;
}
}
// 如果监听器调用栈深度超过阈值MAX_LISTENER_STACK_DEPTH,则直接每次通知监听器当成一个新的异步任务处理
safeExecute(executor, new Runnable() {
@Override
public void run() {
notifyListenersNow();
}
});
}
如果我们想模拟一个例子触发监听器调用栈深度保护,那么只需要想办法在同一个EventLoop
类型的线程中递归调用notifyListeners()
方法即可。
最典型的例子就是在上一个Promise
监听器回调的方法里面触发下一个Promise
的监听器的setSuccess()
(简单理解就是套娃),画个图理解一下:
测试代码:
public class PromiseListenerMain {
private static final AtomicInteger COUNTER = new AtomicInteger(0);
public static void main(String[] args) throws Exception {
EventExecutor executor = ImmediateEventExecutor.INSTANCE;
// root
Promise<String> root = new DefaultPromise<>(executor);
Promise<String> p1 = new DefaultPromise<>(executor);
Promise<String> p2 = new DefaultPromise<>(executor);
Promise<String> p3 = new DefaultPromise<>(executor);
Promise<String> p4 = new DefaultPromise<>(executor);
Promise<String> p5 = new DefaultPromise<>(executor);
Promise<String> p6 = new DefaultPromise<>(executor);
Promise<String> p7 = new DefaultPromise<>(executor);
Promise<String> p8 = new DefaultPromise<>(executor);
Promise<String> p9 = new DefaultPromise<>(executor);
Promise<String> p10 = new DefaultPromise<>(executor);
p1.addListener(new Listener(p2));
p2.addListener(new Listener(p3));
p3.addListener(new Listener(p4));
p4.addListener(new Listener(p5));
p5.addListener(new Listener(p6));
p6.addListener(new Listener(p7));
p7.addListener(new Listener(p8));
p8.addListener(new Listener(p9));
p9.addListener(new Listener(p10));
root.addListener(new Listener(p1));
root.setSuccess("success");
Thread.sleep(Long.MAX_VALUE);
}
private static class Listener implements GenericFutureListener<Future<String>> {
private final String name;
private final Promise<String> promise;
public Listener(Promise<String> promise) {
this.name = "listener-" + COUNTER.getAndIncrement();
this.promise = promise;
}
@Override
public void operationComplete(Future<String> future) throws Exception {
System.out.println(String.format("监听器[%s]回调成功...", name));
if (null != promise) {
promise.setSuccess("success");
}
}
}
}
因为有safeExecute()
兜底执行,上面的所有Promise
都会回调,这里可以采用IDEA
的高级断点功能,在步入断点的地方添加额外的日志,输出如下:
MAX_LISTENER_STACK_DEPTH(notifyListenersNow)执行---
监听器[listener-9]回调成功...
MAX_LISTENER_STACK_DEPTH(notifyListenersNow)执行---
监听器[listener-0]回调成功...
MAX_LISTENER_STACK_DEPTH(notifyListenersNow)执行---
监听器[listener-1]回调成功...
MAX_LISTENER_STACK_DEPTH(notifyListenersNow)执行---
监听器[listener-2]回调成功...
MAX_LISTENER_STACK_DEPTH(notifyListenersNow)执行---
监听器[listener-3]回调成功...
MAX_LISTENER_STACK_DEPTH(notifyListenersNow)执行---
监听器[listener-4]回调成功...
MAX_LISTENER_STACK_DEPTH(notifyListenersNow)执行---
监听器[listener-5]回调成功...
MAX_LISTENER_STACK_DEPTH(notifyListenersNow)执行---
监听器[listener-6]回调成功...
safeExecute(notifyListenersNow)执行----------
监听器[listener-7]回调成功...
safeExecute(notifyListenersNow)执行----------
监听器[listener-8]回调成功...
这里笔者有点疑惑,如果调用栈深度大于8,超出的部分会包装为Runnable
实例提交到事件执行器执行,岂不是把递归栈溢出的隐患变成了内存溢出的隐患(因为异步任务也有可能积压,除非拒绝任务提交,那么具体要看EventExecutor
的实现了)?
小结
Netty
提供的Promise
工具的源码和使用方式都分析完了,设计理念和代码都是十分值得借鉴,同时能够开箱即用,可以在日常编码中直接引入,减少重复造轮子的劳动和风险。
个人博客
- Throwable's Blog
(本文完 e-a-20200123 c-3-d)
加载全部内容