亲宝软件园·资讯

展开

Kotlin协程操作

LeeDuo. 人气:0

下面以launch方法为例进行分析。

一.协程的创建

launch方法的代码如下:

// CoroutineScope的扩展方法
public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    // 根据当前上下文,计算得到新的上下文
    val newContext = newCoroutineContext(context)
    // 根据启动模式,创建不同的续体
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    // 启动协程
    coroutine.start(start, coroutine, block)
    return coroutine
}

newCoroutineContext用于计算新的上下文,代码如下:

public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
    // coroutineContext为CoroutineScope中保存的全局变量
    // 对上下文进行相加
    val combined = coroutineContext + context
    // 用于debug
    val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
    // 如果上下文中没有调度器,则添加一个默认的调度器
    return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
        debug + Dispatchers.Default else debug
}

1.start方法

在不指定协程启动模式的情况下,协程将按照DEFAULT模式启动,在上述代码中,会调用StandaloneCoroutine对象的start方法。StandaloneCoroutine的代码如下:

private open class StandaloneCoroutine(
    parentContext: CoroutineContext,
    active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active) {
    override fun handleJobException(exception: Throwable): Boolean {
        handleCoroutineException(context, exception)
        return true
    }
}

StandaloneCoroutine类中仅重写了handleJobException方法,用于处理父协程不处理的异常。因此这里调用的start方法实际是父类AbstractCoroutine的方法,AbstractCoroutine类的start方法代码如下:

public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
    // 该方法用于完成父协程与子协程的绑定关联,同时确保父协程启动
    initParentJob()
    // 该方法的写法等同于start.invoke(block, receiver, this)
    // 因此调用的CoroutineStart类的方法
    start(block, receiver, this)
}

AbstractCoroutine类的start方法内,调用了CoroutineStart类的invoke方法。

2.CoroutineStart类

CoroutineStart是一个枚举类,用于根据不同的启动模式去启动协程,代码如下:

public enum class CoroutineStart {
    // 四种启动模式
    DEFAULT,
    LAZY,
    // 具有实验性,慎用
    @ExperimentalCoroutinesApi
    ATOMIC,
    // 具有实验性,慎用
    @ExperimentalCoroutinesApi
    UNDISPATCHED;
    // 根据不同的启动策略,启动协程,执行block
    @InternalCoroutinesApi
    public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>): Unit =
        when (this) {
            DEFAULT -> block.startCoroutineCancellable(completion)
            ATOMIC -> block.startCoroutine(completion)
            UNDISPATCHED -> block.startCoroutineUndispatched(completion)
            LAZY -> Unit // 该模式不主动启动,等待用户调用start方法
        }
    // 根据不同的启动策略,启动协程,执行block
    @InternalCoroutinesApi
    public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
        when (this) {
            DEFAULT -> block.startCoroutineCancellable(receiver, completion)
            ATOMIC -> block.startCoroutine(receiver, completion)
            UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
            LAZY -> Unit
        }
    // 当前的启动模式是否为懒启动
    @InternalCoroutinesApi
    public val isLazy: Boolean get() = this === LAZY
}

CoroutineStart类中有两个invoke方法,其中一个参数中有receiver,另一个没有receiver。在Kotlin协程中,很多方法都重载了带有receiver的方法和不带有receiver的方法。

receiver用于为block执行提供一个环境。Kotlin中提供的启动协程的方法都是通过带receiver参数的start方法实现。通过receiver环境,可以更方便的实现一些操作,比如在launch启动的协程中再次调用launch启动新的协程。在没有receiver的环境下执行block,则更像是在suspend方法中执行,如果需要启动其他的协程,需要自己提供环境。

3.startCoroutineCancellable方法

startCoroutineCancellable是一个扩展方法,用来创建一个可以取消的协程,代码如下:

internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
    runSafely(completion) {
        // createCoroutineUnintercepted:创建协程
        // intercepted:拦截调度
        // resumeCancellableWith:恢复执行
        createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit))
    }
// 如果创建的过程发生异常,则通知续体恢复后续代码的执行
private inline fun runSafely(completion: Continuation<*>, block: () -> Unit) {
    try {
        block()
    } catch (e: Throwable) {
        completion.resumeWith(Result.failure(e))
    }
}

4.createCoroutineUnintercepted方法

createCoroutineUnintercepted方法用于创建一个新的、可挂起的、不受干扰的协程。

public expect fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
    receiver: R,
    completion: Continuation<T>
): Continuation<Unit>

