一、协程是什么、协程的作用是什么
本质上Kotlin
协程是一个库,就像RxJava
、Butterknife
这些第三方库一样,通过协程我们写代码时会更简便,省去了许多重复、复杂的动作,但这个库有其特别之处,即完全通过编译实现JVM
层的逻辑,就是说协程表面上那些简单的语句经过编译之后,会转化为复杂的JVM
代码(class
字节码),这些在JVM
中执行的代码虽然复杂,但还是我们熟悉的那一套Java
的线程池、订阅、回调逻辑。
就像许多库一样,协程也是为了解决某一类问题而来,即主要用来简化异步编程,可以用同步的方式写出异步执行的代码,这一点比RxJava
的链式编程更加简便优雅。一般异步编程时,最常见的就是使用Callback
,如果回调出现嵌套,代码结构层次会过多且混乱,出现大量模板式的回调处理,而协程不仅能消除大量的模板代码,而且能让异步执行的代码,像同步代码一样,顺序执行,但是又不阻塞当前线程。
举个例子(参考:https://www.cnblogs.com/baiqiantao/p/6442129.html):
需求:查询用户信息 -> 查找该用户的好友列表 ->拿到好友列表后,查找该好友的动态
传统写法:
getUserInfo(new CallBack() {
@Override
public void onSuccess(String user) {
if (user != null) {
System.out.println(user);
getFriendList(user, new CallBack() {
@Override
public void onSuccess(String friendList) {
if (friendList != null) {
System.out.println(friendList);
getFeedList(friendList, new CallBack() {
@Override
public void onSuccess(String feed) {
if (feed != null) {
System.out.println(feed);
}
}
});
}
}
});
}
}
});
采用协程:
val user = getUserInfo()
val friendList = getFriendList(user)
val feedList = getFeedList(friendList)
使用协程,三句话就替代了传统几十行的代码写法,这三行代码是顺序执行的,而且就算这三行代码运行在UI线程(但是必须运行在协程作用域之中)之中,也不会阻塞UI线程的执行(实质是编译器会自动修改代码,加入了线程调度切换逻辑)。
附:getUserInfo、getFriendList、getFeedList
三个函数的定义:
// delay(1000L)用于模拟网络请求
suspend fun getUserInfo(): String {
withContext(Dispatchers.IO) {
delay(1000L)
}
return "BoyCoder"
}
suspend fun getFriendList(user: String): String {
withContext(Dispatchers.IO) {
delay(1000L)
}
return "Tom, Jack"
}
suspend fun getFeedList(list: String): String {
withContext(Dispatchers.IO) {
delay(1000L)
}
return "{FeedList..}"
}
虽然说协程的主要作用是简化异步编程,但是并不是说只能用来异步编程,协程也可以写同步代码,即所有代码都在同一个线程执行,但程序的逻辑会不一定按照程序输写的顺序执行,比如:
fun main() = runBlocking{
//所有代码在主线程执行
println("runBlocking start thread:${Thread.currentThread().id}")
val launch = launch {
println("launch running thread:${Thread.currentThread().id}")
}
println("launch end")
val async = async {
println("async running thread:${Thread.currentThread().id}")
}
println("async end")
println("runBlocking end")
}
输出:
runBlocking start thread:1
launch end
async end
runBlocking end
launch running thread:1
async running thread:1
程序改一下:
fun main() = runBlocking{
//所有代码在主线程执行
println("runBlocking start thread:${Thread.currentThread().id}")
val launch = launch {
delay(1000) //增加一个延时1秒
println("launch running thread:${Thread.currentThread().id}")
}
println("launch end")
val async = async {
println("async running thread:${Thread.currentThread().id}")
}
println("async end")
println("runBlocking end")
}
输出:
runBlocking start thread:1
launch end
async end
runBlocking end
async running thread:1
launch running thread:1
程序再改一下(加入多线程):
fun main() = runBlocking{
println("runBlocking start")
val launch = launch(Dispatchers.IO) { //io线程执行协程
println("launch running")
}
println("launch end")
val async = async(Dispatchers.IO) { //io线程执行协程
println("async running")
}
println("async end")
println("runBlocking end")
}
输出:
runBlocking start
launch end
launch running
async end
runBlocking end
async running
从上面的例子可以发现,程序并不一定会按照代码的顺序执行,其执行顺序受到代码顺序、代码逻辑及执行线程等各种因素的影响,其中的关键点就是Suspend Function
(挂起函数)。
上面的例子只是冰山一角,说明协程非常灵活,所以只有掌握了其中的规律,才能控制程序执行顺序,发挥协程强大的作用。
二、协程的使用
-
创建协程
一般有三种方式,其中两种方式:
launch
函数、async
函数比较常用,而runBlocking
函数只在测试时使用,不常用。-
runBlocking:T
函数顶层函数,创建一个新的协程同时阻塞当前线程,直到其内部所有逻辑以及子协程所有逻辑全部执行完成,返回值是泛型
T
,一般在项目中不会使用,主要是为main
函数和测试设计的。比如:
fun main() = runBlocking{ //代码体 }
或者
fun main() { runBlocking { //代码体 } }
-
launch
函数CoroutineScope
的扩展函数,所以launch
必须使用CoroutineScope
对象直接调用或者在协程作用域之中调用。launch
能创建一个新的协程,不会阻塞当前线程,它返回的是一个该协程任务的引用,即Job
对象。这是最常用的用于创建启动协程的方式。kotlin
协程标准库里面提供了一个全局协程作用域对象GlobalScope
,来方便创建协程,其继承自CoroutineScope
,但是因为其生命周期是process
级别的,只有APP
进程销毁了,才会被取消,为了不造成内存泄露,一般在测试时使用。比如:(参考:https://blog.csdn.net/m0_37796683/article/details/119106967)
fun launchTest() { println("start") //创建一个全局作用域协程,不会阻塞当前线程,生命周期与应用程序一致 GlobalScope.launch { //在这1000毫秒内该协程所处的线程不会阻塞 //协程将线程的执行权交出去,该线程继续干它要干的事情,到时间后会恢复至此继续向下执行 delay(1000)//1秒无阻塞延迟(默认单位为毫秒) println("GlobalScope.launch") } println("end")//主线程继续,而协程被延迟 }
或者
fun launchTest2() { println("start") GlobalScope.launch { delay(1000) println("CoroutineScope.launch") //在协程内创建子协程 launch { delay(1500)//1.5秒无阻塞延迟(默认单位为毫秒) println("launch 子协程") } } println("end") }
也可以自行创建一个
CoroutineScope
对象,然后使用launch
函数创建协程fun launchTest3() { println("start") //开启一个IO模式的协程,通过协程上下文创建一个CoroutineScope对象,需要一个类型为CoroutineContext的参数 val job = CoroutineScope(Dispatchers.IO).launch { delay(1000)//1秒无阻塞延迟(默认单位为毫秒) println("CoroutineScope.launch") } println("end")//主线程继续,而协程被延迟 }
Android
提供了适合在Android
中使用的协程作用域MainScope
,它是一个在UI主线程执行的协程作用域,可通过cancel
对协程进行取消(注:Android
中提供的可直接使用的协程作用域对象都需要添加相应的依赖才能使用)参考:https://www.cnblogs.com/bingxinshuo/p/11717209.html),
比如:
private fun launchFromMainScope() { val mainScope = MainScope() mainScope.launch { val deferred = async(Dispatchers.IO) { // network request delay(3000) "Get it" } val text = deferred.await() } }
或者:
class BasicCorotineActivity : AppCompatActivity(), CoroutineScope by MainScope() { ...... private fun launchFromMainScope() { launch { val deferred = async(Dispatchers.IO) { // network request delay(3000) "Get it" } mainScope.text = deferred.await() Toast.makeText(applicationContext, "MainScope", Toast.LENGTH_SHORT).show() } } ...... override fun onDestroy() { super.onDestroy() cancel() //在 onDestroy() 中取消协程 } ...... }
Android
也提供了ViewModel
中直接能使用的viewModelScope
,比如:fun getMessageByViewModel() { viewModelScope.launch { val deferred = async(Dispatchers.IO) { getMessage("ViewModel Ktx") } mMessage.value = deferred.await() } }
好处:当
ViewModel.onCleared()
被调用的时候,viewModelScope
会自动取消作用域内的所有协程。Android
还提供了在Activity/Fragment
等生命周期组件中直接能使用的lifecycleScope
,比如:fun getMessageByLifeCycle(lifecycleOwner: LifecycleOwner) { lifecycleOwner.lifecycleScope.launch { val deferred = async(Dispatchers.IO) { getMessage("LifeCycle Ktx") } mMessage.value = deferred.await() } }
好处:当
LifeCycle
回调onDestroy()
时,协程作用域lifecycleScope
会自动取消另外
Kotlin
为LiveData
赋予了直接使用协程的能力,比如:val user: LiveData<User> = liveData { val data = database.loadUser() // loadUser is a suspend function. emit(data) }
-
async
函数async
类似于launch
,也是CoroutineScope
的扩展函数,也是创建一个不会阻塞当前线程的新的协程。它们区别在于:async
的返回是Deferred
对象,可通过Deffer.await()
等待协程执行完成并获取结果,而launch
不行。常用于并发执行-同步等待和获取返回值的情况。比如:
//获取返回值 fun asyncTest1() { println("start") GlobalScope.launch { val deferred: Deferred<String> = async { //协程将线程的执行权交出去,该线程继续干它要干的事情,到时间后会恢复至此继续向下执行 delay(2000)//2秒无阻塞延迟(默认单位为毫秒) println("asyncOne") "HelloWord"//这里返回值为HelloWord } //等待async执行完成获取返回值,此处并不会阻塞线程,而是挂起,将线程的执行权交出去 //等到async的协程体执行完毕后,会恢复协程继续往下执行 val result = deferred.await() println("result == $result") } println("end") }
当在协程作用域中使用
async
函数时可以创建并发任务:fun asyncTest2() { println("start") GlobalScope.launch { val time = measureTimeMillis {//计算执行时间 val deferredOne: Deferred<Int> = async { delay(2000) println("asyncOne") 100//这里返回值为100 } val deferredTwo: Deferred<Int> = async { delay(3000) println("asyncTwo") 200//这里返回值为200 } val deferredThr: Deferred<Int> = async { delay(4000) println("asyncThr") 300//这里返回值为300 } //等待所有需要结果的协程完成获取执行结果 val result = deferredOne.await() + deferredTwo.await() + deferredThr.await() println("result == $result") } println("耗时 $time ms") } println("end") }
- 补充说明
实际上在Continuation.kt文件中,有2个基础函数
public fun <T> (suspend () -> T).createCoroutine( completion: Continuation<T> ): Continuation<Unit> = SafeContinuation(createCoroutineUnintercepted(completion).intercepted(), COROUTINE_SUSPENDED) public fun <T> (suspend () -> T).startCoroutine( completion: Continuation<T> ) { createCoroutineUnintercepted(completion).intercepted().resume(Unit) }
- 补充说明
createCoroutine和startCoroutine就是用来创建和启动协程的基础API,launch、async等在底层一定程度上都使用了该基础API,launch和async只不过是封装而已,实际上也可以直接用它们来创建和启动协程,比如:
val continuation = object : Continuation<String> { override val context: CoroutineContext get() = EmptyCoroutineContext override fun resumeWith(result: Result<String>) { println("结果: ${result.getOrNull()}") } } //方式1 val block = suspend { ... "结果" } block.startCoroutine(continuation) //方式2 suspend fun getUserName(): String { ... return "结果" } (::getUserName).startCoroutine(continuation)
-
也可以这样,比如 :
suspend {
coroutineScope {
println("${Thread.currentThread().name}")
return@coroutineScope 100
}
// 在这里创建一个对象 Continuation
}.startCoroutineCancellable(object : Continuation<Int> {
override val context: CoroutineContext
get() = EmptyCoroutineContext
override fun resumeWith(result: Result<Int>) {
println("${Thread.currentThread().name}: ${result.getOrNull()}")
}
})
//或者
val continuation = suspend {
println("in coroutine")
5
}.createCoroutine(object : Continuation<Int> {
override val context: CoroutineContext
get() = Dispatchers.IO
override fun resumeWith(result: Result<Int>) {
println("resumeWith result = $result")
}
})
continuation.resume(Unit)
-
“创建一个新的协程”这句话里的“协程”是什么?
上面谈到协程本质上是一个库,但我们常说创建一个新的协程,那么这个创建的协程指的是什么呢?
从
launch
函数的定义说起:// CoroutineScope的扩展方法 public fun CoroutineScope.launch( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> Unit ): Job { // 根据当前上下文,计算得到新的上下文 val newContext = newCoroutineContext(context) // 根据启动模式,创建不同的续体 val coroutine = if (start.isLazy) LazyStandaloneCoroutine(newContext, block) else StandaloneCoroutine(newContext, active = true) // 启动协程 coroutine.start(start, coroutine, block) return coroutine }
我的理解:协程就是
block
所代表的一段lambda
代码块,即runBlocking{//代码块}
、launch {//代码块}
、async{//代码块}
这三个函数调用的lambda
代码块,当然这些代码块携带了关键信息,用来标记协程。我们可以看到这三个函数可以互相嵌套,说明协程有父子关系,协程是否是父子关系,以及如何区分不同的协程,最关键的一点是CoroutineScope
(协程作用域),但实质还是由CoroutineContext
(协程上下文)决定的。//CoroutineScope 只是一个接口,里面只有一个CoroutineContext类型的变量 public interface CoroutineScope { public val coroutineContext: CoroutineContext }
协程与线程是什么关系?
上面讲到协程就是一段代码块,所以协程是一个编程语言级别的概念,不像线程、进程,是操作系统级别的概念。
它们之间没有可比性,就好比有人问,线程与
android
中的Service
是什么关系这种问题一样,其实没什么可比性。硬要说的话,协程与线程的关系就是协程可以运行在某个线程中,但协程不属于某个线程,因为它可以运行在不同的线程之中,能根据需要进行切换。
结构化并发的概念
全局的GlobalScope
是一个作用域,每个协程自身也是一个作用域,新建的协程与它的父作用域存在一个级联的关系,也就是一个父子关系层次结构。
所以协程与线程不同,Kotlin
中的协程就是 结构化并发
:每个并发操作都是在处理一个任务,它可能属于某个父任务,也可能有自己的子任务。每个任务拥有自己的生命周期,子任务的生命周期理应继承父任务的生命周期
作用域间可并列 或 包含,组成一个树状结构,这就是Kotlin
协程中的结构化并发。
结构化并发有几个好处:
- 当任务正在执行时,可以追踪这个任务
- 当任务不需要执行时,可以取消这个任务
- 当任务失败时,可以发出错误信号表明有错误发生
- 统一处理并发任务,避免任务泄漏
说白了就是因为结构化并发,协程相比线程来说,管理更方便
比如:通过Job
对象能管理协程;取消父协程,子协程也能被取消;取消协程作用域,里面的协程可以全部被取消
-
启动协程
通过
launch
、async
和runBlocking
在创建协程的同时默认会立即启动协程,也可以通过指定start
参数来指定协程的启动模式。有四种启动模式:Default
:协程创建后立即开始调度。在调度前如果协程被取消。将其直接进入取消相应的状态。不指定start
参数时,默认用此模式ATOMIC
:协程创建后。立即开始调度。协程执行到第一个挂起点之前不响应取消。LAZY
:只有协程被需要时,包括主动调用协程的start
、join
或者await
等函数时,才会开始调度,如果调度前就被取消。那么该协程将直接进入异常状态。UNDISPATCHED
:协程创建后立即在当前函数调用栈中执行,直到遇到第一个真正挂起的点。四种方式,分别举例说明:
runBlocking { val job = launch(start = CoroutineStart.DEFAULT) { Log.d("~~~", "start") delay(5000) Log.d("~~~", "done") } job.cancel() }
运行这段程序会发现没有任何输出,因为协程在调度前就被取消了,没有来得及执行
runBlocking {
val job = launch(start = CoroutineStart.ATOMIC) {
Log.d("~~~", "start")
delay(5000)
Log.d("~~~", "done")
}
job.cancel()
}
运行这段程序会发现输出了 start
,表明协程在遇到 delay()
这个挂起点时才被 cancel
runBlocking {
val job = launch(start = CoroutineStart.LAZY) {
Log.d("~~~", "start")
delay(5000)
Log.d("~~~", "done")
}
job.start()
}
在这段代码中,如果不调用 job.start()
,将不会有任何输出。只有调用了 job.start()
后,程序才能正常执行
GlobalScope.launch(Dispatchers.Main) {
launch(context = Dispatchers.IO, start = CoroutineStart.UNDISPATCHED) {
Log.d("~~~", "start ${Thread.currentThread().name}")
delay(5000)
Log.d("~~~", "done ${Thread.currentThread().name}")
}
}
运行这段程序,输出如下:
~~~: start main
~~~: done DefaultDispatcher-worker-1
在遇到挂起点前,也就是 delay()
函数执行前,协程中的代码块是在主线程中执行的,并没有切换到 Dispatchers.IO
调度器中执行。直到运行到 delay()
函数时,协程才会切换到 Dispatchers.IO
调度器中去执行
理解:挂起点一般是suspend
函数、yield
函数、isActive
变量或者其它,就像线程也不能随时在某条代码执行时取消,也要通过变量判断或者发生异常时取消,协程也一样,需要一个判断点,上面说到的suspend
函数、yield
函数、isActive
变量就是判断点。
-
协程暂停与恢复
协程默认不支持暂停与恢复,但是可以通过
suspendCancellableCoroutine
来间接的实现。一般不太推荐使用,因为有其他很好的方法实现,比如用一个协程串行执行,或者并发协程然后使用协程的通信来传递,或者用线程+队列也能做等等。
suspendCancellableCoroutine
本意是让回调也能兼容协程,这也是它最大的应用场景(后面会有介绍) -
取消协程
一般有两种方式:
Job.cancel()、CoroutineScope.cancel()
(另外,协程发生异常也会取消协程,属于被动取消)
1.
Job.cancel()
launch
、async
都会返回Job
接口对象(async
的返回是Deferred
对象,而Deferred
继承于Job
),Job
接口有一个cancel
方法,可以用来取消协程,也可以使Job
挂起的函数cancelAndJoin
它合并了对cancel
以及join
的调用fun main() = runBlocking { val job = launch { repeat(1000) { i -> println("job: I'm sleeping $i ...") delay(500L) } } delay(1300L) // delay a bit println("main: I'm tired of waiting!") job.cancel() // cancels the job job.join() // waits for job's completion println("main: Now I can quit.") }
输出:
job: I'm sleeping 0 ... job: I'm sleeping 1 ... job: I'm sleeping 2 ... main: I'm tired of waiting! main: Now I can quit.
协程的取消是协作的,⼀段协程代码必须协作才能被取消,所有协程中的挂起函数都是可被取消的 。(它们检查协程的取消,并在取消时抛出 CancellationException
)
如果协程正在执行计算任务,并且没有检查取消的话,那么它是不能被取消的。因为与线程类似,协程Job.cancel()
函数仅仅只是将state
值改变而已
所以,外部对当前正在运行的协程的取消,协程不会立即取消,当下面两种情况之一发生时,协程才会取消
- 该协程的配合检查,协同进行取消,这和停止一个线程的执行类似(需要线程的配合检查)
- 当协程
suspend
的时候,协程也会被取消
fun main() = runBlocking {
val startTime = System.currentTimeMillis()
val job = launch(Dispatchers.Default) {
var nextPrintTime = startTime
var i = 0
while (i < 5) { // ⼀个执⾏计算的循环,只是为了占⽤ CPU
if (System.currentTimeMillis() >= nextPrintTime) {
println("job: I'm sleeping ${i++} ...")
nextPrintTime += 500L
}
}
}
delay(1300L) // 等待⼀段时间
println("main: I'm tired of waiting!")
job.cancelAndJoin() // 取消⼀个作业并且等待它结束
println("main: Now I can quit.")
}
输出:
job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
job: I'm sleeping 3 ...
job: I'm sleeping 4 ...
main: Now I can quit.
有两种方法来使执行计算的代码可以被取消
- 定期调用挂起函数来检查取消。可以用
yield
- 显式的检查取消状态
fun main() = runBlocking {
val startTime = System.currentTimeMillis()
val job = launch(Dispatchers.Default) {
var nextPrintTime = startTime
var i = 0
while (isActive) { // ⼀个执行计算的循环,只是为了占用 CPU
// while (i < 5) {
// yield()
if (System.currentTimeMillis() >= nextPrintTime) {
println("job: I'm sleeping ${i++} ...")
nextPrintTime += 500L
}
}
}
delay(1300L) // 等待⼀段时间
println("main: I'm tired of waiting!")
job.cancelAndJoin() // 取消⼀个作业并且等待它结束
println("main: Now I can quit.")
}
可被取消的挂起函数,在被取消时抛出 CancellationException
,可用try {……} finally {……}
表达式以及 Kotlin
的 use
函数⼀般在协程被取消的时候执行它们的终结动作:
fun main() = runBlocking {
val job = launch {
try {
repeat(1000) { i ->
println("job: I'm sleeping $i ...")
delay(500L)
}
} catch (e: CancellationException) {
println("e:${e.message}")
} finally {
println("job: I'm running finoally")
}
}
delay(1300L)
println("main: I'm tired of waiting!")
job.cancelAndJoin()
println("main: Now I can quit.")
}
输出:
job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
e:StandaloneCoroutine was cancelled
job: I'm running finoally
main: Now I can quit.
现在有一个问题:对于大多数资源的关闭和回收(比如关闭文件、取消job
等),都是瞬间的动作,都不会是阻塞的行为。可能在极少数情况下,关闭和回收的操作是阻塞的,是需要调用挂起函数的,但是在finally
中,如果协程已经被取消,那么此时对于挂起函数的调用,都会抛出一个CancellationException
的异常。那么这种情况下,我们又该如何去处理?可以使用withContext(NonCancellable)
,它能让挂起函数不被取消,比如:
fun main() = runBlocking{
//使用withContext(NonCancellable)后,即使取消,它里面的内容都会执行完,不使用的话,就会只delay后面就不执行了
val job = launch {
try {
repeat(1000) {
println("job: I'm sleeping $it")
delay(1000L)
}
}finally {
println("job: I'm running finally")
withContext(NonCancellable){
delay(1000L)
}
println("job: And I'v just delayed for 1 sec beca")
}
}
delay(1300L)
println("main: I'm tired of waiting!")
job.cancelAndJoin()
println("main: Now I can quit.")
}
运行结果:
job: I'm sleeping 0
job: I'm sleeping 1
main: I'm tired of waiting!
job: I'm running finally
job: And I'v just delayed for 1 sec beca
main: Now I can quit.
父协程取消,一般情况会取消子协程:
fun main(){
//创建一个Job,当然你也可以启动一个协程后返回
val job = GlobalScope.launch {
println("父协程开始")
//启动一个子协程
println("启动一个子协程")
launch {
println("子协程开始")
Thread.sleep(200)
println("子协程完成")
}
Thread.sleep(100)
println("父协程完成")
}
println("开始取消父协程")
job.cancel()
println("结束取消父协程")
TimeUnit.SECONDS.sleep(1)
println("结束")
}
运行结果:
开始取消父协程
结束取消父协程
结束
父协程取消时会取消子协程,而子协程收到取消后会递归的取消子协程和自身。
如果改一下程序:
fun main() = runBlocking{
//创建一个Job,当然你也可以启动一个协程后返回
val job = GlobalScope.launch {
println("父协程开始")
//启动一个子协程
println("启动一个子协程")
launch {
println("子协程开始")
Thread.sleep(200)
println("子协程完成")
}
Thread.sleep(100)
println("父协程完成")
}
println("开始取消父协程")
job.cancel()
println("结束取消父协程")
TimeUnit.SECONDS.sleep(1)
println("结束")
}
运行结果:
开始取消父协程
父协程开始
启动一个子协程
子协程开始
结束取消父协程
父协程完成
子协程完成
结束
或者:
开始取消父协程
父协程开始
启动一个子协程
结束取消父协程
父协程完成
结束
为什么main
包一层runBlocking
后,协程就取消不了了,而且执行结果还会变化,打印出isActive
再来看看
fun main() = runBlocking{
//创建一个Job,当然你也可以启动一个协程后返回
val job = GlobalScope.launch {
println("父协程开始 isActive:$isActive")
//启动一个子协程
println("启动一个子协程")
launch {
println("子协程开始 isActive:$isActive")
Thread.sleep(200)
println("子协程完成 isActive:$isActive")
}
Thread.sleep(100)
println("父协程完成 isActive:$isActive")
}
println("开始取消父协程")
job.cancel()
println("结束取消父协程")
TimeUnit.SECONDS.sleep(1)
println("结束")
}
运行结果有两种情况:
开始取消父协程
父协程开始 isActive:true
启动一个子协程
结束取消父协程
父协程完成 isActive:false
结束
或者
开始取消父协程
父协程开始 isActive:true
启动一个子协程
子协程开始 isActive:true
结束取消父协程
父协程完成 isActive:false
子协程完成 isActive:false
结束
为什么会这样, 主要是因为程序是多线程的,job.cancel()
执行前如果子协程已运行,则无法取消,如果还未运行,则取消成功,如何解决这种问题呢,可以在关键地方加上挂起函数或者判断点,比如:
fun main() = runBlocking{
//创建一个Job,当然你也可以启动一个协程后返回
val job = GlobalScope.launch {
//增加了一个挂起函数,所以job.cancel()会执行完成,delay时,会感知协程已取消,后面的代码不会再执行
delay(100)
println("父协程开始 isActive:$isActive")
//启动一个子协程
println("启动一个子协程")
launch {
println("子协程开始 isActive:$isActive")
Thread.sleep(200)
println("子协程完成 isActive:$isActive")
}
Thread.sleep(100)
println("父协程完成 isActive:$isActive")
}
println("开始取消父协程")
job.cancel()
println("结束取消父协程")
TimeUnit.SECONDS.sleep(1)
println("结束")
}
运行结果:
开始取消父协程
结束取消父协程
结束
2.CoroutineScope.cancel()
使用CoroutineScope.cancel()
则可以一次性取消该协程上下文创建的所有协程和子协程,一旦取消协程作用域,将不能使用该作用域去启动新的协程,其实CoroutineScope.cancel()
最终使用的也是Job.cancel()
取消协程。
(参考:https://blog.csdn.net/gqg_guan/article/details/126225574)
val scope = CoroutineScope(Job() + Dispatchers.IO + CoroutineName("Top Scope"))
bn1.setOnClickListener {
scope.launch {
Thread.sleep(2000)
Log.d(TAG, "onCreate: $isActive")
Log.d(TAG, "onCreate: ${threadName()},${coroutineContext[CoroutineName]?.name}")
}
}
bn2.setOnClickListener {
scope.cancel()
}
假如我们只点击bn1
开启协程,但是不点击bn2
去取消协程,那么输出为
D/abcde: onCreate: true
D/abcde: onCreate: DefaultDispatcher-worker-1,Top Scope
假设我们点击bn1
开启协程后,立即点击bn2
取消协程(此时协程仍然在Thread.sleep
期间),那么输出为
D/abcde: onCreate: false
D/abcde: onCreate: DefaultDispatcher-worker-2,Top Scope
可以看到,协程的isActive
的值变为false
,但是协程仍然会执行(虽然之后无法通过scope
再去启动新的协程)。
在上面的代码中,当调用了scope.cancel
(内部调用了job.cancel
)的时候,协程会进入Cancelling
状态,当协程内所有的工作都完成了,协程会进入 Cancelled
状态。
修改下上面的例子:
val scope = CoroutineScope(Job() + Dispatchers.IO + CoroutineName("Top Scope"))
bn1.setOnClickListener {
scope.launch {
Thread.sleep(2000)
Log.d(TAG, "onCreate: $isActive")
// 检查协程是否取消
ensureActive()
Log.d(TAG, "onCreate: ${threadName()},${coroutineContext[CoroutineName]?.name}")
}
}
bn2.setOnClickListener {
scope.cancel()
}
我们点击bn1
开启协程后,立即点击bn2
取消协程(此时协程仍然在Thread.sleep
期间),那么输出为
D/abcde: onCreate: false
可以看到,当前协程内部的ensureActive()
函数配合外部的cancel
操作,成功地将协程取消了。
外部对协程cancel
之后,运行的协程被suspend
的时候,协程也会被取消。
再改造一下上面的例子:
val scope = CoroutineScope(Job() + Dispatchers.IO + CoroutineName("Top Scope"))
bn1.setOnClickListener {
scope.launch {
Thread.sleep(2000)
Log.d(TAG, "onCreate: $isActive")
withContext(Dispatchers.Main) {
Log.d(TAG,
"onCreate: ${threadName()},${coroutineContext[CoroutineName]?.name}")
}
}
}
bn2.setOnClickListener {
scope.cancel()
}
假如我们只点击bn1
开启协程,但是不点击bn2
去取消协程,那么输出为
D/abcde: onCreate: true
D/abcde: onCreate: main,Top Scope
假设我们点击bn1
开启协程后,立即点击bn2
取消协程(此时协程仍然在Thread.sleep
期间),那么输出为
D/abcde: onCreate: false
可以看出,withContext
在suspend
当前协程的时候,协程被取消了。
就是说协程里所有suspend
函数都是可取消的,当协程cancel
时,遇到suspend
函数,协程会被取消。
因为协程调度器 CoroutineDispatcher
在继续正常执行之前会检查协程对应的 Job
的状态,如果 Job
被取消了,那么 CoroutineDispatcher
会终止正常的执行,并且调用相应的 cancellation handlers
,但是已经检查完毕后,在检查下一个suspend
函数前,不会被取消,下面是一个例子:
var job: Job? = null
// 启动协程
binding.start.setOnClickListener {
job = scope.launch {
withContext(Dispatchers.IO){
Thread.sleep(1000)
Log.d(TAG, "1")
}
Log.d(TAG, "2")
}
}
// 取消协程
binding.cancel.setOnClickListener {
job?.cancel()
}
先点击按钮启动协程,在协程的 Thread.sleep
执行期间,点击按钮取消协程,那么输出为:
D/abcde: 1
-
线程切换
协程是通过
Dispatchers
调度器来控制线程切换的,从使用上来讲,调度器就是我们使用的Dispatchers.Main
、Dispatchers.Default
、Dispatcher.IO
等Dispatchers.Main
:Android
中的主线程Dispatchers.IO
:针对磁盘和网络 IO 进行了优化,适合 IO 密集型的任务,比如:读写文件,操作数据库以及网络请求,它会和Dispatchers.Default
共享线程池Dispatchers.Default
:适合 CPU 密集型的任务,比如计算,它会使用后台共享的线程池Dispatchers.Unconfined
:不推荐使用。这个协程调度器会在调用者线程内启动协程, 但只会持续运行到第一次挂起点为止。在挂起之后, 它会在哪个线程内恢复执行, 完全由被调用的挂起函数来决定
如何指定或切换线程
1.创建协程作用域时指定Dispatchers
调度器(如不指定,默认使用Dispatchers.Default
)
override fun onCreate(savedInstanceState: Bundle?) {
......
test()
}
fun test() {
val coroutineScope = CoroutineScope(Job())
coroutineScope.launch {
println("Thread : ${Thread.currentThread().name}")
}
}
运行结果:
Thread : DefaultDispatcher-worker-1
test()
改一下:
fun test() {
val coroutineScope = CoroutineScope(Job() + Dispatchers.Main)
coroutineScope.launch {
println("Thread : ${Thread.currentThread().name}")
}
}
运行结果:
Thread : main
2.创建协程时,指定Dispatchers
调度器,如不指定,使用创建协程所在作用域的Dispatchers
test()
再改一下:
fun test() {
val coroutineScope = CoroutineScope(Job() + Dispatchers.Main)
//虽然coroutineScope指定了Dispatchers.Main,但是launch时又指定了Dispatchers.IO
coroutineScope.launch(Dispatchers.IO) {
println("launch Thread : ${Thread.currentThread().name}")
}
//async继承了coroutineScope的调度器
coroutineScope.async {
println("async Thread : ${Thread.currentThread().name}")
}
}
运行结果:
launch Thread : DefaultDispatcher-worker-1
async Thread : main
3.使用withContext
函数切换
withContext
必须在协程或者suspend
函数中调用,否则会报错。当需要切换线程时,必须显示指定代码块所运行的线程,它会阻塞当前上下文线程,有返回值,会返回代码块的最后一行的值。
fun test() {
val coroutineScope = CoroutineScope(Job() + Dispatchers.Main)
coroutineScope.launch(Dispatchers.IO) {
println("launch Thread : ${Thread.currentThread().name}")
withContext(Dispatchers.Main) {
println("launch Dispatchers Thread : ${Thread.currentThread().name}")
}
}
coroutineScope.async {
println("async Thread : ${Thread.currentThread().name}")
withContext(Dispatchers.IO) {
println("async Dispatchers Thread : ${Thread.currentThread().name}")
}
}
}
运行结果:
launch Thread : DefaultDispatcher-worker-1
async Thread : main
launch Dispatchers Thread : main
async Dispatchers Thread : DefaultDispatcher-worker-1
上面的例子说明同一个协程可以运行在不同的线程之中,比如协程可以在⼀个线程上挂起并在其它线程上恢复,这也印证了之前对协程的理解,即协程就是block
所代表的一段lambda
代码块,但是编译器会将代码块自动进行分段,将不同的分段根据情况运行在不同的线程之中或者运行在同一个线程之中。
-
协程返回值
获取协程的返回值,一般有三种方式:
async
函数、suspendCoroutine
函数、suspendCancellableCoroutine
函数1.async
使用
async
开启协程,然后调用async
返回的Deferred
对象 的await()
方法,即可获取async
协程运算的结果CoroutineScope(Dispatchers.Default).launch { val job = async { println("async 正在执行") return@async "返回值" } delay(1000) println("async 返回结果:${job.await()}") }
运行结果:
async 正在执行 async 返回结果:返回值
使用
async
和await
可实现协程高效并发,比如:private suspend fun intValue1(): Int { delay(1000) return 1 } private suspend fun intValue2(): Int { delay(2000) return 2 } fun main() = runBlocking { val elapsedTime = measureTimeMillis { val value1 = intValue1() val value2 = intValue2() println("the result is ${value1 + value2}") } println("the elapsedTime is $elapsedTime") }
运行结果:
the result is 3 the elapsedTime is 3018
使用
async
和await
改善代码:private suspend fun intValue1(): Int { delay(1000) return 1 } private suspend fun intValue2(): Int { delay(2000) return 2 } fun main() = runBlocking { val elapsedTime = measureTimeMillis { val value1 = async { intValue1() } val value2 = async { intValue2() } println("the result is ${value1.await() + value2.await()}") } println("the elapsedTime is $elapsedTime") }
运行结果:
the result is 3 the elapsedTime is 2020
await
的特点:不会阻塞当前线程;会等待,当计算完毕时,恢复执行;会返回结果值或者由于被取消而对应的异常
2.suspendCoroutine
suspendCoroutine
只是一个挂起函数,无法开启协程,所以需要在其它协程作用域里面使用,suspendCoroutine
返回的对象就是返回值,需要在回调中将结果值传入到Coutination
的resume
方法(或者resumeWithException
或resumeWith
)中,
比如:
fun main() = runBlocking {
val job = CoroutineScope(Dispatchers.Default).launch {
val result = suspendCoroutine<String> {
println("suspendCoroutine 正在执行")
it.resume("返回值")
}
println("suspendCoroutine 返回结果:$result")
}
job.join()
}
运行结果:
suspendCoroutine 正在执行
suspendCoroutine 返回结果:返回值
resumeWithException
提供了稍微优化一点的抛异常方式
fun main() = runBlocking {
try {
val result = suspendCoroutine<String> {
println("suspendCoroutine 正在执行")
it.resumeWithException(Exception("我是异常"))
}
println("suspendCoroutine 执行成功,返回结果:$result")
} catch (e: java.lang.Exception) {
println("suspendCoroutine 执行失败,返回异常:$e")
}
}
运行结果:
suspendCoroutine 正在执行
suspendCoroutine 执行失败,返回异常:java.lang.Exception: 我是异常
关于resumeWith
,实际上resume
与resumeWithException
最终都是调用的resumeWith
,见定义:
public inline fun <T> Continuation<T>.resume(value: T): Unit =
resumeWith(Result.success(value))
public inline fun <T> Continuation<T>.resumeWithException(exception: Throwable): Unit =
resumeWith(Result.failure(exception))
比如:
fun main() = runBlocking {
val job = CoroutineScope(Dispatchers.Default).launch {
val result = suspendCoroutine<String> {
println("suspendCoroutine 正在执行")
it.resumeWith(Result.success("返回结果"))
}
println("suspendCoroutine 返回结果:$result")
}
job.join()
}
运行结果:
suspendCoroutine 正在执行
suspendCoroutine 返回结果:返回结果
suspendCoroutine的作用
suspendCoroutine
可以用于将基于回调的API转换为协程
suspendCoroutine
可以取代回调函数,即可以直接返回值,而不是通过一个接口的回调函数来返回值(用withContext
是不是更好一点)
比如使用回调:
interface SingleMethodCallback {
fun onCallBack(value: String)
}
/**
* 模拟一个耗时操作
*/
private fun runTask(callback: SingleMethodCallback) {
thread {
Thread.sleep(1000)
callback.onCallBack("result")
}
}
//调用runTask方法,传入SingleMethodCallback的实现
private fun runTaskDefault() {
runTask(object : SingleMethodCallback {
override fun onCallBack(value: String) {
println("value is $value")
}
})
}
fun main() {
runTaskDefault()
}
运行结果:
value is result
使用suspendCoroutine
进行改造:
interface SingleMethodCallback {
fun onCallBack(value: String)
}
/**
* 模拟一个耗时操作
*/
private fun runTask(callback: SingleMethodCallback) {
thread {
Thread.sleep(1000)
callback.onCallBack("result")
}
}
//改造一下runTaskDefault ---> runTaskWithSuspend
suspend fun runTaskWithSuspend(): String {
// suspendCoroutine是一个挂起函数
return suspendCoroutine { continuation ->
runTask(object : SingleMethodCallback {
override fun onCallBack(value: String) {
continuation.resume(value)
}
})
}
}
fun main() = runBlocking {
val result = runTaskWithSuspend()
println("result is $result")
println("result println end")
delay(5000)
}
运行结果:
result is result
result println end
如果runTaskWithSuspend
运行在UI线程呢,比如:
override fun onCreate(savedInstanceState: Bundle?) {
......
test()
println("ddd test end")
}
fun test() {
val coroutineScope = CoroutineScope(Job() + Dispatchers.Main)
coroutineScope.launch {
val result = runTaskWithSuspend()
println("ddd result is $result")
println("ddd result println end")
}
}
运行结果:
ddd test end
ddd result is result
ddd result println end
可以看出suspendCoroutine
运行耗时动作时,其所在的协程会被挂起,协程后面的动作会继续执行。
异常处理有模板,可参考:https://blog.csdn.net/catzifeng/article/details/109262842
3.suspendCancellableCoroutine
当我们使用 suspendCoroutine
时,若该协程已被 cancel()
,调用 resume()
也是会正常返回值的,比如:
fun main() = runBlocking {
val job = CoroutineScope(Dispatchers.Default).launch {
val result = suspendCoroutine<String>{
println("suspendCoroutine 正在执行")
cancel()
it.resume("返回值")
}
println("suspendCoroutine 执行成功,返回结果:$result")
}
job.join()
}
运行结果:
suspendCoroutine 正在执行
suspendCoroutine 执行成功,返回结果:返回值
但是,这并不是我们想要的,因为都已经 cancel()
了,说明就不希望再要该返回值了。为了处理这种情况,我们可以考虑使用 suspendCancellableCoroutine
。
fun main() = runBlocking {
val job = CoroutineScope(Dispatchers.Default).launch {
try{
val result = suspendCancellableCoroutine<String>{
println("suspendCancellableCoroutine 正在执行")
cancel()
it.resume("返回值")
}
println("suspendCancellableCoroutine 执行成功,返回结果:$result")
}catch (e: java.lang.Exception){
println("suspendCancellableCoroutine 执行失败,返回异常:$e")
}
}
job.join()
}
运行结果:
suspendCancellableCoroutine 正在执行
suspendCancellableCoroutine 执行失败,返回异常:kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@7604cd0a
另外,若使用 suspendCancellableCoroutine
的话,其 resume()
方法还有另外一个重载方法:
public fun resume(value: T, onCancellation: ((cause: Throwable) -> Unit)?)
可以通过 onCancellation
进行一些快速操作:
fun main() = runBlocking {
val job = CoroutineScope(Dispatchers.Default).launch {
try{
val result = suspendCancellableCoroutine<String>{
println("suspendCancellableCoroutine 正在执行")
cancel()
it.resume("返回值"){ cause->
println("suspendCancellableCoroutine 被取消了,cause:$cause")
}
}
println("suspendCancellableCoroutine 执行成功,返回结果:$result")
}catch (e: java.lang.Exception){
println("suspendCancellableCoroutine 执行失败,返回异常:$e")
}
}
job.join()
}
运行结果
suspendCancellableCoroutine 正在执行
suspendCancellableCoroutine 被取消了,cause:kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@6c47d16e
suspendCancellableCoroutine 执行失败,返回异常:kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@6c47d16e
需要注意的地方:
调用 resume()
之后,后续代码还会继续执行,第二次调用 resume()
后,后续代码不会被执行,并且会抛出异常,这一点,suspendCoroutine
和 suspendCancellableCoroutine
都是一样的
fun main() = runBlocking {
val job = CoroutineScope(Dispatchers.Default).launch {
try{
val result = suspendCancellableCoroutine<String>{
println("suspendCancellableCoroutine 正在执行")
it.resume("返回值")
println("suspendCancellableCoroutine 已经返回")
it.resume("返回值2")
println("suspendCancellableCoroutine 再次返回")
}
println("suspendCancellableCoroutine 执行成功,返回结果:$result")
}catch (e: java.lang.Exception){
println("suspendCancellableCoroutine 执行失败,返回异常:$e")
}
}
job.join()
}
运行结果:
suspendCancellableCoroutine 正在执行
suspendCancellableCoroutine 已经返回
suspendCancellableCoroutine 执行失败,返回异常:java.lang.IllegalStateException: Already resumed, but proposed with update 返回值2
(未完见第二部分)