Kotlin 协程 Select:看我如何多路复用

前言

协程通信三剑客:Channel、Select、Flow,上篇已经分析了Channel的深水区,本篇将会重点分析Select的使用及原理。
通过本篇文章,你将了解到:

  1. Select 的引入
  2. Select 的使用
  3. Invoke函数 的妙用
  4. Select 的原理
  5. Select 注意事项

1. Select 的引入

多路数据的选择

串行执行

如今的二维码识别应用场景越来越广了,早期应用比较广泛的识别SDK如zxing、zbar,它们各有各的特点,也存在识别不出来的情况,为了将两者优势结合起来,我们想到的方法是同一份二维码图片分别给两者进行识别。
如下:

    //从zxing 获取二维码信息
    suspend fun getQrcodeInfoFromZxing(bitmap: Bitmap?): String {
        //模拟耗时
        delay(2000)
        return "I'm fish"
    }

    //从zbar 获取二维码信息
    suspend fun getQrcodeInfoFromZbar(bitmap: Bitmap?): String {
        delay(1000)
        return "I'm fish"
    }

    fun testSelect() {
        runBlocking {
            var bitmap = null
            var starTime = System.currentTimeMillis()
            var qrcoe1 = getQrcodeInfoFromZxing(bitmap)
            var qrcode2 = getQrcodeInfoFromZbar(bitmap)
            println("qrcode1=$qrcoe1 qrcode2=$qrcode2 useTime:${System.currentTimeMillis() - starTime} ms")
        }
    }

查看打印,最后花费的时间:

qrcode1=I'm fish qrcode2=I'm fish useTime:3013 ms

当然这是串行的方式效率比较低,我们想到了用协程来优化它。

协程并行执行

如下:

    fun testSelect1() {
        var bitmap = null;
        var starTime = System.currentTimeMillis()
        var deferredZxing = GlobalScope.async {
            getQrcodeInfoFromZxing(bitmap)
        }

        var deferredZbar = GlobalScope.async {
            getQrcodeInfoFromZbar(bitmap)
        }

        runBlocking {
            //挂起等待识别结果
            var qrcoe1 = deferredZxing.await()
            //挂起等待识别结果
            var qrcode2 = deferredZbar.await()
            println("qrcode1=$qrcoe1 qrcode2=$qrcode2 useTime:${System.currentTimeMillis() - starTime} ms")
        }
    }

查看打印,最后花费的时间:

qrcode1=I'm fish qrcode2=I'm fish useTime:2084 ms

可以看出,花费时间明显变少了。
与上个Demo 相比,虽然识别过程是放在协程里并行执行的,但是在等待识别结果却是串行的。我们引入两个识别库的初衷是哪个识别快就用哪个的结果,为了达成这个目的,传统的方式是:

同时监听并记录识别结果的返回。

同时监听多路结果

如下:

    fun testSelect2() {
        var bitmap = null;
        var starTime = System.currentTimeMillis()
        var deferredZxing = GlobalScope.async {
            getQrcodeInfoFromZxing(bitmap)
        }

        var deferredZbar = GlobalScope.async {
            getQrcodeInfoFromZbar(bitmap)
        }

        var isEnd = false
        var result: String? = null
        GlobalScope.launch {
            if (!isEnd) {
                //没有结束,则继续识别
                var resultTmp = deferredZxing.await()
                if (!isEnd) {
                    //识别没有结束,说明自己是第一个返回结果的
                    result = resultTmp
                    println("zxing recognize ok useTime:${System.currentTimeMillis() - starTime} ms")
                    //标记识别结束
                    isEnd = true
                }
            }
        }

        GlobalScope.launch {
            if (!isEnd) {
                var resultTmp = deferredZbar.await()
                if (!isEnd) {
                    //识别没有结束,说明自己是第一个返回结果的
                    result = resultTmp
                    println("zbar recognize ok useTime:${System.currentTimeMillis() - starTime} ms")
                    isEnd = true
                }
            }
        }

        //检测是否有结果返回
        runBlocking {
            while (!isEnd) {
                delay(1)
            }
            println("recognize result:$result")
        }
    }

通过检测isEnd 标记来判断是否有某个模块返回结果。
结果如下:

  • zbar recognize ok useTime:1070 ms
  • recognize result:I'm fish

由于模拟设定的zbar 解析速度快,因此每次都是采纳的是zbar的结果,所花费的时间大幅减少了,该结果符合预期。

Select 闪亮登场

