一种 golang 实现 多协程任务处理的套路

一种 golang 实现 多协程任务处理的套路

那么是什么样的任务呢,一般是在生产者-消费者模式的消费者进程 ,举几个例子

  1. 消费kafka 数据
  2. 消费redis 数据
  3. 轮询处理数据库数据
  4. ...

下面来分析一下

  1. 业务逻辑处理协程
    到底多少个呢 ?处理一个数据 就 go 一个吗,也可以不过有点粗暴,协程也不是越多越好,调度也是要好性能的
    所以还是控制一下,一般吧 弄个cpu * 2 就差不多了
    (runtime.NumCPU() *2)

  2. 获取数据协程
    由于我要分析的例子 都是一个 for 循环 不停读取数据 交个任务处理协程,所以这里就 用一个协程

  3. 进程如何关闭
    总不能kill -9 粗暴处理吧,这样容易造成数据异常或者丢数据,一般都是 捕捉 信号
    signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)

直接上代码

package main

import (
    "fmt"
    "os"
    "os/signal"
    "runtime"
    "sync/atomic"
    "syscall"
    "time"
)

type TaskData struct {
}
type Service struct {
    capacity     int
    tasks        chan *TaskData
    numThread    int
    closeChans   chan struct{}
    stopFlag     int32
    loopStopChan chan struct{}
}

func NewService(capacity int) *Service {
    service := &Service{}
    service.capacity = capacity
    service.numThread = runtime.NumCPU() * 2
    service.tasks = make(chan *TaskData, capacity)
    service.stopFlag = 0
    service.closeChans = make(chan struct{}, service.numThread)
    service.loopStopChan = make(chan struct{})
    return service
}

func (this *Service) Stop() {
    atomic.StoreInt32(&this.stopFlag, 1)
    <-this.loopStopChan
    close(this.tasks)
    for i := 0; i < this.numThread; i++ {
        <-this.closeChans
    }
}

func (this *Service) Run() {
    for i := 0; i < this.numThread; i++ {
        go this.run(i)
    }
    go this.LoopConsume()
}

func (this *Service) run(i int) {
    fmt.Println("go run:", i)
loop:
    for {
        select {
        case task, ok := <-this.tasks:
            if ok {
                //#TODO process
                fmt.Println("process", task)
            } else {
                break loop
            }
        }
    }
    this.closeChans <- struct{}{}
}

func (this *Service) LoopConsume() {
    fmt.Println("loop")
    for atomic.LoadInt32(&this.stopFlag) == 0 {
        //TODO ReadData
        task := &TaskData{}
        this.tasks <- task

        fmt.Println("consume.")
        time.Sleep(time.Second * 2)
    }
    this.loopStopChan <- struct{}{}
}

