【翻译】kotlin协程核心库文档(六)—— 共享的可变状态和并发

github原文地址

原创翻译,转载请保留或注明出处:https://www.jianshu.com/p/01d26fbc9b80

共享的可变状态和并发


协程可用多线程调度器(比如默认的 CommonPool )并发执行。这样就可以提出所有常见的并发问题。主要的问题是同步访问共享的可变状态。协程领域对这个问题的一些解决方案类似于多线程领域中的解决方案,但其他解决方案则是独一无二的。

问题

我们启动一千个协程,它们都做一千次相同的动作(总计100万次执行)。我们同时会测量它们的完成时间,以便进一步的比较:

suspend fun massiveRun(context: CoroutineContext, action: suspend () -> Unit) {
    val n = 1000 // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        val jobs = List(n) {
            launch(context) {
                repeat(k) { action() }
            }
        }
        jobs.forEach { it.join() }
    }
    println("Completed ${n * k} actions in $time ms")
}

我们从一个非常简单的动作开始:在多线程 CommonPool 上下文递增一个共享的可变变量。

var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(CommonPool) {
        counter++
    }
    println("Counter = $counter")
}

获取完整代码 here

这段代码最后打印出什么结果?它不太可能打印出“Counter = 1000000”,因为一千个协程从多个线程同时递增计数器而且没有做同步并发处理。

注意:如果你的运行机器使用两个或者更少的cpu,那么你总是会看到1000000,因为CommonPool在这种情况下只会在一个线程中运行。要重现这个问题,可以做如下的变动:

val mtContext = newFixedThreadPoolContext(2, "mtPool") // explicitly define context with two threads
var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(mtContext) { // use it instead of CommonPool in this sample and below
        counter++
    }
    println("Counter = $counter")
}

获取完整代码 here

没有发挥作用的volatile

有一种常见的误解:volatile 可以解决并发问题。让我们尝试一下:

@Volatile // in Kotlin `volatile` is an annotation 
var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(CommonPool) {
        counter++
    }
    println("Counter = $counter")
}

获取完整代码 here

这段代码运行速度更慢了,但我们仍然没有得到 “Counter = 1000000”,因为 volatile 变量保证可线性化(这是“原子”的技术术语)读取和写入变量,但在大量动作(在我们的示例中即“递增”操作)发生时并不提供原子性。

线程安全的数据结构

一种对线程、协程都有效的常规解决方法,就是使用线程安全(也称为同步的、可线性化、原子)的数据结构,它为需要在共享状态上执行的相应操作提供所有必需的同步处理。在简单的计数器场景中,我们可以使用具有 incrementAndGet 原子操作的AtomicInteger 类:

var counter = AtomicInteger()

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(CommonPool) {
        counter.incrementAndGet()
    }
    println("Counter = ${counter.get()}")
}

获取完整代码 here

这是针对此类特定问题的最快解决方案。它适用于普通计数器、集合、队列和其他标准数据结构以及它们的基本操作。然而,它并不容易扩展为应对复杂状态、或复杂操作没有现成的线程安全实现的情况。

以细粒度限制线程

限制线程是解决共享可变状态问题的一种方案,其中对特定共享状态的所有访问权都限制在单个线程中。它通常应用于UI程序中:所有UI状态都局限于单个事件分发线程或应用主线程中。这在协程中很容易实现,通过使用一个单线程上下文:

val counterContext = newSingleThreadContext("CounterContext")
var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(CommonPool) { // run each coroutine in CommonPool
        withContext(counterContext) { // but confine each increment to the single-threaded context
            counter++
        }
    }
    println("Counter = $counter")
}

获取完整代码 here

这段代码运行非常缓慢,因为它进行了细粒度的线程限制。每个增量操作都得使用 withContext 块从多线程 CommonPool 上下文切换到单线程上下文。

以粗粒度限制线程

在实践中,线程限制是在大段代码中执行的,例如:状态更新类业务逻辑中大部分都是限于单线程中。下面的示例演示了这种情况,在单线程上下文中运行每个协程。

val counterContext = newSingleThreadContext("CounterContext")
var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(counterContext) { // run each coroutine in the single-threaded context
        counter++
    }
    println("Counter = $counter")
}

获取完整代码 here

这段代码运行更快而且打印出了正确的结果。

互斥

该问题的互斥解决方案是使用永远不会同时执行的关键代码块来保护共享状态的所有修改。在阻塞的世界中,你通常会使用 synchronized 或者 ReentrantLock 。在协程中的替代品叫做 Mutex 。它具有 lockunlock 方法,可以隔离关键的部分。关键的区别在于 Mutex.lock() 是一个挂起函数,它不会阻塞线程。

还有 withLock 扩展函数,可以方便的替代常用的 mutex.lock(); try { ... } finally { mutex.unlock() } 模式:

val mutex = Mutex()
var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(CommonPool) {
        mutex.withLock {
            counter++
        }
    }
    println("Counter = $counter")
}

获取完整代码 here

此示例中锁是细粒度的,因此会付出一些代价。但是对于某些必须定期修改共享状态的场景,它是一个不错的选择,但是没有自然线程可以限制此状态。

Actors

一个 actor 是由若干元素组成的一个实体:一个协程、它的状态受限封装在此协程中、以及一个与其他协程通信的 channel 。一个简单的 actor 可以简单的写成一个函数,但是一个拥有复杂状态的 actor 更适合由类来表示。

有一个 actor 协程构建器,它可以方便地将 actor 的邮箱 channel 组合到其作用域中(用来接收消息)、组合发送 channel 与结果集对象,这样对 actor 的单个引用就可以作为其句柄持有。

使用 actor 的第一步是定一个 actor 要处理的消息类。Kotlin 的 sealed classes 密封类很适合这种场景。我们使用 IncCounter 消息(用来递增计数器)和 GetCounter 消息(用来获取值)来定义 CounterMsg 密封类。后者需要发送回复。CompletableDeferred 通信原语表示未来可知(传达)的单个值,此处用于此目的。

// Message types for counterActor
sealed class CounterMsg
object IncCounter : CounterMsg() // one-way message to increment counter        
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // a request with reply

接下来我们定义一个函数,使用 actor 协程构建器来启动一个 actor:

// This function launches a new counter actor
fun counterActor() = actor<CounterMsg> {
    var counter = 0 // actor state
    for (msg in channel) { // iterate over incoming messages
        when (msg) {
            is IncCounter -> counter++
            is GetCounter -> msg.response.complete(counter)
        }
    }
}

主函数代码很简单:

fun main(args: Array<String>) = runBlocking<Unit> {
    val counter = counterActor() // create the actor
    massiveRun(CommonPool) {
        counter.send(IncCounter)
    }
    // send a message to get a counter value from an actor
    val response = CompletableDeferred<Int>()
    counter.send(GetCounter(response))
    println("Counter = ${response.await()}")
    counter.close() // shutdown the actor
}

获取完整代码 here

actor 本身执行所处上下文的正确性无关紧要。一个 actor 是一个协程,而一个协程是按顺序执行的,因此将状态限制到特定协程可以解决共享可变状态的问题。实际上,actor 可以修改自己的私有状态,但只能通过消息互相影响(避免任何锁定)。

actor 在高负载下比锁更有效,因为在这种情况下它总是有工作要做,而且根本不需要切换到不同的上下文。

注意, actor 协程构建器是 produce 协程构建器的双重构件。一个 actor 与它接收消息的 channel 相关联,而一个 producer 与它发送元素的 channel 相关联。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342