虽说上个Demo结果符合预期,但是多了很多额外的代码、多引入了其它协程,并且需要子模块对标记进行赋值(对"isEnd"进行赋值),没有达到解耦的目的。我们希望子模块的任务是单一且闭环的,如果能在一个函数里统一检测结果的返回就好了。
Select 就是为了解决多路数据的选择而生的。
来看看它是怎么解决该问题的:

    fun testSelect3() {
        var bitmap = null;
        var starTime = System.currentTimeMillis()
        var deferredZxing = GlobalScope.async {
            getQrcodeInfoFromZxing(bitmap)
        }
        var deferredZbar = GlobalScope.async {
            getQrcodeInfoFromZbar(bitmap)
        }
        runBlocking {
            //通过select 监听zxing、zbar 结果返回
            var result = select<String> {
                //监听zxing
                deferredZxing.onAwait {value->
                    //value 为deferredZxing 识别的结果
                    "zxing result $value"
                }

                //监听zbar
                deferredZbar.onAwait { value->
                    "zbar result $value"
                }
            }

            //运行到此,说明已经有结果返回
            println("result from $result useTime:${System.currentTimeMillis() - starTime}")
        }
    }

结果如下:

result from zbar result I'm fish useTime:1079

符合预期,同时可以看出:相比上个Demo,这样写简洁了许多。

2. Select 的使用

除了可以监听async的结果,Select 还可以监听Channel的发送方/接收方 数据,我们以监听接收方数据为例:

    fun testSelect4() {
        runBlocking {
            var bitmap = null;
            var starTime = System.currentTimeMillis()
            var receiveChannelZxing = produce {
                //生产数据
                var result = getQrcodeInfoFromZxing(bitmap)
                //发送数据
                send(result)
            }

            var receiveChannelZbar = produce {
                var result = getQrcodeInfoFromZbar(bitmap)
                send(result)
            }

            var result = select<String> {
                //监听是否有数据发送过来
                receiveChannelZxing.onReceive {
                    value->"zxing result $value"
                }

                receiveChannelZbar.onReceive {
                        value->"zbar result $value"
                }
            }

            println("result from $result useTime:${System.currentTimeMillis() - starTime}")
        }
    }

结果如下:

result from zbar result I'm fish useTime:1028

不论是async还是Channel,Select 都可以监听它们的数据,从而形成多路复用的效果。

image.png

在监听协程里调用select 表达式,表达式{}内声明需要监听的协程的数据,对于select 来说有两种场景:

  1. 没有数据,则select 挂起协程并等待直到其它协程数据准备完成后再次恢复select 所在的协程。
  2. 有数据,则select 正常执行并返回获取的数据。

3. Invoke函数 的妙用

在分析Select 原理之前,需要弄明白invoke函数的原理。
对于Kotlin 类来说,都可以重写其invoke函数。

    operator fun invoke():String {
        return "I'm fish"
    }

如上,重写了SelectDemo里的invoke函数,和普通成员函数一样,我们可以通过对象调用它。

fun main(args: Array<String>) {
    var selectDemo = SelectDemo()
    var result = selectDemo.invoke()
    println("result:$result")
}

当然,可以进一步简化:

fun main(args: Array<String>) {
    var selectDemo = SelectDemo()
    var result = selectDemo()
    println("result:$result")
}

这里涉及到了kotlin的语法糖:对象居然可以像函数一样调用。
作为函数,invoke 当然也可以接收高阶函数作为参数:

    operator fun invoke(block: (Int) -> String): String {
        return block(3)
    }

fun main(args: Array<String>) {
    var selectDemo = SelectDemo()
    var result = selectDemo { age ->
        when (age) {
            3 -> "I'm fish3"
            4 -> "I'm fish4"
            else -> "error"
        }
    }
    println("result:$result")
}

因此,当看到对象作为函数调用时,实际上调用的是invoke函数,具体的逻辑需要查看其invoke函数的实现。

4. Select 的原理

上篇分析过Channel,因此本篇趁热打铁,通过Select 监听Channel数据的变化来分析其原理,为方便讲解,我们先以监听一个Channel的为例。
先从select 表达式本身入手。

    fun testSelect5() {
        runBlocking {
            var starTime = System.currentTimeMillis()
            var receiveChannelZxing = produce {
                //发送数据
                send("I'm fish")
            }

            //确保channel 数据已经send
            delay(1000)
            var result = select<String> {
                //监听是否有数据发送过来
                receiveChannelZxing.onReceive { value ->
                    "zxing result $value"
                }
            }
            println("result from $result useTime:${System.currentTimeMillis() - starTime}")
        }
    }

select 是挂起函数,因此协程运行到此有可能被挂起。

#Select.kt
public suspend inline fun <R> select(crossinline builder: SelectBuilder<R>.() -> Unit): R {
    //...
    return suspendCoroutineUninterceptedOrReturn { uCont ->
        //传入父协程体
        val scope = SelectBuilderImpl(uCont)
        try {
            //执行builder
            builder(scope)
        } catch (e: Throwable) {
            scope.handleBuilderException(e)
        }
        //通过返回值判断是否需要挂起协程
        scope.getResult()
    }
}

重点看builder(scope),builder 是高阶函数,实际上就是执行了select花括号里的内容,而它里面就是监听数据是否返回。

receiveChannelZxing.onReceive
刚开始看的时候势必以为onReceive是个函数,然而它是ReceiveChannel 里的成员变量:

