Kotlin协程操作
LeeDuo. 人气:0下面以launch方法为例进行分析。
一.协程的创建
launch方法的代码如下:
// CoroutineScope的扩展方法 public fun CoroutineScope.launch( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> Unit ): Job { // 根据当前上下文,计算得到新的上下文 val newContext = newCoroutineContext(context) // 根据启动模式,创建不同的续体 val coroutine = if (start.isLazy) LazyStandaloneCoroutine(newContext, block) else StandaloneCoroutine(newContext, active = true) // 启动协程 coroutine.start(start, coroutine, block) return coroutine }
newCoroutineContext用于计算新的上下文,代码如下:
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext { // coroutineContext为CoroutineScope中保存的全局变量 // 对上下文进行相加 val combined = coroutineContext + context // 用于debug val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined // 如果上下文中没有调度器,则添加一个默认的调度器 return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null) debug + Dispatchers.Default else debug }
1.start方法
在不指定协程启动模式的情况下,协程将按照DEFAULT模式启动,在上述代码中,会调用StandaloneCoroutine对象的start方法。StandaloneCoroutine的代码如下:
private open class StandaloneCoroutine( parentContext: CoroutineContext, active: Boolean ) : AbstractCoroutine<Unit>(parentContext, active) { override fun handleJobException(exception: Throwable): Boolean { handleCoroutineException(context, exception) return true } }
StandaloneCoroutine类中仅重写了handleJobException方法,用于处理父协程不处理的异常。因此这里调用的start方法实际是父类AbstractCoroutine的方法,AbstractCoroutine类的start方法代码如下:
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) { // 该方法用于完成父协程与子协程的绑定关联,同时确保父协程启动 initParentJob() // 该方法的写法等同于start.invoke(block, receiver, this) // 因此调用的CoroutineStart类的方法 start(block, receiver, this) }
AbstractCoroutine类的start方法内,调用了CoroutineStart类的invoke方法。
2.CoroutineStart类
CoroutineStart是一个枚举类,用于根据不同的启动模式去启动协程,代码如下:
public enum class CoroutineStart { // 四种启动模式 DEFAULT, LAZY, // 具有实验性,慎用 @ExperimentalCoroutinesApi ATOMIC, // 具有实验性,慎用 @ExperimentalCoroutinesApi UNDISPATCHED; // 根据不同的启动策略,启动协程,执行block @InternalCoroutinesApi public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>): Unit = when (this) { DEFAULT -> block.startCoroutineCancellable(completion) ATOMIC -> block.startCoroutine(completion) UNDISPATCHED -> block.startCoroutineUndispatched(completion) LAZY -> Unit // 该模式不主动启动,等待用户调用start方法 } // 根据不同的启动策略,启动协程,执行block @InternalCoroutinesApi public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit = when (this) { DEFAULT -> block.startCoroutineCancellable(receiver, completion) ATOMIC -> block.startCoroutine(receiver, completion) UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion) LAZY -> Unit } // 当前的启动模式是否为懒启动 @InternalCoroutinesApi public val isLazy: Boolean get() = this === LAZY }
CoroutineStart类中有两个invoke方法,其中一个参数中有receiver,另一个没有receiver。在Kotlin协程中,很多方法都重载了带有receiver的方法和不带有receiver的方法。
receiver用于为block执行提供一个环境。Kotlin中提供的启动协程的方法都是通过带receiver参数的start方法实现。通过receiver环境,可以更方便的实现一些操作,比如在launch启动的协程中再次调用launch启动新的协程。在没有receiver的环境下执行block,则更像是在suspend方法中执行,如果需要启动其他的协程,需要自己提供环境。
3.startCoroutineCancellable方法
startCoroutineCancellable是一个扩展方法,用来创建一个可以取消的协程,代码如下:
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) = runSafely(completion) { // createCoroutineUnintercepted:创建协程 // intercepted:拦截调度 // resumeCancellableWith:恢复执行 createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit)) } // 如果创建的过程发生异常,则通知续体恢复后续代码的执行 private inline fun runSafely(completion: Continuation<*>, block: () -> Unit) { try { block() } catch (e: Throwable) { completion.resumeWith(Result.failure(e)) } }
4.createCoroutineUnintercepted方法
createCoroutineUnintercepted方法用于创建一个新的、可挂起的、不受干扰的协程。
public expect fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted( receiver: R, completion: Continuation<T> ): Continuation<Unit>
在Kotlin中有很多被expect关键字标记的接口方法,需要找到对应平台下被actual标记的实现方法。
public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted( receiver: R, completion: Continuation<T> ): Continuation<Unit> { // 用于debug val probeCompletion = probeCoroutineCreated(completion) return if (this is BaseContinuationImpl) create(receiver, probeCompletion) else { createCoroutineFromSuspendFunction(probeCompletion) { (this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it) } } }
createCoroutineUnintercepted方法创建的协程需要手动调用resumeWith方法才可以启动,但重复的调用resumeWith方法可能会导致状态机发生异常。同时,参数中传入的completion可能会在任意的上下文中被调用。
正常情况下,我们编写的lambda表达式——block,在编译器编译时,会自动生成一个类,并继承SuspendLambda类,实现Continuation等接口。因为SuspendLambda继承自ContinuationImpl,ContinuationImpl继承自BaseContinuationImpl,所以才有了上述代码中的判断逻辑。
如果当前的block对象的类型为BaseContinuationImpl,则调用create方法,这里的create方法是编译器生成的类里的重写方法,它的内部就是通过我们传入的参数,创建并返回根据blcok生成的类的一个实例对象。
如果当前的block对象的类型不为BaseContinuationImpl,则需要通过createCoroutineFromSuspendFunction方法创建协程。这里假设lambda表达式的类型不是BaseContinuationImpl。
5.createCoroutineFromSuspendFunction方法
该方法用于在createCoroutineUnintercepted方法中使用,当一个被suspend修饰的lambda表达式没有继承BaseContinuationImpl类时,则通过此方法创建协程。
有两种情况会调用该方法创建协程:第一种情况是lambda表达式中调用了其他的挂起方法;第二种情况是挂起方法是通过Java实现的。
createCoroutineFromSuspendFunction方法的代码如下:
private inline fun <T> createCoroutineFromSuspendFunction( completion: Continuation<T>, crossinline block: (Continuation<T>) -> Any? ): Continuation<Unit> { val context = completion.context // 如果上下文为空 return if (context === EmptyCoroutineContext) // 创建一个受限协程 object : RestrictedContinuationImpl(completion as Continuation<Any?>) { private var label = 0 override fun invokeSuspend(result: Result<Any?>): Any? = when (label) { 0 -> { label = 1 result.getOrThrow() block(this) } 1 -> { label = 2 result.getOrThrow() } else -> error("This coroutine had already completed") } } else // 不为空,则创建一个正常的协程 object : ContinuationImpl(completion as Continuation<Any?>, context) { private var label = 0 override fun invokeSuspend(result: Result<Any?>): Any? = when (label) { 0 -> { label = 1 result.getOrThrow() block(this) } 1 -> { label = 2 result.getOrThrow() } else -> error("This coroutine had already completed") } } }
受限协程是指协程在运行过程中的,只能调用协程作用域中提供的挂起方法发生挂起,其他挂起方法不能调用,因为在挂起方法会对续体进行拦截,可能导致后续代码的执行变得无法预测。
典型的例子就是sequence方法,它创建的协程就是受限协程,只能通过调用yield方法或者yieldAll方法才能发生挂起。由于受限协程中不能进行协程调度,因此其上下文是空的。
这里launch方法的上下文有一个默认调度器,因此会创建一个ContinuationImpl对象。
到这里,协程完成了创建。
二.协程的启动
再次回到startCoroutineCancellable方法,当调用createCoroutineUnintercepted创建好协程后,会调用intercepted方法,代码如下:
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> = (this as? ContinuationImpl)?.intercepted() ?: this
intercepted方法是Continuation接口的扩展方法,内部调用了ContinuationImpl类的intercepted方法。
1.ContinuationImpl类
internal abstract class ContinuationImpl( completion: Continuation<Any?>?, private val _context: CoroutineContext? ) : BaseContinuationImpl(completion) { constructor(completion: Continuation<Any?>?) : this(completion, completion?.context) public override val context: CoroutineContext get() = _context!! @Transient private var intercepted: Continuation<Any?>? = null // 如果没有缓存,则从上下文中获取拦截器,调用interceptContinuation进行拦截, // 将拦截的续体保存到全局变量 public fun intercepted(): Continuation<Any?> = intercepted ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this) .also { intercepted = it } protected override fun releaseIntercepted() { val intercepted = intercepted if (intercepted != null && intercepted !== this) { context[ContinuationInterceptor]!!.releaseInterceptedContinuation(intercepted) } this.intercepted = CompletedContinuation // just in case } }
这里的ContinuationInterceptor指的就是在newCoroutineContext方法中传入的Dispatchers.Default调度器。CoroutineDispatcher类的interceptContinuation方法的代码如下:
public abstract class CoroutineDispatcher : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor { ... // 将续体包裹成DispatchedContinuation,并传入当前调度器 public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> = DispatchedContinuation(this, continuation) ... }
2.resumeCancellableWith方法
再次回到startCoroutineCancellable方法,当调用intercepted方法进行拦截后,会调用resumeCancellableWith方法,代码如下:
public fun <T> Continuation<T>.resumeCancellableWith(result: Result<T>): Unit = when (this) { is DispatchedContinuation -> resumeCancellableWith(result) else -> resumeWith(result) }
由于当前的Continuation对象的类型为DispatchedContinuation,因此调用DispatchedContinuation类的resumeCancellableWith方法,代码如下:
internal class DispatchedContinuation<in T>( @JvmField val dispatcher: CoroutineDispatcher, @JvmField val continuation: Continuation<T> ) : DispatchedTask<T>(MODE_ATOMIC_DEFAULT), CoroutineStackFrame, Continuation<T> by continuation { ... @Suppress("NOTHING_TO_INLINE") inline fun resumeCancellableWith(result: Result<T>) { val state = result.toState() // 是否进行调度 if (dispatcher.isDispatchNeeded(context)) { _state = state resumeMode = MODE_CANCELLABLE // 进行调度 dispatcher.dispatch(context, this) } else {// Dispatcher.Unconfined调度器会走这里 executeUnconfined(state, MODE_CANCELLABLE) { // 协程未被取消 if (!resumeCancelled()) { // 恢复执行 resumeUndispatchedWith(result) } } } } // 恢复执行前判断协程是否已经取消执行 @Suppress("NOTHING_TO_INLINE") inline fun resumeCancelled(): Boolean { // 获取当前的协程任务 val job = context[Job] // 如果不为空且不活跃 if (job != null && !job.isActive) { // 抛出异常 resumeWithException(job.getCancellationException()) return true } return false } @Suppress("NOTHING_TO_INLINE") inline fun resumeUndispatchedWith(result: Result<T>) { // 该方法在指定的上下文中执行,在执行后同步协程上下文变化 withCoroutineContext(context, countOrElement) { // 调用续体的resumeWith方法 continuation.resumeWith(result) } } ... } // Dispatchers.Unconfined模式下的调度 private inline fun DispatchedContinuation<*>.executeUnconfined( contState: Any?, mode: Int, doYield: Boolean = false, block: () -> Unit ): Boolean { // 从ThreadLocal中获取EventLoop val eventLoop = ThreadLocalEventLoop.eventLoop // doYield表示是否正在让出执行 // 如果正在让出执行,并且执行队列还是空的,说明不需要执行,返回false if (doYield && eventLoop.isUnconfinedQueueEmpty) return false // 如果EventLoop当前还在被Unconfined调度器使用 return if (eventLoop.isUnconfinedLoopActive) { _state = contState resumeMode = mode // 向队列中添加当前的任务 eventLoop.dispatchUnconfined(this) // 返回 true true } else { // 重新运行EventLoop runUnconfinedEventLoop(eventLoop, block = block) // 返回false false } }
runUnconfinedEventLoop方法是一个扩展方法,用于启动EventLoop,代码如下:
internal inline fun DispatchedTask<*>.runUnconfinedEventLoop( eventLoop: EventLoop, block: () -> Unit ) { // 引用计数+1 eventLoop.incrementUseCount(unconfined = true) try { // 先执行当前的任务 block() // 循环分发任务 while (true) { // 全部执行完毕,则退出分发 if (!eventLoop.processUnconfinedEvent()) break } } catch (e: Throwable) { handleFatalException(e, null) } finally { // 引用计数+1 eventLoop.decrementUseCount(unconfined = true) } }
Dispatchers.Default调度器与Dispatchers.Unconfined调度器的调度逻辑基本都相同,最终都是调用Contination对象的resumeWith方法,同时传入Result对象作为参数。
这里的Contination是createCoroutineUnintercepted方法创建的继承ContinuationImpl的匿名内部类对象。Result是resumeCancellableWith方法传入的Result.success(Unit)对象,因为首次启动,所以传入类型为Unit。
调用匿名内部类的resumeWith方法,实际调用的是父类BaseContinuationImpl的resumeWith方法。
3.BaseContinuationImpl类
BaseContinuationImpl类的resumeWith方法的代码如下:
internal abstract class BaseContinuationImpl( public val completion: Continuation<Any?>? ) : Continuation<Any?>, CoroutineStackFrame, Serializable { public final override fun resumeWith(result: Result<Any?>) { var current = this var param = result // 循环 while (true) { // 用于debug probeCoroutineResumed(current) // current环境下 with(current) { // completion用于续体执行完的回调,为空,则抛出异常 // 这里的completion就是一开始创建的StandaloneCoroutine对象 val completion = completion!! // 获取执行后的结果 val outcome: Result<Any?> = try { // 核心执行 val outcome = invokeSuspend(param) // 如果返回值为COROUTINE_SUSPENDED,说明协程挂起,退出循环 if (outcome === COROUTINE_SUSPENDED) return // 返回结果成功 Result.success(outcome) } catch (exception: Throwable) { // 返回结果失败 Result.failure(exception) } // 释放拦截的续体,状态机终止 releaseIntercepted() // 这里没有直接调用resume,而是通过循环代替递归 // 这也是resumeWith方法声明为final的原因 if (completion is BaseContinuationImpl) { // 这种情况一般为多个suspend方法按顺序执行 // 等待下一次循环 current = completion param = outcome } else { // 返回结果 completion.resumeWith(outcome) return } } } } protected abstract fun invokeSuspend(result: Result<Any?>): Any? protected open fun releaseIntercepted() { // does nothing here, overridden in ContinuationImpl } public open fun create(completion: Continuation<*>): Continuation<Unit> { throw UnsupportedOperationException("create(Continuation) has not been overridden") } public open fun create(value: Any?, completion: Continuation<*>): Continuation<Unit> { throw UnsupportedOperationException("create(Any?;Continuation) has not been overridden") } ... }
4.invokeSuspend方法
在上述代码中,resumeWith方法内部调用了invokeSuspend方法,这里的invokeSuspend方法实际就是createCoroutineFromSuspendFunction方法中创建的匿名内部类的invokeSuspend方法。匿名内部类的代码如下:
object : ContinuationImpl(completion as Continuation<Any?>, context) { // 初始状态 private var label = 0 override fun invokeSuspend(result: Result<Any?>): Any? = when (label) { 0 -> { label = 1 // 先去获取一次结果,如果有异常,则直接抛出,避免执行 // 比如在调度器中,如果发现协程已经取消, // 则调用resumeWithException方法,在这里直接被抛出 result.getOrThrow() // 把当前续体传入,执行协程 // 可能发生挂起 block(this) } 1 -> { // 如果协程发生了挂起,那么恢复挂起后会走到这里 label = 2 // 获取最终的执行结果 result.getOrThrow() } else -> error("This coroutine had already completed") } }
三.协程的挂起与恢复
通过上述代码的分析,协程的挂起实际就是在协程返回结果时返回一个COROUTINE_SUSPENDED对象,在收到COROUTINE_SUSPENDED结果后直接返回,等待被再次调用resumeWith恢复。
COROUTINE_SUSPENDED对象定义在枚举类CoroutineSingletons中,代码如下:
internal enum class CoroutineSingletons { COROUTINE_SUSPENDED, UNDECIDED, RESUMED }
该枚举类代表了协程的三个状态,协程在创建后状态为UNDECIDED,如果执行过程中发生挂起,则状态变为COROUTINE_SUSPENDED,最后挂起恢复后状态变为RESUMED。
而协程的恢复实际就是在挂起方法执行完成后,通过调用协程执行时传入的续体的resumeWith方法,恢复后续代码的执行。
加载全部内容