

Kotlin Job启动流程源码层深入分析

private fun testParentChildJob() {
    val coroutineContext = Job() + CoroutineName("name1") + Dispatchers.IO + CoroutineExceptionHandler{ c,e -> println(e.message) }
    val myScope = CoroutineScope(coroutineContext)
    val job = myScope.launch {
        println("myScope.launch :")

首先创建一个有四种元素的上下文域myScope,由Job() + CoroutineName("name1") + Dispatchers.IO + CoroutineExceptionHandler{ c,e -> println(e.message) }组成,上一章coroutineContext篇已经讲过plus操作的过程了,不赘述。

接着用这个作用域myScope开启一个协程,协程内打印println("myScope.launch :")



public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine


首先使用入参context: CoroutineContext = EmptyCoroutineContext,创建一个新的上下文集合newCoroutineContext(context),newCoroutineContext函数操作:就是根据所在的scope域的上下文集合和入参进行组合操作,得到一个新的上下文集合,代码如下:

public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
    val combined = coroutineContext + context
    val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
    return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
        debug + Dispatchers.Default else debug


由于我们使用的默认方式launch的,使用上面创建的newContext元素集合,就会创建一个StandaloneCoroutine(newContext, active = true)协程对象。这个对象继承关系比较复杂,继承关系如下:


private open class StandaloneCoroutine(
    parentContext: CoroutineContext,
    active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active) {
public abstract class AbstractCoroutine<in T>(
     * The context of the parent coroutine.
    protected val parentContext: CoroutineContext,
    active: Boolean = true
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
     * The context of this coroutine that includes this coroutine as a [Job].
    public final override val context: CoroutineContext = parentContext + this
     * The context of this scope which is the same as the [context] of this coroutine.
    public override val coroutineContext: CoroutineContext get() = context
    override val isActive: Boolean get() = super.isActive

context成员变量是外部传进来的newContext上下文集合 + this得到的,那么newContext的Job元素会被this替换掉;

coroutineContext成员变量是CoroutineScope接口的成员,覆写为context对象; isActive标志这个Job是否是存活状态; 调用刚刚创建的coroutine协程的start方法,coroutine.start(start, coroutine, block),跟进去看看

    public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
        start(block, receiver, this)


start(block, receiver, this)是正真启动协程的地方,CoroutineStart的值是DEFAULT

public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>): Unit =
    when (this) {
        DEFAULT -> block.startCoroutineCancellable(completion)
        ATOMIC -> block.startCoroutine(completion)
        UNDISPATCHED -> block.startCoroutineUndispatched(completion)
        LAZY -> Unit // will start lazily

那么调用的就是DEFAULT -> block.startCoroutineCancellable(completion)这个分支,

public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) {
public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(
    completion: Continuation<T>
): Continuation<Unit> {
        createCoroutineFromSuspendFunction(probeCompletion) {
            (this as Function1<Continuation<T>, Any?>).invoke(it)
private inline fun <T> createCoroutineFromSuspendFunction(
    completion: Continuation<T>,
    crossinline block: (Continuation<T>) -> Any?
): Continuation<Unit> {
    val context = completion.context
    // label == 0 when coroutine is not started yet (initially) or label == 1 when it was
    return if (context === EmptyCoroutineContext)
        object : ContinuationImpl(completion as Continuation<Any?>, context) {
            private var label = 0
            override fun invokeSuspend(result: Result<Any?>): Any? =
                when (label) {
                    0 -> {
                        label = 1
                        result.getOrThrow() // Rethrow exception if trying to start with exception (will be caught by BaseContinuationImpl.resumeWith
                        block(this) // run the block, may return or suspend

第一步:createCoroutineUnintercepted(completion)就是以completion作为参数创建一个ContinuationImpl对象,这个completion就是上面创建的StandaloneCoroutine对象。这个新的ContinuationImpl对象是继承自Continuation,那么他就有fun resumeWith(result: Result<T>)方法,该方法是用于恢复挂起点,val context: CoroutineContext参数,这个参数就是Continuation的所关联的上下文集合。

我们再自己看看这个createCoroutineFromSuspendFunction这个方法,发现将我们launch{}的lambda参数进行包装后(this as Function1<Continuation<T>, Any?>).invoke(it)然后作为入参block,这个block作为ContinuationImpl对象覆写的invokeSuspend函数的回调函数。那么可以从这个看出一个关系:

ContinuationImpl.invokeSuspend  -> launch入参的lambda函数体

第二步:就是调用ContinuationImpl .intercepted(),内部处理是获取ContinuationImpl的上下文集合中的ContinuationInterceptor元素,然后将ContinuationImpl作为参数,包装成DispatchedContinuation(this, continuation),其中this代表ContinuationInterceptor也就是dispatcher,continuation代表刚刚传递进来的ContinuationImpl。


public fun <T> Continuation<T>.resumeCancellableWith(result: Result<T>): Unit = when (this) {
    is DispatchedContinuation -> resumeCancellableWith(result)
    else -> resumeWith(result)
//DispatchedContinuation extends DIspatchedTask
inline fun resumeCancellableWith(result: Result<T>) {
    val state = result.toState()
    if (dispatcher.isDispatchNeeded(context)) {
        _state = state
        resumeMode = MODE_CANCELLABLE
        dispatcher.dispatch(context, this)
    } else {
public final override fun run() {
        val taskContext = this.taskContext
        var fatalException: Throwable? = null
        try {
            val delegate = delegate as DispatchedContinuation<T>
            val continuation = delegate.continuation
            withCoroutineContext(context, delegate.countOrElement) {
                if (exception == null && job != null && !job.isActive) {
                } else {
                    if (exception != null) continuation.resumeWithException(exception)
                    else continuation.resume(getSuccessfulResult(state))
        } catch (e: Throwable) {

由于DispatchedContinuation是继承自DIspatchedTask的,所以DispatchedContinuation的run方法是DIspatchedTask已经实现的了,所以dispatcher.dispatch(context, this),dispatcher调用的是DIspatchedTask.run方法,(dispatcher是一个线程池和java线程池类似,但是有一点区别,后面章节再讲),run方法中,首先获取delegate,然后取出continuation变量,这个delegate其实是被DispatchedContinuation覆写的,而且实现的Continuation接口被构造函数的continuation代理,这个入参continuation其实就是ContinuationImpl,上一步分析过了。

internal class DispatchedContinuation<in T>(
    @JvmField val dispatcher: CoroutineDispatcher,
    @JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_ATOMIC_DEFAULT), CoroutineStackFrame, Continuation<T> by continuation {
    override val delegate: Continuation<T>
        get() = this

public inline fun <T> Continuation<T>.resume(value: T): Unit =


public final override fun resumeWith(result: Result<Any?>) {
        // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
        var param = result
        while (true) {
            with(current) {
           		try {
                    val outcome = invokeSuspend(param)
                    if (outcome === COROUTINE_SUSPENDED) return
                } catch (exception: Throwable) {
                releaseIntercepted() // this state machine instance is terminating
                if (completion is BaseContinuationImpl) {
                } else {
                    // top-level completion reached -- invoke and return

第一步,调用val outcome = invokeSuspend(param),上面已经分析了,invokeSuspend被ContinuationImpl覆写了,内部回调了launch的lambda表达式;




DispatchedContinuation用于分发continuation到指定的线程池中; ContinuationImpl用于包装launch的lambda代码块作为业务代码代理类; StandAloneCoroutine协程管理类管理Job生命周期以及协程的状态父子Job关系维护等等。




initParentJob方法里面,先调用parent.start方法,确保parent的Job已经启动了,接着调用parent.attachChild(this)方法,用于关联父子Job。 代码如下:

internal fun initParentJob() {
internal fun initParentJobInternal(parent: Job?) {
	parent.start() // make sure the parent is started
	val handle = parent.attachChild(this)
	parentHandle = handle
	if (isCompleted) {


public final override fun attachChild(child: ChildJob): ChildHandle {
     * Note: This function attaches a special ChildHandleNode node object. This node object
     * is handled in a special way on completion on the coroutine (we wait for all of them) and
     * is handled specially by invokeOnCompletion itself -- it adds this node to the list even
     * if the job is already cancelling. For cancelling state child is attached under state lock.
     * It's required to properly wait all children before completion and provide linearizable hierarchy view:
     * If child is attached when the job is already being cancelled, such child will receive immediate notification on
     * cancellation, but parent *will* wait for that child before completion and will handle its exception.
    return invokeOnCompletion(onCancelling = true, handler = ChildHandleNode(this, child).asHandler) as ChildHandle
internal class ChildHandleNode(
    parent: JobSupport,
    @JvmField val childJob: ChildJob
) : JobCancellingNode<JobSupport>(parent), ChildHandle {
    override fun invoke(cause: Throwable?) = childJob.parentCancelled(job)
    override fun childCancelled(cause: Throwable): Boolean = job.childCancelled(cause)
    override fun toString(): String = "ChildHandle[$childJob]"

首先创建了一个handler = ChildHandleNode(this, child).asHandler对象,这个对象ChildHandleNode作为参数传递给invokeOnCompletion,然后返回一个ChildHandle类型的对象,赋值给子Job的parentHandle val handle = parent.attachChild(this); parentHandle = handle,parentHandle 这个是子Job持有的变量,ChildHandle接口拥有childCancelled方法,用于子Job通知父Job,子Job已经取消了,父Job需要根据子Job状态继续进行处理。

public final override fun invokeOnCompletion(
        onCancelling: Boolean,
        invokeImmediately: Boolean,
        handler: CompletionHandler
    ): DisposableHandle {
        var nodeCache: JobNode<*>? = null
        loopOnState { state ->
            when (state) {
                is Empty -> { // EMPTY_X state -- no completion handlers
                    if (state.isActive) {
                        // try move to SINGLE state
                        val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
                        if (_state.compareAndSet(state, node)) return node
                is Incomplete -> {
                    val list = state.list
                    if (list == null) { // SINGLE/SINGLE+
                        promoteSingleToNodeList(state as JobNode<*>)
                    } else {
                        var rootCause: Throwable? = null
                        var handle: DisposableHandle = NonDisposableHandle
                      	val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
                        if (!addLastAtomic(state, list, node)) return@loopOnState // retry
                        if (rootCause == null) return node
                        if (rootCause != null) {
                        } else {
                            val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
                            if (addLastAtomic(state, list, node)) return node
                else -> { // is complete

invokeOnCompletion方法就是,将传递进来的handler: CompletionHandler,分情况存储起来,

当state状态是Empty状态,创建一个代理节点node ,之后存入到state中; 当state是Incomplete状态,如果state.list结构是空的,那么创建一个链表,将node 节点作为第一个节点存进去,当前state.list不为空,那么将node节点插入到链表的末尾。 这样经过上面这两步: 子Job持有的parentHandle对象可以通知父Job自己已经取消了:

override fun childCancelled(cause: Throwable): Boolean = job.childCancelled(cause)


override fun invoke(cause: Throwable?) = childJob.parentCancelled(job)

会发现, 调用launch生成一个Job,这个Job就会initParentJob() ,进而子Job会持有父Job,父Job也会将子Job加入到state的数据结构中,进而形成了树的结构,类似于下图:




DispatchedContinuation用于分发continuation到指定的线程池中; ContinuationImpl用于包装launch的lambda代码块作为业务代码代理类; StandAloneCoroutine协程管理类管理Job生命周期以及协程的状态父子Job关系维护等等。 调用链:DispatchedContinuation -> ContinuationImpl(在这里调用launch的lambda业务代码块) -> StandAloneCoroutine

launch启动一个协程Job,这个Job所在域如果存在parentJob ,那么parentJob和Job会形成树结构上的父子节点,并且子Job继承了父Job的CoroutineScope的上下文集合(根据参数会覆盖一些重复Key的元素)。

