Kotlin协程与并发深入全面讲解
且听真言 人气:2协程与并发
Kotlin协程是基于线程执行的。经过一层封装以后,Kotlin协程面对并发,处理方式与Java不同。
在java的世界里,并发往往是多个线程一起工作,存在共享的变量。需要处理好同步问题。要避免把协程与线程的概念混淆。
runBlocking { var i = 0 launch(Dispatchers.Default) { repeat(1000) { i++ } } delay(1000L) println(i) }
Log
1000
Process finished with exit code 0
上述代码中没有任何并发任务,launch创建了一个协程,所有的计算都发生在协程中。所以不需要考虑同步问题。
1.协程并发问题
多个协程并发执行的例子:
runBlocking { var i = 0 val jobs = mutableListOf<Job>() repeat(10) { val job = launch(Dispatchers.Default) { repeat(1000) { i++ } } jobs.add(job) } jobs.joinAll() println(i) }
9933
Process finished with exit code 0
上述代码中,创建了10个协程任务,每个协程任务都会工作在Default线程池中,这10个协程任务对i进行1000次自增操作,但是因为10个协程分别运行在不同的线程之前,且共享一个变量,所以会产生同步问题。
2.协程处理并发的手段
在Java中的同步手段有:synchronized、Atomic、Lock等;
使用@Synchronized注解或者synchronized(){}代码块
runBlocking { var i = 0 val lock = Any() val jobs = mutableListOf<Job>() repeat(10) { val job = launch(Dispatchers.Default) { repeat(1000) { synchronized(lock) { i++ } } } jobs.add(job) } jobs.joinAll() println(i) }
10000
Process finished with exit code 0
如何在上面的synchronized代码块中加入挂起函数,则发现会报错。
如下:
runBlocking { suspend fun prepare() { } var i = 0 val lock = Any() val jobs = mutableListOf<Job>() repeat(10) { val job = launch(Dispatchers.Default) { repeat(1000) { synchronized(lock) { prepare() i++ } } } jobs.add(job) } jobs.joinAll() println(i) }
所以可以发现不能在synchronized{}当中调用挂起函数,编译器会报错。因为挂起函数会被翻译成带有Continuation的异步函数,造成synchronized代码块无法同步处理。
协程并发思路
单线程并发
在Kotlin协程中可以实现单线程并发。
runBlocking { suspend fun getResult1(): String { printlnCoroutine("Start getResult1") delay(1000L) printlnCoroutine("End getResult1") return "Result1" } suspend fun getResult2(): String { printlnCoroutine("Start getResult2") delay(1000L) printlnCoroutine("End getResult2") return "Result2" } suspend fun getResult3(): String { printlnCoroutine("Start getResult3") delay(1000L) printlnCoroutine("End getResult3") return "Result3" } val results = mutableListOf<String>() val time = measureTimeMillis { val result1 = async { getResult1() } val result2 = async { getResult2() } val result3 = async { getResult3() } results.add(result1.await()) results.add(result2.await()) results.add(result3.await()) } println("Time:$time") println(results) } fun printlnCoroutine(any: Any?) { println("" + any + ";Thread:" + Thread.currentThread().name) }
Log
Start getResult1;Thread:main @coroutine#2
Start getResult2;Thread:main @coroutine#3
Start getResult3;Thread:main @coroutine#4
End getResult1;Thread:main @coroutine#2
End getResult2;Thread:main @coroutine#3
End getResult3;Thread:main @coroutine#4
Time:1028
[Result1, Result2, Result3]
Process finished with exit code 0
上面代码启动了三个协程,它们之间是并发执行的,每个协程耗时1000ms,总耗时1000多毫秒,而且这几个协程都运行在main线程上。
所以 可以考虑将i++逻辑分发到单线程之上。
runBlocking { val coroutineDispatcher = Executors.newSingleThreadExecutor { Thread(it, "MySingleThread").apply { isDaemon = true } }.asCoroutineDispatcher() var i = 0 val jobs = mutableListOf<Job>() repeat(10) { val job = launch(coroutineDispatcher) { repeat(1000) { i++ } } jobs.add(job) } jobs.joinAll() println(i) }
10000
Process finished with exit code 0
上述代码把所有协程任务分发到单独的线程中执行,但这10个协程是并发执行的。
Mutex
在java中,Lock之类的同步锁是阻塞式的,而Kotlin提供了非阻塞式的锁:Mutex。
runBlocking { val mutex = Mutex() var i = 0 val jobs = mutableListOf<Job>() repeat(10) { val job = launch(Dispatchers.Default) { repeat(1000) { mutex.lock() i++ mutex.unlock() } } jobs.add(job) } jobs.joinAll() println(i) }
Log
10000
Process finished with exit code 0
上述代码使用mutex.lock()、 mutex.unlock()包裹同步计算逻辑,实现多线程同步。Mutex 对比 JDK 当中的锁,最大的优势就在于支持挂起和恢复。
public interface Mutex { public val isLocked: Boolean public fun tryLock(owner: Any? = null): Boolean public suspend fun lock(owner: Any? = null; @Deprecated(level = DeprecationLevel.WARNING, message = "Mutex.onLock deprecated without replacement. " + "For additional details please refer to #2794") // WARNING since 1.6.0 public val onLock: SelectClause2<Any?, Mutex> public fun holdsLock(owner: Any): Boolean public fun unlock(owner: Any? = null) }
Mutex 是一个接口,它的 lock() 方法其实是一个挂起函数。而这就是实现非阻塞式同步锁的根本原因。
但是上述代码中对于 Mutex 的使用其实是错误的,会存在问题。如果代码在 mutex.lock()、mutex.unlock() 之间发生异常,从而导致 mutex.unlock() 无法被调用。这个时候,整个程序的执行流程就会一直卡住,无法结束。看下面代码:
runBlocking { val mutex = Mutex() var i = 0 val jobs = mutableListOf<Job>() repeat(10) { val job = launch(Dispatchers.Default) { repeat(1000) { mutex.lock() i++ i/0 mutex.unlock() } } jobs.add(job) } jobs.joinAll() println(i) }
如何解决?使用mutex.withLock{}。
代码入下:
runBlocking { val mutex = Mutex() var i = 0 val jobs = mutableListOf<Job>() repeat(10) { val job = launch(Dispatchers.Default) { repeat(1000) { mutex.withLock { i++ } } } jobs.add(job) } jobs.joinAll() println(i) }
10000
Process finished with exit code 0
public suspend inline fun <T> Mutex.withLock(owner: Any? = null, action: () -> T): T { contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) } lock(owner) try { return action() } finally { unlock(owner) } }
withLock{} 的本质,其实是在 finally{} 当中调用了 unlock()。
Actor
Actor,它本质上是基于 Channel 管道消息实现的。
sealed class Msg object AddMsg : Msg() class ResultMsg(val result: CompletableDeferred<Int>) : Msg() fun testCoroutinueConcurrent10() { runBlocking { suspend fun addActor() = actor<Msg> { var counter = 0 for (msg in channel) { when (msg) { is AddMsg -> counter++ is ResultMsg -> msg.result.complete(counter) } } } val actor = addActor() val jobs = mutableListOf<Job>() repeat(10) { val job = launch(Dispatchers.Default) { repeat(1000) { actor.send(AddMsg) } } jobs.add(job) } jobs.joinAll() val deferred = CompletableDeferred<Int>() actor.send(ResultMsg(deferred)) val result = deferred.await() actor.close() println(result) } }
Log
10000
Process finished with exit code 0
addActor() 挂起函数,它其实调用了 actor() 这个高阶函数。而这个函数的返回值类型其实是 SendChannel。由此可见,Kotlin 当中的 Actor 其实就是 Channel 的简单封装。Actor 的多线程同步能力都源自于 Channel。这里,我们借助密封类定义了两种消息类型,AddMsg、ResultMsg,然后在 actor{} 内部,我们处理这两种消息类型,如果我们收到了 AddMsg,则计算“i++”;如果收到了 ResultMsg,则返回计算结果。而在 actor{} 的外部,我们则只需要发送 10000 次的 AddMsg 消息,最后再发送一次 ResultMsg,取回计算结果即可。Actor 本质上是基于 Channel 管道消息实现的。
避免共享可变状态
runBlocking { val deferreds = mutableListOf<Deferred<Int>>() repeat(10) { val deferred = async(Dispatchers.Default) { var i = 0 repeat(1000) { i++ } return@async i } deferreds.add(deferred) } var result = 0 deferreds.forEach { result += it.await() } println(result) }
Log
10000
Process finished with exit code 0
在每一个协程当中,都有一个局部的变量 i,同时将 launch 都改为了 async,让每一个协程都可以返回计算结果。这种方式,相当于将 10000 次计算,平均分配给了 10 个协程,让它们各自计算 1000 次。这样一来,每个协程都可以进行独立的计算,然后我们将 10 个协程的结果汇总起来,最后累加在一起。
runBlocking { val result = (1..10).map { async(Dispatchers.Default) { var i = 0 repeat(1000) { i++ } return@async i } }.awaitAll() .sum() println(result) }
Log
10000
Process finished with exit code 0
加载全部内容