Kotlin Job启动流程源码层深入分析
Super-B 人气:0Job启动流程
job启动流程,我们先从一段最简单使用协程的代码开始,进行代码跟跟踪,顺便引出几个关键的概念,在后面章节里面去单独分析。代码如下:
private fun testParentChildJob() { val coroutineContext = Job() + CoroutineName("name1") + Dispatchers.IO + CoroutineExceptionHandler{ c,e -> println(e.message) } val myScope = CoroutineScope(coroutineContext) val job = myScope.launch { println("myScope.launch :") } }
首先创建一个有四种元素的上下文域myScope,由Job() + CoroutineName("name1") + Dispatchers.IO + CoroutineExceptionHandler{ c,e -> println(e.message) }
组成,上一章coroutineContext篇已经讲过plus操作的过程了,不赘述。
接着用这个作用域myScope开启一个协程,协程内打印println("myScope.launch :")
。
我自己从launch函数一步一步跟踪后,得到了如下图所示的流程:
launch流程分析
public fun CoroutineScope.launch( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> Unit ): Job { val newContext = newCoroutineContext(context) val coroutine = if (start.isLazy) LazyStandaloneCoroutine(newContext, block) else StandaloneCoroutine(newContext, active = true) coroutine.start(start, coroutine, block) return coroutine }
分为三步:
首先使用入参context: CoroutineContext = EmptyCoroutineContext,
创建一个新的上下文集合newCoroutineContext(context)
,newCoroutineContext函数操作:就是根据所在的scope域的上下文集合和入参进行组合操作,得到一个新的上下文集合,代码如下:
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext { val combined = coroutineContext + context val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null) debug + Dispatchers.Default else debug }
可以看到各种+
操作,就是coroutineContext的各种plus操作,可以得到一个继承自所在scope域的上下文集合(这个域由coroutineContext变量决定,这个变量属于CoroutineScope成员),并且包含了入参的context元素,这样上下文集合就具有继承性,并且自己还可以对已有元素进行覆盖。上一篇coroutineContext篇已经讲过,就不赘述了。
由于我们使用的默认方式launch的,使用上面创建的newContext元素集合,就会创建一个StandaloneCoroutine(newContext, active = true)
协程对象。这个对象继承关系比较复杂,继承关系如下:
这个类里面包含了很多成员变量,源码如下:
private open class StandaloneCoroutine( parentContext: CoroutineContext, active: Boolean ) : AbstractCoroutine<Unit>(parentContext, active) { //省略。。。 } public abstract class AbstractCoroutine<in T>( /** * The context of the parent coroutine. */ @JvmField protected val parentContext: CoroutineContext, active: Boolean = true ) : JobSupport(active), Job, Continuation<T>, CoroutineScope { /** * The context of this coroutine that includes this coroutine as a [Job]. */ @Suppress("LeakingThis") public final override val context: CoroutineContext = parentContext + this /** * The context of this scope which is the same as the [context] of this coroutine. */ public override val coroutineContext: CoroutineContext get() = context override val isActive: Boolean get() = super.isActive }
context成员变量是外部传进来的newContext上下文集合 + this
得到的,那么newContext的Job元素会被this替换掉;
coroutineContext成员变量是CoroutineScope接口的成员,覆写为context对象; isActive标志这个Job是否是存活状态; 调用刚刚创建的coroutine协程的start方法,coroutine.start(start, coroutine, block)
,跟进去看看
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) { initParentJob() start(block, receiver, this) }
initParentJob()
方法主要是用于关联父子Job的,这里先不讲,对启动流程没啥影响。
start(block, receiver, this)
是正真启动协程的地方,CoroutineStart
的值是DEFAULT
public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>): Unit = when (this) { DEFAULT -> block.startCoroutineCancellable(completion) ATOMIC -> block.startCoroutine(completion) UNDISPATCHED -> block.startCoroutineUndispatched(completion) LAZY -> Unit // will start lazily }
那么调用的就是DEFAULT -> block.startCoroutineCancellable(completion)
这个分支,
public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) { createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit)) } public actual fun <T> (suspend () -> T).createCoroutineUnintercepted( completion: Continuation<T> ): Continuation<Unit> { //省略。。。 createCoroutineFromSuspendFunction(probeCompletion) { (this as Function1<Continuation<T>, Any?>).invoke(it) } } private inline fun <T> createCoroutineFromSuspendFunction( completion: Continuation<T>, crossinline block: (Continuation<T>) -> Any? ): Continuation<Unit> { val context = completion.context // label == 0 when coroutine is not started yet (initially) or label == 1 when it was return if (context === EmptyCoroutineContext) //省略。。。 else object : ContinuationImpl(completion as Continuation<Any?>, context) { private var label = 0 override fun invokeSuspend(result: Result<Any?>): Any? = when (label) { 0 -> { label = 1 result.getOrThrow() // Rethrow exception if trying to start with exception (will be caught by BaseContinuationImpl.resumeWith block(this) // run the block, may return or suspend } //省略。。。 } } }
第一步:createCoroutineUnintercepted(completion)
就是以completion作为参数创建一个ContinuationImpl对象,这个completion就是上面创建的StandaloneCoroutine对象。这个新的ContinuationImpl对象是继承自Continuation,那么他就有fun resumeWith(result: Result<T>)
方法,该方法是用于恢复挂起点,val context: CoroutineContext
参数,这个参数就是Continuation的所关联的上下文集合。
我们再自己看看这个createCoroutineFromSuspendFunction
这个方法,发现将我们launch{}的lambda参数进行包装后(this as Function1<Continuation<T>, Any?>).invoke(it)
然后作为入参block,这个block作为ContinuationImpl对象覆写的invokeSuspend函数的回调函数。那么可以从这个看出一个关系:
ContinuationImpl.invokeSuspend -> launch入参的lambda函数体
第二步:就是调用ContinuationImpl .intercepted()
,内部处理是获取ContinuationImpl的上下文集合中的ContinuationInterceptor元素,然后将ContinuationImpl作为参数,包装成DispatchedContinuation(this, continuation)
,其中this代表ContinuationInterceptor也就是dispatcher,continuation代表刚刚传递进来的ContinuationImpl。
第三步:resumeCancellableWith(Result.success(Unit))
,调用DispatchedContinuation的resumeCancellableWith函数,代码如下:
public fun <T> Continuation<T>.resumeCancellableWith(result: Result<T>): Unit = when (this) { is DispatchedContinuation -> resumeCancellableWith(result) else -> resumeWith(result) } //DispatchedContinuation extends DIspatchedTask inline fun resumeCancellableWith(result: Result<T>) { val state = result.toState() if (dispatcher.isDispatchNeeded(context)) { _state = state resumeMode = MODE_CANCELLABLE dispatcher.dispatch(context, this) } else { 省略。。。 } } //DIspatchedTask public final override fun run() { val taskContext = this.taskContext var fatalException: Throwable? = null try { val delegate = delegate as DispatchedContinuation<T> val continuation = delegate.continuation withCoroutineContext(context, delegate.countOrElement) { //省略。。。 if (exception == null && job != null && !job.isActive) { //省略。。。 } else { if (exception != null) continuation.resumeWithException(exception) else continuation.resume(getSuccessfulResult(state)) } } } catch (e: Throwable) { //省略。。。 } }
由于DispatchedContinuation是继承自DIspatchedTask的,所以DispatchedContinuation的run方法是DIspatchedTask已经实现的了,所以dispatcher.dispatch(context, this)
,dispatcher调用的是DIspatchedTask.run方法,(dispatcher是一个线程池和java线程池类似,但是有一点区别,后面章节再讲),run方法中,首先获取delegate,然后取出continuation变量,这个delegate其实是被DispatchedContinuation覆写的,而且实现的Continuation接口被构造函数的continuation代理,这个入参continuation其实就是ContinuationImpl,上一步分析过了。
internal class DispatchedContinuation<in T>( @JvmField val dispatcher: CoroutineDispatcher, @JvmField val continuation: Continuation<T> ) : DispatchedTask<T>(MODE_ATOMIC_DEFAULT), CoroutineStackFrame, Continuation<T> by continuation { //省略。。。 override val delegate: Continuation<T> get() = this //省略。。。 } //Continuation public inline fun <T> Continuation<T>.resume(value: T): Unit = resumeWith(Result.success(value))
那么其实就是调用的ContinuationImpl.resumeWith(Result.success(value))
方法,ContinuationImpl继承自BaseContinuationImpl,继续进去看看
public final override fun resumeWith(result: Result<Any?>) { // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume var param = result while (true) { //省略。。。 with(current) { try { val outcome = invokeSuspend(param) if (outcome === COROUTINE_SUSPENDED) return Result.success(outcome) } catch (exception: Throwable) { Result.failure(exception) } //省略。。。 releaseIntercepted() // this state machine instance is terminating if (completion is BaseContinuationImpl) { //省略。。。 } else { // top-level completion reached -- invoke and return completion.resumeWith(outcome) return } } } }
第一步,调用val outcome = invokeSuspend(param)
,上面已经分析了,invokeSuspend被ContinuationImpl覆写了,内部回调了launch的lambda表达式;
第二步,调用completion.resumeWith(outcome)
,这个completion上面分析了,是StandAloneCoroutine协程,调用了StandAloneCoroutine对象的resumeWith方法,这个方法里面用于更新协程状态,比如协程成功,失败之类的。
综上,通过上面的invokeSuspend函数调用,最终调用到了launch的lambda表达式,也就是我们业务代码,我们的业务代码是被封装到了ContinuationImpl类中。
通过上面的分析,一共发现了三种不同类型的continuation,它们分别是:
DispatchedContinuation用于分发continuation到指定的线程池中; ContinuationImpl用于包装launch的lambda代码块作为业务代码代理类; StandAloneCoroutine协程管理类管理Job生命周期以及协程的状态父子Job关系维护等等。
它们的调用链如下:
父子Job关联分析
父子Job关联操作是在上面launch流程中的,在调用start方法的时候进行关联的:
initParentJob方法里面,先调用parent.start方法,确保parent的Job已经启动了,接着调用parent.attachChild(this)方法,用于关联父子Job。 代码如下:
//AbstractCoroutine internal fun initParentJob() { //取出上下文集合中的Job元素,调用initParentJobInternal方法 initParentJobInternal(parentContext[Job]) } //AbstractCoroutine internal fun initParentJobInternal(parent: Job?) { //省略。。。 parent.start() // make sure the parent is started //省略。。。 val handle = parent.attachChild(this) parentHandle = handle //省略。。。 if (isCompleted) { handle.dispose() } }
首先取出parentContext[Job]的Job元素,这个parentContext就是launch的时候根据scope的上下文集合创建出来的上下文集合,取出的Job元素就是父Job,作为initParentJobInternal的参数,接着调用parent.attachChild(this):
//JobSupport public final override fun attachChild(child: ChildJob): ChildHandle { /* * Note: This function attaches a special ChildHandleNode node object. This node object * is handled in a special way on completion on the coroutine (we wait for all of them) and * is handled specially by invokeOnCompletion itself -- it adds this node to the list even * if the job is already cancelling. For cancelling state child is attached under state lock. * It's required to properly wait all children before completion and provide linearizable hierarchy view: * If child is attached when the job is already being cancelled, such child will receive immediate notification on * cancellation, but parent *will* wait for that child before completion and will handle its exception. */ return invokeOnCompletion(onCancelling = true, handler = ChildHandleNode(this, child).asHandler) as ChildHandle } //JobSupport internal class ChildHandleNode( parent: JobSupport, @JvmField val childJob: ChildJob ) : JobCancellingNode<JobSupport>(parent), ChildHandle { override fun invoke(cause: Throwable?) = childJob.parentCancelled(job) override fun childCancelled(cause: Throwable): Boolean = job.childCancelled(cause) override fun toString(): String = "ChildHandle[$childJob]" }
首先创建了一个handler = ChildHandleNode(this, child).asHandler对象,这个对象ChildHandleNode作为参数传递给invokeOnCompletion,然后返回一个ChildHandle类型的对象,赋值给子Job的parentHandle val handle = parent.attachChild(this); parentHandle = handle
,parentHandle 这个是子Job持有的变量,ChildHandle接口拥有childCancelled方法,用于子Job通知父Job,子Job已经取消了,父Job需要根据子Job状态继续进行处理。
//JobSupport public final override fun invokeOnCompletion( onCancelling: Boolean, invokeImmediately: Boolean, handler: CompletionHandler ): DisposableHandle { var nodeCache: JobNode<*>? = null loopOnState { state -> when (state) { is Empty -> { // EMPTY_X state -- no completion handlers if (state.isActive) { // try move to SINGLE state val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it } if (_state.compareAndSet(state, node)) return node } //省略。。。 } is Incomplete -> { val list = state.list if (list == null) { // SINGLE/SINGLE+ promoteSingleToNodeList(state as JobNode<*>) } else { var rootCause: Throwable? = null var handle: DisposableHandle = NonDisposableHandle val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it } if (!addLastAtomic(state, list, node)) return@loopOnState // retry if (rootCause == null) return node //省略。。。 if (rootCause != null) { //省略。。。 } else { val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it } if (addLastAtomic(state, list, node)) return node } } } else -> { // is complete //省略。。。 } } } }
invokeOnCompletion方法就是,将传递进来的handler: CompletionHandler,分情况存储起来,
当state状态是Empty状态,创建一个代理节点node ,之后存入到state中; 当state是Incomplete状态,如果state.list结构是空的,那么创建一个链表,将node 节点作为第一个节点存进去,当前state.list不为空,那么将node节点插入到链表的末尾。 这样经过上面这两步: 子Job持有的parentHandle对象可以通知父Job自己已经取消了:
override fun childCancelled(cause: Throwable): Boolean = job.childCancelled(cause)
父Job持有的state对象保存着包装着子Job的ChildHandleNode对象,父Job通过遍历调用列表中的node元素的invoke方法,即可取消所有的子Job:
override fun invoke(cause: Throwable?) = childJob.parentCancelled(job)
会发现, 调用launch生成一个Job,这个Job就会initParentJob() ,进而子Job会持有父Job,父Job也会将子Job加入到state的数据结构中,进而形成了树的结构,类似于下图:
父子Job都可以互相通知对方自己已经取消,需要做出对应的处理。
结论
launch启动一个协程,会生成三个continuation,分别是
DispatchedContinuation用于分发continuation到指定的线程池中; ContinuationImpl用于包装launch的lambda代码块作为业务代码代理类; StandAloneCoroutine协程管理类管理Job生命周期以及协程的状态父子Job关系维护等等。 调用链:DispatchedContinuation -> ContinuationImpl(在这里调用launch的lambda业务代码块) -> StandAloneCoroutine
launch启动一个协程Job,这个Job所在域如果存在parentJob ,那么parentJob和Job会形成树结构上的父子节点,并且子Job继承了父Job的CoroutineScope的上下文集合(根据参数会覆盖一些重复Key的元素)。
加载全部内容