从多线程到分布式(八)Actor和协程.

Actor 使用单线程处理消息,所以不会出现并发问题。你可以把 Actor 内部的工作模式想象成只有一个消费者线程的生产者 - 消费者模式。所以,在 Actor 模型里,发送消息仅仅是把消息发出去而已,接收消息的 Actor 在接收到消息后,也不一定会立即处理,也就是说 Actor 中的消息机制完全是异步的。在 Actor 中发送消息,类似于现实中的写信,只需要知道对方的地址就可以,发送消息和接收消息的 Actor 可以不在一个进程中,也可以不在同一台机器上。因此,Actor 模型不但适用于并发计算,还适用于分布式计算。(一个步骤只有一个actor处理,可以把一件事细化成大量的步骤来做到提速效果。比如转账这件事,给每个转出账户单独一个actor操作,每个转入账户一个单独actor操作。解决共享资源冲突的思路是,不要让共享资源的访问操作并发。对于共享资源的操作使用同一个Actor来依次处理,同一时间只有一个访问操作,那就不会有冲突了。这里的账号是共享资源)

对A、B表的访问具有事务性,可以认为A、B表整体是一个资源,因此不能分割为多个Actor来操作,所以只需要一个Actor即可。它接受并处理两种消息,分别为用户甲、乙的请求,由于每次Actor只会处理一条消息,所以不会出现两个数据库连接同时打开的情况,也就不会锁表了。
如果使用传统的方法,那么可以把对A、B表的访问写在同一个方法内,调用时加锁,原理和上面差不多,这样避免了A、B表被不同的方法分别锁住的情况。如果对甲、乙的请求分成两个单独的处理方法,那很容易会发生A、B表被分别占有的死锁情况。(把读写 A,B表的操作分散为2个不同的Actor,就解决了问题。如果必须具有原子性,把操作合并成一个actor,先A后B,先B后A 是由同一个actor完成的即可)

同样是以消息传递的方式来避免共享,那 Golang 实现的 CSP 模型和 Actor 模型有什么区别呢?第一个最明显的区别就是:Actor 模型中没有 channel。虽然 Actor 模型中的 mailbox 和 channel 非常像,看上去都像个 FIFO 队列,但是区别还是很大的。Actor 模型中的 mailbox 对于程序员来说是“透明”的,mailbox 明确归属于一个特定的 Actor,是 Actor 模型中的内部机制;而且 Actor 之间是可以直接通信的,不需要通信中介。但 CSP 模型中的 channel 就不一样了,它对于程序员来说是“可见”的,是通信的中介,传递的消息都是直接发送到 channel 中的。

第二个区别是:Actor 模型中发送消息是非阻塞的,而 CSP 模型中是阻塞的。Golang 实现的 CSP 模型,channel 是一个阻塞队列,当阻塞队列已满的时候,向 channel 中发送数据,会导致发送消息的协程阻塞。(是协程阻塞,不是线程阻塞,这个就是go的亮点)

第三个区别则是关于消息送达的。我们介绍过 Actor 模型理论上不保证消息百分百送达,而在 Golang 实现的 CSP 模型中,是能保证消息百分百送达的。不过这种百分百送达也是有代价的,那就是有可能会导致死锁。

在下面的示例代码中,我们用了 4 个子协程来并行执行,这 4 个子协程分别计算[1, 25 亿]、(25 亿, 50 亿]、(50 亿, 75 亿]、(75 亿, 100 亿],最后再在主协程中汇总 4 个子协程的计算结果。主协程要汇总 4 个子协程的计算结果,势必要和 4 个子协程之间通信,Golang 中协程之间通信推荐的是使用 channel,channel 你可以形象地理解为现实世界里的管道。另外,calc() 方法的返回值是一个只能接收数据的 channel ch,它创建的子协程会把计算结果发送到这个 ch 中,而主协程也会将这个计算结果通过 ch 读取出来。

import (
  "fmt"
  "time"
)

func main() {
    // 变量声明
  var result, i uint64
    // 单个协程执行累加操作
  start := time.Now()
  for i = 1; i <= 10000000000; i++ {
    result += i
  }
  // 统计计算耗时
  elapsed := time.Since(start)
  fmt.Printf("执行消耗的时间为:", elapsed)
  fmt.Println(", result:", result)

    // 4个协程共同执行累加操作
  start = time.Now()
  ch1 := calc(1, 2500000000)
  ch2 := calc(2500000001, 5000000000)
  ch3 := calc(5000000001, 7500000000)
  ch4 := calc(7500000001, 10000000000)
    // 汇总4个协程的累加结果
  result = <-ch1 + <-ch2 + <-ch3 + <-ch4
  // 统计计算耗时
  elapsed = time.Since(start)
  fmt.Printf("执行消耗的时间为:", elapsed)
  fmt.Println(", result:", result)
}
// 在协程中异步执行累加操作,累加结果通过channel传递
func calc(from uint64, to uint64) <-chan uint64 {
    // channel用于协程间的通信
  ch := make(chan uint64)
    // 在协程中执行累加操作
  go func() {
    result := from
    for i := from + 1; i <= to; i++ {
      result += i
    }
        // 将结果写入channel
    ch <- result
  }()
    // 返回结果是用于通信的channel
  return ch
}