在Kotlin中有很多被expect关键字标记的接口方法,需要找到对应平台下被actual标记的实现方法。

public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
    receiver: R,
    completion: Continuation<T>
): Continuation<Unit> {
    // 用于debug
    val probeCompletion = probeCoroutineCreated(completion)
    return if (this is BaseContinuationImpl)
        create(receiver, probeCompletion)
    else {
        createCoroutineFromSuspendFunction(probeCompletion) {
            (this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)
        }
    }
}

createCoroutineUnintercepted方法创建的协程需要手动调用resumeWith方法才可以启动,但重复的调用resumeWith方法可能会导致状态机发生异常。同时,参数中传入的completion可能会在任意的上下文中被调用。

正常情况下,我们编写的lambda表达式——block,在编译器编译时,会自动生成一个类,并继承SuspendLambda类,实现Continuation等接口。因为SuspendLambda继承自ContinuationImpl,ContinuationImpl继承自BaseContinuationImpl,所以才有了上述代码中的判断逻辑。

如果当前的block对象的类型为BaseContinuationImpl,则调用create方法,这里的create方法是编译器生成的类里的重写方法,它的内部就是通过我们传入的参数,创建并返回根据blcok生成的类的一个实例对象。

如果当前的block对象的类型不为BaseContinuationImpl,则需要通过createCoroutineFromSuspendFunction方法创建协程。这里假设lambda表达式的类型不是BaseContinuationImpl。

5.createCoroutineFromSuspendFunction方法

该方法用于在createCoroutineUnintercepted方法中使用,当一个被suspend修饰的lambda表达式没有继承BaseContinuationImpl类时,则通过此方法创建协程。

有两种情况会调用该方法创建协程:第一种情况是lambda表达式中调用了其他的挂起方法;第二种情况是挂起方法是通过Java实现的。

createCoroutineFromSuspendFunction方法的代码如下:

private inline fun <T> createCoroutineFromSuspendFunction(
    completion: Continuation<T>,
    crossinline block: (Continuation<T>) -> Any?
): Continuation<Unit> {
    val context = completion.context
    // 如果上下文为空
    return if (context === EmptyCoroutineContext)
        // 创建一个受限协程
        object : RestrictedContinuationImpl(completion as Continuation<Any?>) {
            private var label = 0

            override fun invokeSuspend(result: Result<Any?>): Any? =
                when (label) {
                    0 -> {
                        label = 1
                        result.getOrThrow()
                        block(this)
                    }
                    1 -> {
                        label = 2
                        result.getOrThrow()
                    }
                    else -> error("This coroutine had already completed")
                }
        }
    else // 不为空,则创建一个正常的协程
        object : ContinuationImpl(completion as Continuation<Any?>, context) {
            private var label = 0

            override fun invokeSuspend(result: Result<Any?>): Any? =
                when (label) {
                    0 -> {
                        label = 1
                        result.getOrThrow()
                        block(this)
                    }
                    1 -> {
                        label = 2
                        result.getOrThrow()
                    }
                    else -> error("This coroutine had already completed")
                }
        }
}

受限协程是指协程在运行过程中的,只能调用协程作用域中提供的挂起方法发生挂起,其他挂起方法不能调用,因为在挂起方法会对续体进行拦截,可能导致后续代码的执行变得无法预测。

典型的例子就是sequence方法,它创建的协程就是受限协程,只能通过调用yield方法或者yieldAll方法才能发生挂起。由于受限协程中不能进行协程调度,因此其上下文是空的。

这里launch方法的上下文有一个默认调度器,因此会创建一个ContinuationImpl对象。

到这里,协程完成了创建。

二.协程的启动

再次回到startCoroutineCancellable方法,当调用createCoroutineUnintercepted创建好协程后,会调用intercepted方法,代码如下:

public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
    (this as? ContinuationImpl)?.intercepted() ?: this

intercepted方法是Continuation接口的扩展方法,内部调用了ContinuationImpl类的intercepted方法。

1.ContinuationImpl类

internal abstract class ContinuationImpl(
    completion: Continuation<Any?>?,
    private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
    constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)
    public override val context: CoroutineContext
        get() = _context!!
    @Transient
    private var intercepted: Continuation<Any?>? = null
    // 如果没有缓存,则从上下文中获取拦截器,调用interceptContinuation进行拦截,
    // 将拦截的续体保存到全局变量
    public fun intercepted(): Continuation<Any?> =
        intercepted
            ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                .also { intercepted = it }
    protected override fun releaseIntercepted() {
        val intercepted = intercepted
        if (intercepted != null && intercepted !== this) {
            context[ContinuationInterceptor]!!.releaseInterceptedContinuation(intercepted)
        }
        this.intercepted = CompletedContinuation // just in case
    }
}

