Kotlin协程Job生命周期结构化并发详解
无糖可乐爱好者 人气:0引言
前面在学习协程启动方式的时候在launch
的源码中有一个返回值是Job
,async
的返回Deferred
也是实现了Job
,那么而也就是说launch
和async
在创建一个协程的时候也会创建一个对应的Job
对象。还提到过Job
是协程的句柄,那么Job
到底是什么?它有什么用?
1.Job的生命周期
先看一下Job
的源码,这里只保留了跟标题相关的内容
public interface Job : CoroutineContext.Element { // ------------ 状态查询API ------------ /** * 当该Job处于活动状态时,返回true——它已经开始,没有完成,也没有取消。 * 如果没有取消或失败,等待其子任务完成的Job仍被认为是活动的。 */ public val isActive: Boolean /** * 当Job因任何原因完成时返回true。作业被取消或失败并已完成其执行也被视为完成。 * Job只有在所有子任务完成后才算完成。 */ public val isCompleted: Boolean /** *如果该作业因任何原因被取消,无论是通过显式调用cancel,还是因为它失败或其子或父作业被取消, * 则返回true。在一般情况下,它并不意味着任务已经完成,因为它可能仍然在完成它正在做的事情, * 并等待它的子任务完成。 */ public val isCancelled: Boolean // ------------ 操控状态API ------------ /** * 如果Job所在的协程还没有被启动那么调用这个方法就会启动协程 * 如果这个协程被启动了返回true,如果已经启动或者执行完毕了返回false */ public fun start(): Boolean /** * 取消此Job,可用于指定错误消息或提供有关取消原因的其他详细信息 */ public fun cancel(cause: CancellationException? = null) /** * 取消此Job */ public fun cancel(): Unit = cancel(null) public fun cancel(cause: Throwable? = null): Boolean // ------------ 等待状态API ------------ /** * 挂起协程,知道任务完成再恢复 */ public suspend fun join() // ------------ 完成状态回调API ------------ /** * 注册Job完成时同步调用的处理程序. * 当Job已经完成时,将处理程序将立即调用Job的异常或取消原因或null * 否则,该处理程序将在此Job完成时调用一次。 */ public fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle /** * 注册在取消或完成此Job时同步调用的处理程序。 * 当Job已经被取消并完成执行时,处理程序将立即调用Job的取消原因或null, * 除非将invokeImmediately设置为false。否则, * 当Job取消或完成时将调用一次handler。 */ public fun invokeOnCompletion( onCancelling: Boolean = false, invokeImmediately: Boolean = true, handler: CompletionHandler): DisposableHandle }
从源码中可以发现这几个函数和变量跟Actviity或者Fragment非常像,所以我们可以总结出两个结论:
- Job可以监测协程的生命周期
- Job可以操控协程
在例子中使用这几个函数和变量再来校验一下上面的结论:
fun main() = runBlocking { val job = launch { delay(1000L) } job.log() job.cancel() job.log() } fun Job.log() { println( """ isActive:$isActive isCompleted:$isCompleted isCancelled:$isCancelled Thread:${Thread.currentThread().name} ================================ """.trimIndent() ) } //输出结果 //isActive:true //isCompleted:false //isCancelled:false //Thread:main @coroutine#1 //================================ //isActive:false //isCompleted:false //isCancelled:true //Thread:main @coroutine#1 //================================
Job.log
用了扩展函数,方便调用Job
中的状态监测返回值。
上面的代码通过launch
创建了一个协程,接收了Job
的返回值,这里用这个job
对象做了三件事:
- 第一个
job.log()
:launch
的创建标志着协程已经被启动所以在第一个job.log()
的日志中isActive
返回值是true; job.cancel()
: 这里调用了job
的取消函数将协程任务取消;- 第二个
job.log()
: 上面的代码将协程任务取消了,然后再次获取协程状态发现isActivte
返回false,isCancelled
返回true。
上面的代码也印证了前面提出的结论,还有一个函数start
没使用,再来调用它之后输出的日志:
fun main() = runBlocking { //变化1 val job = launch(start = CoroutineStart.LAZY) { delay(1000L) } job.log() //变化2 job.start() job.log() job.cancel() job.log() } fun Job.log() { println( """ isActive:$isActive isCompleted:$isCompleted isCancelled:$isCancelled Thread:${Thread.currentThread().name} ================================ """.trimIndent() ) } //输出结果: //isActive:false //isCompleted:false //isCancelled:false //Thread:main @coroutine#1 //================================ //isActive:true //isCompleted:false //isCancelled:false //Thread:main @coroutine#1 //================================ //isActive:false //isCompleted:false //isCancelled:true //Thread:main @coroutine#1 //================================
上面的代码增加了两处修改:
- 变化1:协程在创建出来的时候就已经被启动,因此为了查看调用
Job.start()
前的日志需要加上懒启动 - 变化2:调用
start
函数启动协程
从输出结果来看没有调用start
函数前isActive
返回true,调用后就返回了true
,当使用懒启动后在调用cancel
函数与前面使用cancel
函数输出的日志是一样的,可以得知懒启动后对协程的生命周期并没有设么影响(这可能是句废话)。
现在还有最后一个变量没有看isCompleted
,在上面的代码中添加一个延时函数,等协程任务结束再打印日志
fun main() = runBlocking { val job = launch(start = CoroutineStart.LAZY) { delay(1000L) } job.log() job.start() job.log() job.cancel() delay(2000L) //变化在这里 job.log() } fun Job.log() { println( """ isActive:$isActive isCompleted:$isCompleted isCancelled:$isCancelled Thread:${Thread.currentThread().name} ================================ """.trimIndent() ) } //输出结果: //isActive:false //isCompleted:false //isCancelled:false //Thread:main @coroutine#1 //================================ //isActive:true //isCompleted:false //isCancelled:false //Thread:main @coroutine#1 //================================ //isActive:false //isCompleted:true //isCancelled:true //Thread:main @coroutine#1 //================================
从输出结果中看到当调用isCancel
后isCompleted
也返回了true,也就是说任务结束了。
上面的代码为了监测isCompleted
的状态加了一个延时函数delay
,但是这种方式并不建议使用,因为这个时间他不是固定的,例如从后台请求数据或者下载文件,这种情况下的时间是完全无法预知的。
现在假设已经知道协程执行完毕需要delay(1000L)
的时间,如果将协程内的delay
时长设置的大于外部的delay
时长,会带来什么问题?
fun main() = runBlocking { val job = launch(start = CoroutineStart.LAZY) { delay(4000L) } job.log() job.start() job.log() delay(1000L) job.log() println("Process end!") } fun Job.log() { println( """ isActive:$isActive isCompleted:$isCompleted isCancelled:$isCancelled Thread:${Thread.currentThread().name} ================================ """.trimIndent() ) } //输出结果: //isActive:false //isCompleted:false //isCancelled:false //Thread:main @coroutine#1 //================================ //isActive:true //isCompleted:false //isCancelled:false //Thread:main @coroutine#1 //================================ //isActive:true //isCompleted:false //isCancelled:false //Thread:main @coroutine#1 //================================ //Process end!
由输出结果可知isCompleted
状态是false,协程任务是否执行完毕不得而知。另外当println("Process end!")
执行完毕后程序并没有立即输出Process finished with exit code 0
,这是因为runBlocking 会一直阻塞,等到 job 任务执行完毕以后才真正退出。
那要如何解决这个问题?
//Job#join /** * 挂起协程,知道任务完成再恢复 */ public suspend fun join()
join
是Job
中的一个挂起函数,调用后会挂起当前程序的执行流程,等待job
当中的协程任务执行完毕然后再恢复当前程序的执行流程。
join
将任务挂起后再恢复,那要如何知道任务是否执行完毕了?invokeOnCompletion
可以监听任务执行的状态
//Job#invokeOnCompletion /** * 注册Job完成时同步调用的处理程序. * 当Job已经完成时,将处理程序将立即调用Job的异常或取消原因或null * 否则,该处理程序将在此Job完成时调用一次。 */ public fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle //Job#invokeOnCompletion /** * 注册在取消或完成此Job时同步调用的处理程序。 * 当Job已经被取消并完成执行时,处理程序将立即调用Job的取消原因或null, * 除非将invokeImmediately设置为false。否则, * 当Job取消或完成时将调用一次handler。 */ public fun invokeOnCompletion( onCancelling: Boolean = false, invokeImmediately: Boolean = true, handler: CompletionHandler): DisposableHandle
join
和invokeOnCompletion
的使用如下:
fun main() = runBlocking { val job = launch(start = CoroutineStart.LAZY) { delay(4000L) } job.log() job.start() job.log() //新增 job.join() //新增 job.invokeOnCompletion { println("==========Task status==========") job.log() } println("Process end!") } fun Job.log() { println( """ isActive:$isActive isCompleted:$isCompleted isCancelled:$isCancelled Thread:${Thread.currentThread().name} ================================ """.trimIndent() ) } //输出结果: //isActive:false //isCompleted:false //isCancelled:false //Thread:main @coroutine#1 //================================ //isActive:true //isCompleted:false //isCancelled:false //Thread:main @coroutine#1 //================================ //==========Task status========== //isActive:false //isCompleted:true //isCancelled:false //Thread:main @coroutine#1 //================================ //Process end!
可以看到加入join
和invokeOnCompletion
之后isCompleted
的状态就正确了,同时Process end!
输出后Process finished with exit code 0
也会很快的输出,这说明任务确实执行完毕了。
在讲协程的启动方式的时候提出一个观点:launch
的返回值Job
代表的是协程的句柄。那么Job
是协程的句柄该怎么理解?
句柄: 是指一个中间媒介,可以操控一个东西。就类似于遥控器操作空调场景中遥控器就是句柄,开关控制灯具场景中开关就是句柄。
所以Job
和协程的关系就类似于遥控器和空调,开关和灯具。Job
可以监测协程的运行状态也可以控制协程的运行状态。那么Job
就和遥控器、开关一样看做是一个句柄。
2.Deffered
launch
直接创建了Job
,async
通过Deffered
间接创建了Job
对象,但是它并没有在 Job
的基础上扩展出很多其他功能,而接收一个返回值是依靠 await()
方法,那await
方法是如何实现的?
fun main() = runBlocking { val deferred = async { logX("Coroutine start!") delay(1000L) logX("Coroutine end!") "Coroutine result!" } val result = deferred.await() println("Result = $result") logX("Process end!") } fun logX(any: Any?) { println( """ ================================ $any Thread:${Thread.currentThread().name} ================================ """.trimIndent() ) } //输出结果: //Coroutine start! //Thread:main @coroutine#2 //================================ //================================ //Coroutine end! //Thread:main @coroutine#2 //================================ //Result = Coroutine result! //================================ //Process end! //Thread:main @coroutine#1
从输出结果来看,await
方法可以获取协程执行结果外,好像还会阻塞协程的执行流程,直到协程任务执行完毕。看一下await
的源码
//Deferred#await public interface Deferred<out T> : Job { ... public suspend fun await(): T ... }
从源码来看await
也是一个挂起函数,它跟join
是一样的,看似阻塞的过程其实是协程的挂起和恢复能力。
所以,总的来说,Deferred
只是比 Job
多了一个 await()
挂起函数而已,通过这个挂起函数,就可以等待协程执行完毕的同时,还可以直接拿到协程的执行结果。
3.Job与结构化并发
在其他地方看过这么一句话:协程的优势在于结构化并发, 这句话该如何理解?
这句话可以理解为带有结构和层级的并发,用代码表现就像这样:
fun main() = runBlocking { val parentJob: Job var childJob1: Job? = null var childJob2: Job? = null var childJob3: Job? = null parentJob = launch { childJob1 = launch { delay(1000L) } childJob2 = launch { delay(3000L) } childJob3 = launch { delay(5000L) } } delay(500L) parentJob.children.forEachIndexed { index, job -> when (index) { 0 -> println("childJob1 === childJob1 is ${childJob1 === job}") 1 -> println("childJob2 === childJob2 is ${childJob2 === job}") 2 -> println("childJob3 === childJob3 is ${childJob3 === job}") } } parentJob.join() logX("Process end!") } //输出结果: //childJob1 === childJob1 is true //childJob2 === childJob2 is true //childJob3 === childJob3 is true //================================ //Process end! //Thread:main @coroutine#1
上面的代码是父子层级,父Job
使用launch
启动了协程同时它的内部还有三个Job
,三个子Job
是并发执行的,同时也是用过launch
启动的协程,调用了parentJob.join()
那么挂起的时间就是childJob3
的时长—5秒,因为它要等待所有任务都执行完毕才会恢复执行,然后通过children.forEachIndexed
进行遍历并分别对比他们与三个子Job
的引用是否相等“===”代表了引用相等,即是否是同一个对象)。图示如下
前面讲过,Job
可以调用cancel
方法取消执行,那么当调用parentJob.cancel
会有什么样的情况?
fun main() = runBlocking { val parentJob: Job var childJob1: Job? = null var childJob2: Job? = null var childJob3: Job? = null parentJob = launch { childJob1 = launch { println("childJob1 start") delay(1000L) println("childJob1 end") } childJob2 = launch { println("childJob2 start") delay(3000L) println("childJob2 start") } childJob3 = launch { println("childJob3 start") delay(5000L) println("childJob3 start") } } delay(500L) parentJob.cancel() logX("Process end!") } //输出结果: //childJob1 start //childJob2 start //childJob3 start //================================ //Process end! //Thread:main @coroutine#1
parentJob.cancel
调用后,每个子Job
只是输出了start,这就可以得出一个结论:父Job
取消后子Job
也会依次跟着取消。如果调用任何一个子Job
的cancel
则不会对父Job
和其他子Job
产生影响。
到这里对于开头的那句协程的优势在于结构化并发就有更更好的理解了,这是Kotlin协程的第二大优势。
4.launch和async的使用场景
- launch: 主要用来发起一些不需要任何结果的耗时任务,这个任务在执行中可以改变它的执行状态。
- async: 主要用来发起一些需要结果的耗时任务,以及与挂起函数结合,优化并发。
加载全部内容