从操作系统的角度来看,线程是在内核态中调度的,而协程是在用户态调度的,所以相对于线程来说,协程切换的成本更低。协程虽然也有自己的栈,但是相比线程栈要小得多,典型的线程栈大小差不多有 1M,而协程栈的大小往往只有几 K 或者几十 K。所以,无论是从时间维度还是空间维度来看,协程都比线程轻量得多。线程是CPU的调度单元,而协程又是在线程上细分的调度单元。

import (
  "fmt"
  "time"
)
func hello(msg string) {
  fmt.Println("Hello " + msg)
}
func main() {
    //在新的协程中执行hello方法
  go hello("World")
    fmt.Println("Run in main")
    //等待100毫秒让协程执行结束
  time.Sleep(100 * time.Millisecond)
}

下面的示例代码是用 Golang 实现的 echo 程序的服务端,用的是 Thread-Per-Message 模式,为每个成功建立连接的 socket 分配一个协程,相比 Java 线程池的实现方案,Golang 中协程的方案更简单。

import (
  "log"
  "net"
)

func main() {
    //监听本地9090端口
  socket, err := net.Listen("tcp", "127.0.0.1:9090")
  if err != nil {
    log.Panicln(err)
  }
  defer socket.Close()
  for {
        //处理连接请求  
    conn, err := socket.Accept()
    if err != nil {
      log.Panicln(err)
    }
        //处理已经成功建立连接的请求
    go handleRequest(conn)
  }
}

//处理已经成功建立连接的请求
func handleRequest(conn net.Conn) {
  defer conn.Close()
  for {
    buf := make([]byte, 1024)
        //读取请求数据
    size, err := conn.Read(buf)
    if err != nil {
      return
    }
        //回写相应数据  
    conn.Write(buf[:size])
  }
}

Go的gorouting模型

把用户线程逻辑,拆分成G和P两部分切换,来达到避免真实的内核线程M导致的阻塞问题。
当一个OS线程M0陷入阻塞时,P转而在OS线程M1上运行。调度器保证有足够的线程来运行所以的context P。
所以逻辑线程P和操作系统线程M可以是1个P对应多个M的关系。

image.png

GPM 的基本结构:
G(Goroutine),每个Goroutine对应一个G结构体,存储了Goroutine的运行堆栈、状态以及任务函数,可重用。
M(Machine,操作系统线程),真正执行计算的资源;因为G可以跨M调度,所以M是不能保存G的状态的。M的数量是不定的,由Go Runtime调整,为了防止创建过多OS线程导致调度不过来,默认的最大限制数量为10000个。
P(Processor,逻辑处理器)提供了相关的执行环境(Context),如内存分配状态,任务队列等,P的数量决定了系统内最大可并行的G的数量(前提:物理CPU核数 >= P的数量),P的数量由用户设置的GOMAXPROCS决定(上限为256)。

G并非执行体,每个G需要绑定到P逻辑处理器才能被调度执行。
每个逻辑处理器P需要绑定到M,M会启动一个操作系统线程,进入调度循环;
粗糙地说调度就是决定哪个goroutine何时获得执行资源、哪个goroutine应该停止执行让出资源、哪个goroutine应该被唤醒恢复执行等;
获得执行资源的goroutine,会从其栈上每个创建的Goroutine会被Go调度器放入全局运行队列,然后分配到一个P逻辑处理器,并放入逻辑处理器本地的队列中,等待被执行;

P和M的创建时机

在确定了 P 的最大数量 n 后,程序运行时系统会根据这个数量创建 n 个 P。
当没有足够的M来绑定对应的P的时候会创建,包括以下两个场景
当某个G系统调用阻塞,P将与当前M解绑,寻找一个休眠的M重新进行绑定,当休眠的M不够时,则会创建一个新的,之前的M处理完任务后因为没有P与之绑定则会进入休眠状态
当在创建新的G时,会尝试去绑定空闲的P与M,如果这时休眠的M不足,也会创建新的M);

特点总结:

Go调度器是在用户层进行调度,不需要进入内核的上下文切换,避免在用户态和内核态线程的切换开销,调度成本会低很多。

特殊的M0和G0
M0 是启动程序后的编号为 0 的主线程,这个 M 对应的实例会在全局变量 runtime.m0 中,不需要在 heap 上分配,M0 负责执行初始化操作和启动第一个 G, 在之后 M0 就和其他的 M 一样了。
G0 是每次启动一个 M 都会第一个创建的 gourtine,G0 仅用于负责调度的 G,G0 不指向任何可执行的函数,每个 M 都会有一个自己的 G0。在调度或系统调用时会使用 G0 的栈空间,全局变量的 G0 是 M0 的 G0。