#Channel.kt
    public val onReceive: SelectClause1<E>

通过上一节的分析可知,关键是要找到SelectClause1 的invoke的实现。

#Select.kt
public interface SelectBuilder<in R> {
    //block 有个入参
    //声明了SelectClause1的扩展函数invoke
    public operator fun <Q> SelectClause1<Q>.invoke(block: suspend (Q) -> R)
}

override fun <Q> SelectClause1<Q>.invoke(block: suspend (Q) -> R) {
    //SelectBuilderImpl 实现了 SelectClause1 的invoke函数
    registerSelectClause1(this@SelectBuilderImpl, block)
}

再看onReceive 的赋值:

#AbstractChannel.kt
final override val onReceive: SelectClause1<E>
    get() = object : SelectClause1<E> {
        @Suppress("UNCHECKED_CAST")
        override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (E) -> R) {
            registerSelectReceiveMode(select, RECEIVE_THROWS_ON_CLOSE, block as suspend (Any?) -> R)
        }
    }

因此,简单总结调用栈如下:

当调用receiveChannelZxing.onReceive{},实际上调用了SelectClause1.invoke(),而它里面又调用了SelectClause1.registerSelectClause1(),最终调用了AbstractChannel.registerSelectReceiveMode。

AbstractChannel. registerSelectReceiveMode

#AbstractChannel.kt
private fun <R> registerSelectReceiveMode(select: SelectInstance<R>, receiveMode: Int, block: suspend (Any?) -> R) {
    while (true) {
        //如果已经有结果了,则直接返回------->①
        if (select.isSelected) return
        if (isEmptyImpl) {
            //没有发送者在等待,则入队等待,并返回 ------->②
            if (enqueueReceiveSelect(select, block, receiveMode)) return
        } else {
            //直接取出值------->③
            val pollResult = pollSelectInternal(select)
            when {
                pollResult === ALREADY_SELECTED -> return
                pollResult === POLL_FAILED -> {} // retry
                pollResult === RETRY_ATOMIC -> {} // retry
                //调用block------->④
                else -> block.tryStartBlockUnintercepted(select, receiveMode, pollResult)
            }
        }
    }
}

分为4个点,接着来一一分析。

select 同时监听多个值,若是有1个符合要求的数据返回了,那么该isSelected 标记为true,当检测到该标记为true时直接退出。
结合之前的Demo,zbar 已经识别出结果了,当select 检测zxing的结果时直接返回。

#AbstractChannel.kt
private fun <R> enqueueReceiveSelect(
    select: SelectInstance<R>,
    block: suspend (Any?) -> R,
    receiveMode: Int
): Boolean {
    //构造为Node元素
    val node = AbstractChannel.ReceiveSelect(this, select, block, receiveMode)
    //添加到Channel队列里
    val result = enqueueReceive(node)
    if (result) select.disposeOnSelect(node)
    return result
}

当select 时,发现Channel里没有数据,说明Channel还没有开始send,因此构造了Node(ReceiveSelect)加入到Channel queue里。当send数据时,会查找queue里是否有接收者等待,若有则调用Node(ReceiveSelect.completeResumeReceive):

#AbstractChannel.kt
        override fun completeResumeReceive(value: E) {
            block.startCoroutineCancellable(
                if (receiveMode == RECEIVE_RESULT) ChannelResult.success(value) else value,
                select.completion,
                resumeOnCancellationFun(value)
            )
        }

block 被调度执行,最后会恢复select 协程的执行。


取出数据,并尝试恢复send协程。


在③的基础上,拿到数据后,直接执行block(此时并没有切换线程进行调度)。

小结一下select 原理:


image.png

可以看出:

select 本身执行并不耗时,若最终没有数据返回则挂起等待,若是有数据返回则不会挂起协程。

我们从头再捋一下select 配合Channel 的原理:


image.png

虽然以Channel为例讲解了select 原理,实际上async等结合select 原理大致差不多,重点都是利用了协程的挂起/恢复做文章。

5. Select 注意事项

如果select有多个数据同时到达,select 默认会选择第一个数据,若想要随机选择数据,可做如下处理:

            var result = selectUnbiased<String> {
                //监听是否有数据发送过来
                receiveChannelZxing.onReceive { value ->
                    "zxing result $value"
                }
            }

想要知道select 还可以监听哪些数据,可查看该数据是否实现了SelectClauseX(X 表示0、1、2)。

以上即为Select 的原理及其使用,下篇将会进入协程的精华部分:Flow的运用,该部分内容较多,可能会分几篇分析,敬请期待。

本文基于Kotlin 1.5.3,文中完整Demo请点击

您若喜欢,请点赞、关注、收藏,您的鼓励是我前进的动力

持续更新中,和我一起步步为营系统、深入学习Android/Kotlin

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,362评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,330评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,247评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,560评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,580评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,569评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,929评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,587评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,840评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,596评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,678评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,366评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,945评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,929评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,165评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,271评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,403评论 2 342

推荐阅读更多精彩内容