Actor 使用单线程处理消息,所以不会出现并发问题。你可以把 Actor 内部的工作模式想象成只有一个消费者线程的生产者 - 消费者模式。所以,在 Actor 模型里,发送消息仅仅是把消息发出去而已,接收消息的 Actor 在接收到消息后,也不一定会立即处理,也就是说 Actor 中的消息机制完全是异步的。在 Actor 中发送消息,类似于现实中的写信,只需要知道对方的地址就可以,发送消息和接收消息的 Actor 可以不在一个进程中,也可以不在同一台机器上。因此,Actor 模型不但适用于并发计算,还适用于分布式计算。(一个步骤只有一个actor处理,可以把一件事细化成大量的步骤来做到提速效果。比如转账这件事,给每个转出账户单独一个actor操作,每个转入账户一个单独actor操作。解决共享资源冲突的思路是,不要让共享资源的访问操作并发。对于共享资源的操作使用同一个Actor来依次处理,同一时间只有一个访问操作,那就不会有冲突了。这里的账号是共享资源)
对A、B表的访问具有事务性,可以认为A、B表整体是一个资源,因此不能分割为多个Actor来操作,所以只需要一个Actor即可。它接受并处理两种消息,分别为用户甲、乙的请求,由于每次Actor只会处理一条消息,所以不会出现两个数据库连接同时打开的情况,也就不会锁表了。
如果使用传统的方法,那么可以把对A、B表的访问写在同一个方法内,调用时加锁,原理和上面差不多,这样避免了A、B表被不同的方法分别锁住的情况。如果对甲、乙的请求分成两个单独的处理方法,那很容易会发生A、B表被分别占有的死锁情况。(把读写 A,B表的操作分散为2个不同的Actor,就解决了问题。如果必须具有原子性,把操作合并成一个actor,先A后B,先B后A 是由同一个actor完成的即可)
同样是以消息传递的方式来避免共享,那 Golang 实现的 CSP 模型和 Actor 模型有什么区别呢?第一个最明显的区别就是:Actor 模型中没有 channel。虽然 Actor 模型中的 mailbox 和 channel 非常像,看上去都像个 FIFO 队列,但是区别还是很大的。Actor 模型中的 mailbox 对于程序员来说是“透明”的,mailbox 明确归属于一个特定的 Actor,是 Actor 模型中的内部机制;而且 Actor 之间是可以直接通信的,不需要通信中介。但 CSP 模型中的 channel 就不一样了,它对于程序员来说是“可见”的,是通信的中介,传递的消息都是直接发送到 channel 中的。
第二个区别是:Actor 模型中发送消息是非阻塞的,而 CSP 模型中是阻塞的。Golang 实现的 CSP 模型,channel 是一个阻塞队列,当阻塞队列已满的时候,向 channel 中发送数据,会导致发送消息的协程阻塞。(是协程阻塞,不是线程阻塞,这个就是go的亮点)
第三个区别则是关于消息送达的。我们介绍过 Actor 模型理论上不保证消息百分百送达,而在 Golang 实现的 CSP 模型中,是能保证消息百分百送达的。不过这种百分百送达也是有代价的,那就是有可能会导致死锁。
在下面的示例代码中,我们用了 4 个子协程来并行执行,这 4 个子协程分别计算[1, 25 亿]、(25 亿, 50 亿]、(50 亿, 75 亿]、(75 亿, 100 亿],最后再在主协程中汇总 4 个子协程的计算结果。主协程要汇总 4 个子协程的计算结果,势必要和 4 个子协程之间通信,Golang 中协程之间通信推荐的是使用 channel,channel 你可以形象地理解为现实世界里的管道。另外,calc() 方法的返回值是一个只能接收数据的 channel ch,它创建的子协程会把计算结果发送到这个 ch 中,而主协程也会将这个计算结果通过 ch 读取出来。
import (
"fmt"
"time"
)
func main() {
// 变量声明
var result, i uint64
// 单个协程执行累加操作
start := time.Now()
for i = 1; i <= 10000000000; i++ {
result += i
}
// 统计计算耗时
elapsed := time.Since(start)
fmt.Printf("执行消耗的时间为:", elapsed)
fmt.Println(", result:", result)
// 4个协程共同执行累加操作
start = time.Now()
ch1 := calc(1, 2500000000)
ch2 := calc(2500000001, 5000000000)
ch3 := calc(5000000001, 7500000000)
ch4 := calc(7500000001, 10000000000)
// 汇总4个协程的累加结果
result = <-ch1 + <-ch2 + <-ch3 + <-ch4
// 统计计算耗时
elapsed = time.Since(start)
fmt.Printf("执行消耗的时间为:", elapsed)
fmt.Println(", result:", result)
}
// 在协程中异步执行累加操作,累加结果通过channel传递
func calc(from uint64, to uint64) <-chan uint64 {
// channel用于协程间的通信
ch := make(chan uint64)
// 在协程中执行累加操作
go func() {
result := from
for i := from + 1; i <= to; i++ {
result += i
}
// 将结果写入channel
ch <- result
}()
// 返回结果是用于通信的channel
return ch
}
从操作系统的角度来看,线程是在内核态中调度的,而协程是在用户态调度的,所以相对于线程来说,协程切换的成本更低。协程虽然也有自己的栈,但是相比线程栈要小得多,典型的线程栈大小差不多有 1M,而协程栈的大小往往只有几 K 或者几十 K。所以,无论是从时间维度还是空间维度来看,协程都比线程轻量得多。线程是CPU的调度单元,而协程又是在线程上细分的调度单元。
import (
"fmt"
"time"
)
func hello(msg string) {
fmt.Println("Hello " + msg)
}
func main() {
//在新的协程中执行hello方法
go hello("World")
fmt.Println("Run in main")
//等待100毫秒让协程执行结束
time.Sleep(100 * time.Millisecond)
}
下面的示例代码是用 Golang 实现的 echo 程序的服务端,用的是 Thread-Per-Message 模式,为每个成功建立连接的 socket 分配一个协程,相比 Java 线程池的实现方案,Golang 中协程的方案更简单。
import (
"log"
"net"
)
func main() {
//监听本地9090端口
socket, err := net.Listen("tcp", "127.0.0.1:9090")
if err != nil {
log.Panicln(err)
}
defer socket.Close()
for {
//处理连接请求
conn, err := socket.Accept()
if err != nil {
log.Panicln(err)
}
//处理已经成功建立连接的请求
go handleRequest(conn)
}
}
//处理已经成功建立连接的请求
func handleRequest(conn net.Conn) {
defer conn.Close()
for {
buf := make([]byte, 1024)
//读取请求数据
size, err := conn.Read(buf)
if err != nil {
return
}
//回写相应数据
conn.Write(buf[:size])
}
}
Go的gorouting模型
把用户线程逻辑,拆分成G和P两部分切换,来达到避免真实的内核线程M导致的阻塞问题。
当一个OS线程M0陷入阻塞时,P转而在OS线程M1上运行。调度器保证有足够的线程来运行所以的context P。
所以逻辑线程P和操作系统线程M可以是1个P对应多个M的关系。
GPM 的基本结构:
G(Goroutine),每个Goroutine对应一个G结构体,存储了Goroutine的运行堆栈、状态以及任务函数,可重用。
M(Machine,操作系统线程),真正执行计算的资源;因为G可以跨M调度,所以M是不能保存G的状态的。M的数量是不定的,由Go Runtime调整,为了防止创建过多OS线程导致调度不过来,默认的最大限制数量为10000个。
P(Processor,逻辑处理器)提供了相关的执行环境(Context),如内存分配状态,任务队列等,P的数量决定了系统内最大可并行的G的数量(前提:物理CPU核数 >= P的数量),P的数量由用户设置的GOMAXPROCS决定(上限为256)。
G并非执行体,每个G需要绑定到P逻辑处理器才能被调度执行。
每个逻辑处理器P需要绑定到M,M会启动一个操作系统线程,进入调度循环;
粗糙地说调度就是决定哪个goroutine何时获得执行资源、哪个goroutine应该停止执行让出资源、哪个goroutine应该被唤醒恢复执行等;
获得执行资源的goroutine,会从其栈上每个创建的Goroutine会被Go调度器放入全局运行队列,然后分配到一个P逻辑处理器,并放入逻辑处理器本地的队列中,等待被执行;
P和M的创建时机
在确定了 P 的最大数量 n 后,程序运行时系统会根据这个数量创建 n 个 P。
当没有足够的M来绑定对应的P的时候会创建,包括以下两个场景
当某个G系统调用阻塞,P将与当前M解绑,寻找一个休眠的M重新进行绑定,当休眠的M不够时,则会创建一个新的,之前的M处理完任务后因为没有P与之绑定则会进入休眠状态
当在创建新的G时,会尝试去绑定空闲的P与M,如果这时休眠的M不足,也会创建新的M);
特点总结:
Go调度器是在用户层进行调度,不需要进入内核的上下文切换,避免在用户态和内核态线程的切换开销,调度成本会低很多。
特殊的M0和G0
M0 是启动程序后的编号为 0 的主线程,这个 M 对应的实例会在全局变量 runtime.m0 中,不需要在 heap 上分配,M0 负责执行初始化操作和启动第一个 G, 在之后 M0 就和其他的 M 一样了。
G0 是每次启动一个 M 都会第一个创建的 gourtine,G0 仅用于负责调度的 G,G0 不指向任何可执行的函数,每个 M 都会有一个自己的 G0。在调度或系统调用时会使用 G0 的栈空间,全局变量的 G0 是 M0 的 G0。
goroutine的轻量级
goroutine的栈不是固定的,一开始以一个很小的栈空间(2KB)开启生命周期,栈的大小会根据需要动态伸缩。和操作系统线程的栈有同样作用,会保存当前正在运行或挂起的函数的本地变量。
OS调度器的切换成本
线程会被OS调度到CPU上运行,每几毫秒会发生一次硬件计时器中断,当前线程需要让出CPU并将线程状态保存到寄存器中,CPU继续处理其他线程任务。此线程下一次获得CPU执行的时间片,会从寄存器中恢复该线程上次的状态并继续执行。线程在内核切换上下文是很慢的。
创建G时会尝试创建新的MP绑定关系
当创建G的时候,会尝试绑定空闲的M与P(如果没有空闲的P则放弃),然后新的绑定关系会从其他队列中偷取G运行;这样的机制下,假如我的P为4,main中通过for循环启动4个协程,配合work stealing机制最终G将均匀负载;
自旋线程
G0处于自旋不断尝试获取可执行G的状态下,对应的线程称为自旋线程,自旋状态将持续到找到可执行G或被回收为止;
核心点
面对系统级阻塞是如何处理的。
N 个协程调度器P绑定 1 个线程,优点就是协程在用户态线程即完成切换,不会陷入到内核态,这种切换非常的轻量快速。但也有很大的缺点,1 个进程的所有协程都绑定在 1 个线程上。一旦某协程阻塞,造成线程阻塞,本进程的其他协程都无法执行了,根本就没有并发的能力了。Python当前所支持的模式就是这种。这种模式适合本身底层API就是异步,比如异步I/O,在代码层需要异步转同步,这个模型就是辅助你在用户态异步转同步。JS应该也是这种模型。
N个操作系统线程绑定1个协程调度器P
这是为了保证P能一直持续运行,最大效率化执行。
总结下来就是M:N关系能解决所有问题。难点就在于P的实现变麻烦了,要分情况处理。不过这也是语言实现者关心的事。
如何实现重新调度
要实现一个G被重复调度,需要把G放回全局队列或者一个While循环里,然后记录当前的执行状态。
其他语言中的实现:
安卓的kotlin中用suspend标记一个执行块也就是G,因为还要区分UI线程和IO线程,所以绑定G和P的时候,还要指定是在什么线程中执行。创建Scope就是创建一个调度器。go中是通过配置来自动创建调度器的。
下面是一个创建P的例子:
val scope = CoroutineScope(Job() + Dispatchers.Main) //创建调度器,并绑定Main线程
下面是一个简单的创建G的例子
suspend fun fetchDocs() { // Dispatchers.Main
val result = get("https://developer.android.com") // Dispatchers.IO for `get`
show(result) // Dispatchers.Main
}
suspend fun get(url: String) = withContext(Dispatchers.IO) { /* ... */ }
理解了这些关系以后,就理解使用 suspend 不会让 Kotlin 在后台线程上运行函数。suspend 函数在主线程上运行是一种正常的现象。在主线程上启动协程的情况也很常见。当您需要确保主线程安全时(例如,从磁盘上读取数据或向磁盘中写入数据、执行网络操作或运行占用大量 CPU 资源的操作时),应始终在 suspend 函数内使用 withContext()。
可以通过以下两种方式来启动协程:
-
launch
会启动新协程而不将结果返回给调用方。任何被视为“一劳永逸”的工作都可以使用launch
来启动。 -
async
可启动一个新协程,并允许您使用一个名为await
的挂起函数返回结果。
通常,您应使用 launch
从常规函数启动新协程,因为常规函数无法调用 await
。只有在另一个协程内或在挂起函数内且在执行并行分解时,才使用 async
。
val job = Job()
CoroutineScope(job).launch {
val result= async {
//模拟耗时操作
delay(3000)
"操作成功"
}.await()
Log.d(TAG, result)
}
调用示例
exampleMethod 是一个正常函数,fetchDocs是一个协程,这里演示了如何在一个正常函数里启动协程。并如何设计一个函数celanUp去取消协程的执行
class ExampleClass {
// Job and Dispatcher are combined into a CoroutineContext which
// will be discussed shortly
val scope = CoroutineScope(Job() + Dispatchers.Main)
fun exampleMethod() {
// Starts a new coroutine within the scope
scope.launch {
// New coroutine that can call suspend functions
fetchDocs()
}
}
fun cleanUp() {
// Cancel the scope to cancel ongoing coroutines work
scope.cancel()
}
}
用协程的原始目的就是方便的异步转同步,最小代价的实现“挂起”操作。否则就要用到线程的“阻塞”操作。