goroutine的轻量级
goroutine的栈不是固定的,一开始以一个很小的栈空间(2KB)开启生命周期,栈的大小会根据需要动态伸缩。和操作系统线程的栈有同样作用,会保存当前正在运行或挂起的函数的本地变量。

OS调度器的切换成本
线程会被OS调度到CPU上运行,每几毫秒会发生一次硬件计时器中断,当前线程需要让出CPU并将线程状态保存到寄存器中,CPU继续处理其他线程任务。此线程下一次获得CPU执行的时间片,会从寄存器中恢复该线程上次的状态并继续执行。线程在内核切换上下文是很慢的。

创建G时会尝试创建新的MP绑定关系
当创建G的时候,会尝试绑定空闲的M与P(如果没有空闲的P则放弃),然后新的绑定关系会从其他队列中偷取G运行;这样的机制下,假如我的P为4,main中通过for循环启动4个协程,配合work stealing机制最终G将均匀负载;

自旋线程
G0处于自旋不断尝试获取可执行G的状态下,对应的线程称为自旋线程,自旋状态将持续到找到可执行G或被回收为止;

核心点

面对系统级阻塞是如何处理的。
N 个协程调度器P绑定 1 个线程,优点就是协程在用户态线程即完成切换,不会陷入到内核态,这种切换非常的轻量快速。但也有很大的缺点,1 个进程的所有协程都绑定在 1 个线程上。一旦某协程阻塞,造成线程阻塞,本进程的其他协程都无法执行了,根本就没有并发的能力了。Python当前所支持的模式就是这种。这种模式适合本身底层API就是异步,比如异步I/O,在代码层需要异步转同步,这个模型就是辅助你在用户态异步转同步。JS应该也是这种模型。

N个操作系统线程绑定1个协程调度器P
这是为了保证P能一直持续运行,最大效率化执行。

image.png

总结下来就是M:N关系能解决所有问题。难点就在于P的实现变麻烦了,要分情况处理。不过这也是语言实现者关心的事。

如何实现重新调度
要实现一个G被重复调度,需要把G放回全局队列或者一个While循环里,然后记录当前的执行状态。

其他语言中的实现:

安卓的kotlin中用suspend标记一个执行块也就是G,因为还要区分UI线程和IO线程,所以绑定G和P的时候,还要指定是在什么线程中执行。创建Scope就是创建一个调度器。go中是通过配置来自动创建调度器的。
下面是一个创建P的例子:

val scope = CoroutineScope(Job() + Dispatchers.Main)  //创建调度器,并绑定Main线程

下面是一个简单的创建G的例子

suspend fun fetchDocs() {                             // Dispatchers.Main
    val result = get("https://developer.android.com") // Dispatchers.IO for `get`
    show(result)                                      // Dispatchers.Main
}

suspend fun get(url: String) = withContext(Dispatchers.IO) { /* ... */ }

理解了这些关系以后,就理解使用 suspend 不会让 Kotlin 在后台线程上运行函数。suspend 函数在主线程上运行是一种正常的现象。在主线程上启动协程的情况也很常见。当您需要确保主线程安全时(例如,从磁盘上读取数据或向磁盘中写入数据、执行网络操作或运行占用大量 CPU 资源的操作时),应始终在 suspend 函数内使用 withContext()。

可以通过以下两种方式来启动协程:

  • launch 会启动新协程而不将结果返回给调用方。任何被视为“一劳永逸”的工作都可以使用 launch 来启动。
  • async可启动一个新协程,并允许您使用一个名为 await 的挂起函数返回结果。

通常,您应使用 launch 从常规函数启动新协程,因为常规函数无法调用 await。只有在另一个协程内或在挂起函数内且在执行并行分解时,才使用 async

val job = Job()
CoroutineScope(job).launch {
    val result= async {
        //模拟耗时操作
        delay(3000)
        "操作成功"
    }.await()
    Log.d(TAG, result)
}

调用示例
exampleMethod 是一个正常函数,fetchDocs是一个协程,这里演示了如何在一个正常函数里启动协程。并如何设计一个函数celanUp去取消协程的执行

class ExampleClass {

    // Job and Dispatcher are combined into a CoroutineContext which
    // will be discussed shortly
    val scope = CoroutineScope(Job() + Dispatchers.Main)

    fun exampleMethod() {
        // Starts a new coroutine within the scope
        scope.launch {
            // New coroutine that can call suspend functions
            fetchDocs()
        }
    }

    fun cleanUp() {
        // Cancel the scope to cancel ongoing coroutines work
        scope.cancel()
    }
}

用协程的原始目的就是方便的异步转同步,最小代价的实现“挂起”操作。否则就要用到线程的“阻塞”操作。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,937评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,503评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,712评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,668评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,677评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,601评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,975评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,637评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,881评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,621评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,710评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,387评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,971评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,947评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,189评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,805评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,449评论 2 342

推荐阅读更多精彩内容