这里的ContinuationInterceptor指的就是在newCoroutineContext方法中传入的Dispatchers.Default调度器。CoroutineDispatcher类的interceptContinuation方法的代码如下:

public abstract class CoroutineDispatcher :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
     ...
    // 将续体包裹成DispatchedContinuation,并传入当前调度器 
    public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        DispatchedContinuation(this, continuation)
    ...
}

2.resumeCancellableWith方法

再次回到startCoroutineCancellable方法,当调用intercepted方法进行拦截后,会调用resumeCancellableWith方法,代码如下:

public fun <T> Continuation<T>.resumeCancellableWith(result: Result<T>): Unit = when (this) {
    is DispatchedContinuation -> resumeCancellableWith(result)
    else -> resumeWith(result)
}

由于当前的Continuation对象的类型为DispatchedContinuation,因此调用DispatchedContinuation类的resumeCancellableWith方法,代码如下:

internal class DispatchedContinuation<in T>(
    @JvmField val dispatcher: CoroutineDispatcher,
    @JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_ATOMIC_DEFAULT), CoroutineStackFrame, Continuation<T> by continuation {
    ...
    @Suppress("NOTHING_TO_INLINE")
    inline fun resumeCancellableWith(result: Result<T>) {
        val state = result.toState()
        // 是否进行调度
        if (dispatcher.isDispatchNeeded(context)) {
            _state = state
            resumeMode = MODE_CANCELLABLE
            // 进行调度
            dispatcher.dispatch(context, this)
        } else {// Dispatcher.Unconfined调度器会走这里
            executeUnconfined(state, MODE_CANCELLABLE) {
                // 协程未被取消
                if (!resumeCancelled()) {
                    // 恢复执行
                    resumeUndispatchedWith(result)
                }
            }
        }
    }
    // 恢复执行前判断协程是否已经取消执行
    @Suppress("NOTHING_TO_INLINE")
    inline fun resumeCancelled(): Boolean {
        // 获取当前的协程任务
        val job = context[Job]
        // 如果不为空且不活跃
        if (job != null && !job.isActive) {
            // 抛出异常
            resumeWithException(job.getCancellationException())
            return true
        }
        return false
    }
    @Suppress("NOTHING_TO_INLINE")
    inline fun resumeUndispatchedWith(result: Result<T>) {
        // 该方法在指定的上下文中执行,在执行后同步协程上下文变化
        withCoroutineContext(context, countOrElement) {
            // 调用续体的resumeWith方法
            continuation.resumeWith(result)
        }
    }
    ...
}
// Dispatchers.Unconfined模式下的调度
private inline fun DispatchedContinuation<*>.executeUnconfined(
    contState: Any?, mode: Int, doYield: Boolean = false,
    block: () -> Unit
): Boolean {
    // 从ThreadLocal中获取EventLoop
    val eventLoop = ThreadLocalEventLoop.eventLoop
    // doYield表示是否正在让出执行
    // 如果正在让出执行,并且执行队列还是空的,说明不需要执行,返回false
    if (doYield && eventLoop.isUnconfinedQueueEmpty) return false
    // 如果EventLoop当前还在被Unconfined调度器使用
    return if (eventLoop.isUnconfinedLoopActive) {
        _state = contState
        resumeMode = mode
        // 向队列中添加当前的任务
        eventLoop.dispatchUnconfined(this)
        // 返回 true
        true
    } else {
        // 重新运行EventLoop
        runUnconfinedEventLoop(eventLoop, block = block)
        // 返回false
        false
    }
}

runUnconfinedEventLoop方法是一个扩展方法,用于启动EventLoop,代码如下:

internal inline fun DispatchedTask<*>.runUnconfinedEventLoop(
    eventLoop: EventLoop,
    block: () -> Unit
) {
    // 引用计数+1
    eventLoop.incrementUseCount(unconfined = true)
    try {
        // 先执行当前的任务
        block()
        // 循环分发任务
        while (true) {
            // 全部执行完毕,则退出分发
            if (!eventLoop.processUnconfinedEvent()) break
        }
    } catch (e: Throwable) {
        handleFatalException(e, null)
    } finally {
        // 引用计数+1
        eventLoop.decrementUseCount(unconfined = true)
    }
}

