github原文地址
原创翻译,转载请保留或注明出处:https://www.jianshu.com/p/01d26fbc9b80
共享的可变状态和并发
协程可用多线程调度器(比如默认的 CommonPool )并发执行。这样就可以提出所有常见的并发问题。主要的问题是同步访问共享的可变状态。协程领域对这个问题的一些解决方案类似于多线程领域中的解决方案,但其他解决方案则是独一无二的。
问题
我们启动一千个协程,它们都做一千次相同的动作(总计100万次执行)。我们同时会测量它们的完成时间,以便进一步的比较:
suspend fun massiveRun(context: CoroutineContext, action: suspend () -> Unit) {
val n = 1000 // number of coroutines to launch
val k = 1000 // times an action is repeated by each coroutine
val time = measureTimeMillis {
val jobs = List(n) {
launch(context) {
repeat(k) { action() }
}
}
jobs.forEach { it.join() }
}
println("Completed ${n * k} actions in $time ms")
}
我们从一个非常简单的动作开始:在多线程 CommonPool 上下文递增一个共享的可变变量。
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(CommonPool) {
counter++
}
println("Counter = $counter")
}
获取完整代码 here
这段代码最后打印出什么结果?它不太可能打印出“Counter = 1000000”,因为一千个协程从多个线程同时递增计数器而且没有做同步并发处理。
注意:如果你的运行机器使用两个或者更少的cpu,那么你总是会看到1000000,因为
CommonPool
在这种情况下只会在一个线程中运行。要重现这个问题,可以做如下的变动:
val mtContext = newFixedThreadPoolContext(2, "mtPool") // explicitly define context with two threads
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(mtContext) { // use it instead of CommonPool in this sample and below
counter++
}
println("Counter = $counter")
}
获取完整代码 here
没有发挥作用的volatile
有一种常见的误解:volatile
可以解决并发问题。让我们尝试一下:
@Volatile // in Kotlin `volatile` is an annotation
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(CommonPool) {
counter++
}
println("Counter = $counter")
}
获取完整代码 here
这段代码运行速度更慢了,但我们仍然没有得到 “Counter = 1000000”,因为 volatile 变量保证可线性化(这是“原子”的技术术语)读取和写入变量,但在大量动作(在我们的示例中即“递增”操作)发生时并不提供原子性。
线程安全的数据结构
一种对线程、协程都有效的常规解决方法,就是使用线程安全(也称为同步的、可线性化、原子)的数据结构,它为需要在共享状态上执行的相应操作提供所有必需的同步处理。在简单的计数器场景中,我们可以使用具有 incrementAndGet
原子操作的AtomicInteger
类:
var counter = AtomicInteger()
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(CommonPool) {
counter.incrementAndGet()
}
println("Counter = ${counter.get()}")
}
获取完整代码 here
这是针对此类特定问题的最快解决方案。它适用于普通计数器、集合、队列和其他标准数据结构以及它们的基本操作。然而,它并不容易扩展为应对复杂状态、或复杂操作没有现成的线程安全实现的情况。
以细粒度限制线程
限制线程是解决共享可变状态问题的一种方案,其中对特定共享状态的所有访问权都限制在单个线程中。它通常应用于UI程序中:所有UI状态都局限于单个事件分发线程或应用主线程中。这在协程中很容易实现,通过使用一个单线程上下文:
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(CommonPool) { // run each coroutine in CommonPool
withContext(counterContext) { // but confine each increment to the single-threaded context
counter++
}
}
println("Counter = $counter")
}
获取完整代码 here
这段代码运行非常缓慢,因为它进行了细粒度的线程限制。每个增量操作都得使用 withContext 块从多线程 CommonPool
上下文切换到单线程上下文。
以粗粒度限制线程
在实践中,线程限制是在大段代码中执行的,例如:状态更新类业务逻辑中大部分都是限于单线程中。下面的示例演示了这种情况,在单线程上下文中运行每个协程。
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(counterContext) { // run each coroutine in the single-threaded context
counter++
}
println("Counter = $counter")
}
获取完整代码 here
这段代码运行更快而且打印出了正确的结果。
互斥
该问题的互斥解决方案是使用永远不会同时执行的关键代码块来保护共享状态的所有修改。在阻塞的世界中,你通常会使用 synchronized
或者 ReentrantLock
。在协程中的替代品叫做 Mutex 。它具有 lock 和 unlock 方法,可以隔离关键的部分。关键的区别在于 Mutex.lock()
是一个挂起函数,它不会阻塞线程。
还有 withLock 扩展函数,可以方便的替代常用的 mutex.lock(); try { ... } finally { mutex.unlock() }
模式:
val mutex = Mutex()
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(CommonPool) {
mutex.withLock {
counter++
}
}
println("Counter = $counter")
}
获取完整代码 here
此示例中锁是细粒度的,因此会付出一些代价。但是对于某些必须定期修改共享状态的场景,它是一个不错的选择,但是没有自然线程可以限制此状态。
Actors
一个 actor 是由若干元素组成的一个实体:一个协程、它的状态受限封装在此协程中、以及一个与其他协程通信的 channel 。一个简单的 actor 可以简单的写成一个函数,但是一个拥有复杂状态的 actor 更适合由类来表示。
有一个 actor 协程构建器,它可以方便地将 actor 的邮箱 channel 组合到其作用域中(用来接收消息)、组合发送 channel 与结果集对象,这样对 actor 的单个引用就可以作为其句柄持有。
使用 actor 的第一步是定一个 actor 要处理的消息类。Kotlin 的 sealed classes 密封类很适合这种场景。我们使用 IncCounter
消息(用来递增计数器)和 GetCounter
消息(用来获取值)来定义 CounterMsg
密封类。后者需要发送回复。CompletableDeferred 通信原语表示未来可知(传达)的单个值,此处用于此目的。
// Message types for counterActor
sealed class CounterMsg
object IncCounter : CounterMsg() // one-way message to increment counter
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // a request with reply
接下来我们定义一个函数,使用 actor 协程构建器来启动一个 actor:
// This function launches a new counter actor
fun counterActor() = actor<CounterMsg> {
var counter = 0 // actor state
for (msg in channel) { // iterate over incoming messages
when (msg) {
is IncCounter -> counter++
is GetCounter -> msg.response.complete(counter)
}
}
}
主函数代码很简单:
fun main(args: Array<String>) = runBlocking<Unit> {
val counter = counterActor() // create the actor
massiveRun(CommonPool) {
counter.send(IncCounter)
}
// send a message to get a counter value from an actor
val response = CompletableDeferred<Int>()
counter.send(GetCounter(response))
println("Counter = ${response.await()}")
counter.close() // shutdown the actor
}
获取完整代码 here
actor 本身执行所处上下文的正确性无关紧要。一个 actor 是一个协程,而一个协程是按顺序执行的,因此将状态限制到特定协程可以解决共享可变状态的问题。实际上,actor 可以修改自己的私有状态,但只能通过消息互相影响(避免任何锁定)。
actor 在高负载下比锁更有效,因为在这种情况下它总是有工作要做,而且根本不需要切换到不同的上下文。
注意, actor 协程构建器是 produce 协程构建器的双重构件。一个 actor 与它接收消息的 channel 相关联,而一个 producer 与它发送元素的 channel 相关联。