func main() {
    service := NewService(100)
    go service.Run() //启动程序处理

    c := make(chan os.Signal)
    signal.Notify(c, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
    s := <-c //等待关闭信号
    fmt.Println(s)
    service.Stop() //关闭service 
    fmt.Println("exit :D")
}

思考 1:

service.stopFlag 
service.closeChans
service.loopStopChan 

这几个变量干什么用的,是为了安全退出程序用的

1. stopFlag

首先 要退出 LoopConsume 大循环 用什么去通知呢,用channel 也可以,就是要配合select 使用,但是用原子标记是不是更简洁呢? 所以 stopFlag 是为了退出 LoopConsume 用的

2. closeChans

由于我们go 了很多个协程,那么要监听每一个协程退出,就需要多个channel 去接收

    for i := 0; i < this.numThread; i++ {
        <-this.closeChans 
    }

这段代码的意思就是等待所有 处理协成退出

3. loopStopChan

这个又是干什么的呢,同样也是处理协程退出的 只不过是 LoopConsume,为什么不 closeChans 大小再加一个 而变成这样呢

service.closeChans = make(chan struct{}, service.numThread+1)
func (this *Service) Stop() {
    atomic.StoreInt32(&this.stopFlag, 1)
    close(this.closeChans)
    for i := 0; i < this.numThread+1; i++ {
        <-this.closeChans
    }
}

这么做会发生什么呢?,假如这样 一旦执行了stop 那么
this.stopFlag = 1,但是 LoopConsume 可能还在从 //TODO ReadData 获取数据阶段
当执行了 close(this.tasks) ,此时 恰好又要执行 this.tasks <- task,但是此时
tasks 已经关闭,那么就会panic
其实在整个例子里 LoopConsume 就相当于一个生产者,而run 相当于一个消费者,我们是不是应该先关不生产者 等待 消费者 消费完了 再退出呢,毫无疑问 肯定是的,所以就要有一个channel 等 生产者 退出了 再发送 channel 去 让消费者退出,所以单独用一个 loopStopChan

思考2:

func (this *Service) LoopConsume() {
    fmt.Println("loop")
    for atomic.LoadInt32(&this.stopFlag) == 0 {
        //TODO ReadData
        task := &TaskData{}
        this.tasks <- task

        fmt.Println("consume.")
        time.Sleep(time.Second * 2)
    }
    this.loopStopChan <- struct{}{}
}

这段代码其实就是不停的获取数据,我这里没有写获取数据的部分,因为这个是和业务相关的,举个实际点的例子 比如 比如 读取mysql
SELECT ID, * FROM DATA WHERE ID > OFFSET LIMIT N;
每次 从OFFSET 位置读取 N 条数据,读取后 如果获取的条数 为 num ,若num等于 N , 那么 OFFSET += N 继续 read,否则 说明数据不够了 ,则,OFFSET += num,并且 sleep n 秒 (避免没有数据的时候空跑)

上伪代码

func (this *Service) LoopConsume() {
    fmt.Println("loop")
    for atomic.LoadInt32(&this.stopFlag) == 0 {
        rows:= Read(offset)
        if rows 行数 ==  N {
            task := &TaskData{}
            this.tasks <- task
            offset + = N
        }else{
            time.Sleep(time.Second * 20)
            offset +=   rows 行数
        }   
    }
    this.loopStopChan <- struct{}{}
}

这里有没有发现问题呢,假如程序刚好进入了time.Sleep(time.Second * 20) 这里呢,此时stop 岂不是 要等待20s 可是其实进程已经很闲了,有什么办法解决 呢,还是可以用标记的方法,一段程序进入sleep 可以设置一个标记

上伪代码

func (this *Service) LoopConsume() {
    fmt.Println("loop")
    for atomic.LoadInt32(&this.stopFlag) == 0 {
        atomic.StoreInt32(&this.forcestopFlag, 0)
        rows:= Read(offset)
        if rows 行数 ==  N {
            task := &TaskData{}
            this.tasks <- task
            offset + = N
        }else{
            atomic.StoreInt32(&this.forcestopFlag, 1)
            time.Sleep(time.Second * 20)
            offset +=   rows 行数
        }   
    }
    this.loopStopChan <- struct{}{}
}


func (this *Service) Stop() {
    atomic.StoreInt32(&this.stopFlag, 1)
    if  atomic.LoadInt32(&this.forcestopFlag) == 0{
        <-this.loopStopChan //只有当forcestopFlag = 0 的时候才需要等待 LoopConsume退出
    }
    close(this.tasks)
    for i := 0; i < this.numThread; i++ {
        <-this.closeChans
    }
}

思考3:

此模型 run 里面 或者 LoopConsume 还可以 go 协程出来吗,显然不行,因为一旦go 出来 了,那么现有的 stop 就失效了,因为无法获取这些协程是否退出。
其实我觉得也没有必要 再go 一个出来,因为LoopConsume 一般是读 ,速度比 业务处理的要高, 如果这个不满足你的实际业务需求,你可以 go 多个 LoopConsume ,同样把 loopStopChan 也弄成 长度为 N 的channel

<-this.loopStopChan 变成这样

for i := 0; i <N ; i++ {
    <-this.loopStopChan
}

再极端 你的业务非得 在 LoopConsume go 一个或者多个协程,那么你得思考 该怎么同步了,至于用什么方法,得好好思考了,可以提出来大伙一起讨论讨论

同理在 run 里面你也想 go 一个或者多个协程, 还是一样得想办法考虑同步问题

总结

这个小service 只是一种多协程处理任务的套路,常用在生产者消费者模型的消费者进程。
对于对性能要求比较高的可能不适合,比如这里的 协程数是固定的,可以改进成伸缩的动态变化,
代码写的比较简单,一些错误之处 还望各位 大神多多指正,欢迎讨论。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 200,392评论 5 470
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,258评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 147,417评论 0 332
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,992评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,930评论 5 360
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,199评论 1 277
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,652评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,327评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,463评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,382评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,432评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,118评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,704评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,787评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,999评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,476评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,057评论 2 341

推荐阅读更多精彩内容

  • 轻量级线程:协程 在常用的并发模型中,多进程、多线程、分布式是最普遍的,不过近些年来逐渐有一些语言以first-c...
    Tenderness4阅读 6,350评论 2 10
  • 原文链接:https://github.com/EasyKotlin 在常用的并发模型中,多进程、多线程、分布式是...
    JackChen1024阅读 10,694评论 3 23
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,566评论 18 139
  • 文/五柳家先生 记忆中我看过最美的景色,是小时候跟着年轻的父亲爬上一座不知名的山,从上向下看一片湖光山色,...
    五柳家先生阅读 1,136评论 4 8
  • 在我们每个人心中,都有着自己的小九九,每天都在各种测试与测试中度过,每天都在研究或者渴望有着新的黑科技出现,自己然...
    一王二山阅读 225评论 0 0