协程上下文源代码
public interface CoroutineContext {
//从该上下文返回具有给定[键]的元素或' null '
public operator fun <E : Element> get(key: Key<E>): E?
//从左到右添加元素
public fun <R> fold(initial: R, operation: (R, Element) -> R): R
//删除此上下文
public operator fun plus(context: CoroutineContext): CoroutineContext =
if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation
context.fold(this) { acc, element ->
val removed = acc.minusKey(element.key)
if (removed === EmptyCoroutineContext) element else {
// make sure interceptor is always last in the context (and thus is fast to get when present)
val interceptor = removed[ContinuationInterceptor]
if (interceptor == null) CombinedContext(removed, element) else {
val left = removed.minusKey(ContinuationInterceptor)
if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
CombinedContext(CombinedContext(left, element), interceptor)
}
}
}
//返回包含来自此上下文的元素的上下文
public fun minusKey(key: Key<*>): CoroutineContext
//上下文元素的键
public interface Key<E : Element>
//上下文的一个元素,本身就是一个单例的协程上下文
public interface Element : CoroutineContext {
/**
* A key of this coroutine context element.
*/
public val key: Key<*>
public override operator fun <E : Element> get(key: Key<E>): E? =
@Suppress("UNCHECKED_CAST")
if (this.key == key) this as E else null
public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =
operation(initial, this)
public override fun minusKey(key: Key<*>): CoroutineContext =
if (this.key == key) EmptyCoroutineContext else this
}
}
通过源码我们可以看出协程上下文是一个跟list类似的数据结构
CoroutineContext 是元素Element的集合,每一个Element都有一个key,同时Element 又实现了CoroutineContext 的接口,它自身也是一个协程的上下文,因此也可以作为集合出现。
协程上下文关键的几个子类
1.协程拦截器
refrofit接口定义
suspend fun getMessage3(@Query("city") city: String): WeatherEntity
转化java代码
Object getMessage3(@Query("city") @NotNull String var1, @NotNull Continuation var2);
我们大胆猜测协程的本质就是回调 + “黑魔法
如何查看Continuation在线程调度过程中做了些什么,这时候就要利用拦截器
调度器就是基于拦截器实现的
public interface ContinuationInterceptor : CoroutineContext.Element {
//上下文拦截器的键
companion object Key : CoroutineContext.Key<ContinuationInterceptor>
//拦截操作,Continuation很重要
public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
//释放拦截器
public fun releaseInterceptedContinuation(continuation: Continuation<*>) {
/* do nothing by default */
}
// 重写get方法
public override operator fun <E : CoroutineContext.Element> get(key: CoroutineContext.Key<E>): E? =
@Suppress("UNCHECKED_CAST")
if (key === Key) this as E else null
// 重写minusKey方法
public override fun minusKey(key: CoroutineContext.Key<*>): CoroutineContext =
if (key === Key) EmptyCoroutineContext else this
}
Continuation的源码
public interface Continuation<in T> {
/**
* 与此Continuation相对应的协程上下文
*/
public val context: CoroutineContext
/**
* 继续执行相应的协程,将一个成功或失败的[result]作为最后一个挂起点的返回值
*/
public fun resumeWith(result: Result<T>)
}
我们可以自定义拦截器,看一下具体调用
class MyContinuation<T>(private val continuation: Continuation<T>) :Continuation<T>{
override val context=continuation.context
override fun resumeWith(result: Result<T>) {
log("MyContinuation:$result")
continuation.resumeWith(result)
}
}
class MyContinuationInterceptor: ContinuationInterceptor {
override val key=ContinuationInterceptor
override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =MyContinuation(continuation)
}
suspend fun main(){
GlobalScope.launch(MyContinuationInterceptor()) {
log(1)
val async = async {
log(2)
delay(1000)
log(3)
"hello"
}
log(4)
val result = async.await()
log("5---$result")
}.join()
log(6)
}
打印结果
22:25:29:804 [main] MyContinuation:Success(kotlin.Unit) //①
22:25:29:819 [main] 1
22:25:29:850 [main] MyContinuation:Success(kotlin.Unit) //②
22:25:29:850 [main] 2
22:25:29:897 [main] 4
22:25:30:897 [kotlinx.coroutines.DefaultExecutor] MyContinuation:Success(kotlin.Unit)//③
22:25:30:897 [kotlinx.coroutines.DefaultExecutor] 3
22:25:30:897 [kotlinx.coroutines.DefaultExecutor] MyContinuation:Success(hello)//④
22:25:30:897 [kotlinx.coroutines.DefaultExecutor] 5---hello
22:25:30:897 [kotlinx.coroutines.DefaultExecutor] 6
打印结果分析
所有协程启动都会有一次resumeWith,所以打印①,Result<T>此时为Success(kotlin.Unit)
由于是join,所以打印1
async 又启动一个协程,所以打印②Result<T>,Result<T>此时为Success(kotlin.Unit)
打印2,delay函数是挂起函数
打印4
delay函数的挂起函数恢复继续,打印③,Result<T>此时为Success(kotlin.Unit)
打印3
async.await()是挂起函数,打印④,Result<T>此时为Success(hello)
打印5---hello
打印6
思考为什么从③处开始线程切换
public suspend fun delay(timeMillis: Long) {
if (timeMillis <= 0) return // don't delay
return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
}
}
2.协程调度器
public abstract class CoroutineDispatcher :AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
//如果需要执行diapatch方法,返回true
public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
//由给定的上下文执行Runnable代码块在另外的线程中,此方法不会立即执行
public abstract fun dispatch(context: CoroutineContext, block: Runnable)
@InternalCoroutinesApi
public open fun dispatchYield(context: CoroutineContext, block: Runnable) = dispatch(context, block)
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =DispatchedContinuation(this, continuation)
@InternalCoroutinesApi
public override fun releaseInterceptedContinuation(continuation: Continuation<*>) {
(continuation as DispatchedContinuation<*>).reusableCancellableContinuation?.detachChild()
}
public operator fun plus(other: CoroutineDispatcher) = other
override fun toString(): String = "$classSimpleName@$hexAddress"
}
它本身是协程上下文的子类,同时实现了拦截器的接口, dispatch 方法会在拦截器的方法 interceptContinuation 中调用,进而实现协程的调度
kotlin在Android中提供以下四种CoroutineDispatcher
public actual object Dispatchers {
@JvmStatic
//协程默认的调度器,是线程池,默认等于cpu内核的数量,最少两个
public actual val Default: CoroutineDispatcher = createDefaultDispatcher()
@JvmStatic
//Android当中的主线程即ui线程
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
@JvmStatic
//无指定派发线程,会根据运行时的上线文环境决定
public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined
@JvmStatic
//用于执行阻塞线程的IO线程池,默认限制为64或者cpu内核数量(取最大)
public val IO: CoroutineDispatcher = DefaultScheduler.IO
}
也可以自定义CoroutineDispatcher
suspend fun main(){
val dispatcher =
Executors.newSingleThreadExecutor { r -> Thread(r, "MyThread") }.asCoroutineDispatcher()
GlobalScope.launch(dispatcher) {
log(1)
}.join()
log(2)
//由于这个线程池是我们自己创建的,因此我们需要在合适的时候关闭它
dispatcher.close()
}
协程如果运行在多线程中一样会有两个问题
1.线程切换的开销问题
2.多线程安全问题
举例多线程开销问题
suspend fun main(){
val dispatcher = Executors.newFixedThreadPool(10).asCoroutineDispatcher()
GlobalScope.launch (dispatcher){
log(1)
val async = async {
log(2)
delay(1000)
log(3)
"hello"
}
log(4)
val result = async.await()
log("5$result")
}.join()
log(6)
}
打印
21:52:54:234 [pool-1-thread-1] 1
21:52:54:287 [pool-1-thread-1] 4
21:52:54:313 [pool-1-thread-2] 2
21:52:55:339 [pool-1-thread-3] 3
21:52:55:352 [pool-1-thread-4] 5hello
21:52:55:352 [pool-1-thread-4] 6
线程切了四次,挂起函数的继续操作都会切换线程
所以我们在实际开发中要根据具体情况选用合适的CoroutineDispatcher
举例多线程安全问题
suspend fun main(){
val dispatcher = Executors.newFixedThreadPool(10).asCoroutineDispatcher()
var i=0
dispatcher.use {
List(1000000){
GlobalScope.launch(dispatcher) {
i++
}
}.forEach {
it.join()
}
}
log(i)
dispatcher.close()
}
打印
22:14:56:583 [main] 999881
解决方案1 单线程的CoroutineDispatcher操作数据(其他逻辑可以放在多线程,数据操作放在单线程比如此处的i++)
解决方案2 使用kotlin中线程安全的数据结构
suspend fun main(){
val dispatcher = Executors.newFixedThreadPool(10).asCoroutineDispatcher()
var i= AtomicInteger()
dispatcher.use {
List(1000000){
GlobalScope.launch(dispatcher) {
i.incrementAndGet()
}
}.forEach {
it.join()
}
}
log(i)
dispatcher.close()
}
与此相关的数据结构
解决方案3 利用互斥锁
suspend fun main(){
val dispatcher = Executors.newFixedThreadPool(10).asCoroutineDispatcher()
var i=0
val mutex = Mutex()
dispatcher.use {
List(1000000){
GlobalScope.launch(dispatcher) {
// 用锁保护每次自增
mutex.withLock {
i++
}
}
}.forEach {
it.join()
}
}
log(i)
dispatcher.close()
}
解决方案4 协程方式处理线程安全的actor,actor 在高负载下比锁更有效
// 计数器 Actor 的各种类型
sealed class CounterMsg
object IncCounter : CounterMsg() // 递增计数器的单向消息
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // 携带回复的请求
// 这个函数启动一个新的计数器 actor
fun CoroutineScope.counterActor() = actor<CounterMsg> {
var counter = 0 // actor 状态
for (msg in channel) { // 即将到来消息的迭代器
when (msg) {
is IncCounter -> counter++
is GetCounter -> msg.response.complete(counter)
}
}
}
suspend fun main(){
val dispatcher = Executors.newFixedThreadPool(10).asCoroutineDispatcher()
var scope= CoroutineScope(EmptyCoroutineContext)
scope.launch {
val counter = counterActor() // 创建该 actor
dispatcher.use {
List(1000000){
GlobalScope.launch(dispatcher) {
counter.send(IncCounter)
}
}.forEach {
it.join()
}
}
// 发送一条消息以用来从一个 actor 中获取计数值
val response = CompletableDeferred<Int>()
counter.send(GetCounter(response))
log("Counter = ${response.await()}")
counter.close() // 关闭该actor
dispatcher.close()
}.join()
}
第四种稍微偏难,了解即可,本质还是用到kotlin协程里面的SendChannel