Dispatchers.Default调度器与Dispatchers.Unconfined调度器的调度逻辑基本都相同,最终都是调用Contination对象的resumeWith方法,同时传入Result对象作为参数。

这里的Contination是createCoroutineUnintercepted方法创建的继承ContinuationImpl的匿名内部类对象。Result是resumeCancellableWith方法传入的Result.success(Unit)对象,因为首次启动,所以传入类型为Unit。

调用匿名内部类的resumeWith方法,实际调用的是父类BaseContinuationImpl的resumeWith方法。

3.BaseContinuationImpl类

BaseContinuationImpl类的resumeWith方法的代码如下:

internal abstract class BaseContinuationImpl(
    public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
    public final override fun resumeWith(result: Result<Any?>) {
        var current = this
        var param = result
        // 循环
        while (true) {
            // 用于debug
            probeCoroutineResumed(current)
            // current环境下
            with(current) {
                // completion用于续体执行完的回调,为空,则抛出异常
                // 这里的completion就是一开始创建的StandaloneCoroutine对象
                val completion = completion!! 
                // 获取执行后的结果
                val outcome: Result<Any?> =
                    try {
                        // 核心执行
                        val outcome = invokeSuspend(param)
                        // 如果返回值为COROUTINE_SUSPENDED,说明协程挂起,退出循环
                        if (outcome === COROUTINE_SUSPENDED) return
                        // 返回结果成功
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        // 返回结果失败
                        Result.failure(exception)
                    }
                // 释放拦截的续体,状态机终止
                releaseIntercepted() 
                // 这里没有直接调用resume,而是通过循环代替递归
                // 这也是resumeWith方法声明为final的原因
                if (completion is BaseContinuationImpl) {
                    // 这种情况一般为多个suspend方法按顺序执行
                    // 等待下一次循环
                    current = completion
                    param = outcome
                } else {
                    // 返回结果
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }
    protected abstract fun invokeSuspend(result: Result<Any?>): Any?
    protected open fun releaseIntercepted() {
        // does nothing here, overridden in ContinuationImpl
    }
    public open fun create(completion: Continuation<*>): Continuation<Unit> {
        throw UnsupportedOperationException("create(Continuation) has not been overridden")
    }
    public open fun create(value: Any?, completion: Continuation<*>): Continuation<Unit> {
        throw UnsupportedOperationException("create(Any?;Continuation) has not been overridden")
    }
     ...
}

4.invokeSuspend方法

在上述代码中,resumeWith方法内部调用了invokeSuspend方法,这里的invokeSuspend方法实际就是createCoroutineFromSuspendFunction方法中创建的匿名内部类的invokeSuspend方法。匿名内部类的代码如下:

object : ContinuationImpl(completion as Continuation<Any?>, context) {
    // 初始状态
    private var label = 0
    override fun invokeSuspend(result: Result<Any?>): Any? =
            when (label) {
                0 -> {
                    label = 1
                    // 先去获取一次结果,如果有异常,则直接抛出,避免执行
                    // 比如在调度器中,如果发现协程已经取消,
                    // 则调用resumeWithException方法,在这里直接被抛出
                    result.getOrThrow()
                    // 把当前续体传入,执行协程
                    // 可能发生挂起
                    block(this)
                }
                1 -> {
                    // 如果协程发生了挂起,那么恢复挂起后会走到这里
                    label = 2
                    // 获取最终的执行结果
                    result.getOrThrow()
                }
                else -> error("This coroutine had already completed")
            }
}

三.协程的挂起与恢复

通过上述代码的分析,协程的挂起实际就是在协程返回结果时返回一个COROUTINE_SUSPENDED对象,在收到COROUTINE_SUSPENDED结果后直接返回,等待被再次调用resumeWith恢复。

COROUTINE_SUSPENDED对象定义在枚举类CoroutineSingletons中,代码如下:

internal enum class CoroutineSingletons { COROUTINE_SUSPENDED, UNDECIDED, RESUMED }

该枚举类代表了协程的三个状态,协程在创建后状态为UNDECIDED,如果执行过程中发生挂起,则状态变为COROUTINE_SUSPENDED,最后挂起恢复后状态变为RESUMED。

而协程的恢复实际就是在挂起方法执行完成后,通过调用协程执行时传入的续体的resumeWith方法,恢复后续代码的执行。

加载全部内容

相关教程
猜你喜欢
用户评论