工作用Go: 异步任务怎么写

在响应应用请求的过程中, 有时候会遇到比较耗时的任务, 比如给用户发送邮件, 耗时任务时间不可能控, 很可能超过 1s, 为了给用户比较好的体验, 一般会控制请求响应时间(RT, response time)在300ms内(不考虑网络波动), 甚至在 200ms 内. 面对这样的工作场景, 就需要使用异步任务进行处理.

Go协程与异步

从一段简单的代码开始:

func TestTask(t *testing.T) {
 task()
 log.Print("req done")
}

func task() {
 // 模拟耗时任务
 time.Sleep(time.Second)
 log.Print("task done")
}
  • 代码在 Goland 中编写, 同时也推荐使用 Goland 进行 Go 开发
  • 这里使用单测(test)演示代码:
    • 输入 test 就可以快速生成代码(Goland 中称之为 live templates, 其实就是预设好的代码片段)
    • 在单测点击可以执行: 1. 点击左侧(gutter icon)的运行图标; 2. 函数上右键菜单键; 3. 快捷键 ctl-shift-R

上面使用 task() 模拟耗时 1s 的任务, 整个test代表一次请求, 执行如下:

=== RUN   TestTask
2022/11/17 20:11:15 task done
2022/11/17 20:11:15 req done
--- PASS: TestTask (1.00s)
PASS

Go基础知识: 天生并发, 使用 go 关键字就可以开新协程, 将代码放到新协程中执行

func TestTask(t *testing.T) {
 go task()
 log.Print("req done")
}

func task() {
 // 模拟耗时任务
 time.Sleep(time.Second)
 log.Print("task done")
}
  • 只需要在 task() 前添加 go 关键字, 就可以新开一个协程, 将 task() 在新协程中执行

不过在这里, 并没有得到预期的结果:

=== RUN   TestTask
2022/11/17 20:16:08 req done
--- PASS: TestTask (0.00s)
PASS
  • 输出显示: task() 中的日志没有输出, 看起来像没有执行

Go基础知识: Go的代码都在协程中执行, 入口 main() 函数是主协程, 之后使用 go 关键词开的协程都是子协程, 主协程退出后, 程序会终止(exit)

也就是说上面的 TestTask()(主协程) 和 go task()(子协程)都执行了, 但是主协程执行完, 程序退出了, 子协程没执行完(或者没调度到), 就被强制退出了

简单 Go 并发: 任务编排

上面的例子, 常见有 3 种解决方案:

  • 方案1: 等子协程执行完
func TestTask(t *testing.T) {
 go task()

 time.Sleep(time.Second) // 等待子协程执行完
 log.Print("req done")
}

func task() {
 // 模拟耗时任务
 time.Sleep(time.Second)
 log.Print("task done")
}
  • 方案2: 使用 WaitGroup
func TestTask(t *testing.T) {
 var wg sync.WaitGroup
 wg.Add(1)
 go func() {
  task()
  wg.Done()
 }()
 wg.Wait()

 log.Print("req done")
}

func task() {
 // 模拟耗时任务
 time.Sleep(time.Second)
 log.Print("task done")
}

WaitGroup 其实很好理解, 就是同时等待一组任务完成, 它分为 3 步: 1. Add: 总共有多少任务; 2. Done(): 表示任务执行完; 3. Wait(): 等待所有任务完成

  • 方案3: 使用 Go 的并发语言 chan
func TestTask(t *testing.T) {
 ch := make(chan struct{}) // 初始化 chan
 go func() {
  task()
  ch <- struct{}{} // 发送到 chan
 }()
 <-ch // 从 chan 获取

 log.Print("req done")
}

func task() {
 // 模拟耗时任务
 time.Sleep(time.Second)
 log.Print("task done")
}

Go基础知识: 通过 chan T 就可以申明 T 类型的 chan, 供协程间进行通信; struct{} 是 Go 中 0 memory use(0内存占用)类型, 适合上面使用 chan 进行 控制 而不需要 数据 进行通信的情况

虽然只是3个简单的 demo code, Go 提供的 2 种并发能力都有展示:

  • 传统并发原语: 大部分集中在sync包下, 上面案例2中的 sync.WaitGroup 就是其中之一
  • Go 基于 CSP 的并发编程范式: 包括 go chan select, 上面的案例3中展示了 go+chan 的基本用法

简单 Go 并发讲完了, 那任务编排又是啥? 其实, 某等程度上, 任务编排=异步, 任务需要 分工 完成时, 也就是一个任务相对于另一个任务需要 异步处理. 而任务编排, 恰恰是 Go 语言中基于 chan 进行并发编程的强项.

Go 中有一个大的方向,就是任务编排用 Channel,共享资源保护用传统并发原语。

回到最初的代码, 在实际使用的使用, 到底使用的是哪种方案呢? 答案是 方案1. 看看接近真实场景的代码

func TestTrace(t *testing.T) {
 for { // 服务以 daemon 的方式持续运行
  // 不断处理用户的请求
  {
   go task()

   log.Print("req done")
  }
 }
}

func task() {
 // 模拟耗时任务
 time.Sleep(time.Second)
 log.Print("task done")
}

也就是真实场景下, 主协程所在的 server 会一直常驻, 请求(request)所有的子协程不用担心还没执行完就被强制退出了

避坑: 野生 Goroutine

在继续讲解之前, 一定要提一下使用 go 开协程的一个, 或者说一个非常重要的基础知识:

Go基础知识: panic只对当前goroutine的defer有效

Go中出现 panic(), 程序会立即终止:

func TestPanic(t *testing.T) {
 panic("panic")
 log.Print("end")
}
=== RUN   TestPanic
--- FAIL: TestPanic (0.00s)
panic: panic [recovered]
 panic: panic

goroutine 118 [running]:
testing.tRunner.func1.2({0x103e15940, 0x10405c208})
 /opt/homebrew/opt/go/libexec/src/testing/testing.go:1396 +0x1c8
testing.tRunner.func1()
 /opt/homebrew/opt/go/libexec/src/testing/testing.go:1399 +0x378
panic({0x103e15940, 0x10405c208})
 /opt/homebrew/opt/go/libexec/src/runtime/panic.go:884 +0x204
 ...
 ...
testing.tRunner(0x14000603040, 0x104058678)
 /opt/homebrew/opt/go/libexec/src/testing/testing.go:1446 +0x10c
created by testing.(*T).Run
 /opt/homebrew/opt/go/libexec/src/testing/testing.go:1493 +0x300


Process finished with the exit code 1
  • 可以看到, panic 后程序直接退出, panic 后的 log.Print("end") 并没有执行

当然, 想要程序健壮一些, panic 是可以 吃掉 的:

func TestPanic(t *testing.T) {
 defer func() {
  if r := recover(); r != nil {
   log.Print(r)
  }
 }()

 panic("panic")
 log.Print("end")
}
=== RUN   TestPanic
2022/11/17 22:25:08 panic
--- PASS: TestPanic (0.00s)
PASS

使用 recover()panic() 进行恢复, 程序就不会崩掉(exit)

但是, 一定要注意

panic只对当前goroutine的defer有效!
panic只对当前goroutine的defer有效!
panic只对当前goroutine的defer有效!

重要的事情说三遍.

func TestPanic(t *testing.T) {
 defer func() {
  if r := recover(); r != nil {
   log.Print(r)
  }
 }()

 go func() {
  panic("panic")
 }()

 log.Print("end")
}
=== RUN   TestPanic
panic: panic

goroutine 88 [running]:
...
...
...

Process finished with the exit code 1

而 go 里面开协程又是如此的方便, 简单一个 go 关键字即可, 所以大家给这种情况起了个外号: 野生 Goroutine. 最简单的做法就是对协程进行一次封装, 比如这样:

package gox

// Run start with a goroutine
func Run(fn func()) {
 go func() {
  defer func() {
   if r := recover(); r != nil {
    log.Print(r)
   }
  }()

  fn()
 }()
}

原本的 go task(), 使用 gox.Run(task)进行替换, 就可以 task 出现 panic 的时候, 程序还能恢复

Trace: 异步任务还能进行链路追踪么?

随着可观测技术的不断演进, 基建上的不断提升, 链路追踪技术也进行了演进

  • trace1.0: opentracing jaeger
  • trace2.0: otel

当用户请求进来时, 可以通过 traceId 串联起用户的完成调用链, 监控和排查问题能力大大增强!

{
    "code": 200,
    "status": 200,
    "msg": "成功",
    "errors": null,
    "data": "env-t0",
    "timestamp": 1668696256,
    "traceId": "..."
}

trace 通过请求(request)中的 context, 不断向下传递, 从而将当前请求的所用调用通过同一个 traceId 串联起来

func TestTrace(t *testing.T) {
 Op1(ctx) // 比如操作了 DB
 Op2(ctx) // 比如操作了 cache
 Task(ctx)

 log.Print("req done")
}

func Task(ctx context.Context) {
 // 使用自定义span, 将当前操作上报到trace
 _, span := otel.GetTracerProvider().Tracer("task").Start(ctx, "xxxTask")
 defer span.End()

 // 模拟耗时任务
 time.Sleep(time.Second)
 log.Print("task done")
}

如同上面演示的 demo code 演示:

  • 通过 ctx, 将当前请求(request)的所有操作使用同一个 traceId 串起来
  • otel 默认了很多操作的 trace 上报, 比如 mysql/redis/kafka 等等, 也可以使用自定义 span 的方式进行新增

如果要进行耗时任务异步处理, 直觉上直接 go 一下:

func TestTrace(t *testing.T) {
 Op1(ctx) // 比如操作了 DB
 Op2(ctx) // 比如操作了 cache
 go Task(ctx)

 log.Print("req done")
}

这时候脑海中陡然蹦出一个声音: 野生Goroutine

func TestTrace(t *testing.T) {
 Op1(ctx) // 比如操作了 DB
 Op2(ctx) // 比如操作了 cache
 gox.RunCtx(ctx, Task) // 在 gox.Run 的基础上, 添加 ctx 支持

 log.Print("req done")
}

可是等测试一下, 就会发现, task() 并没有执行!

细心的小伙伴就会发现, 这和开始的例子有点像呀, 而且对比下就会知道, 此处多了一个 ctx:

func TestTask(t *testing.T) {
 go task(ctx)

 log.Print("req done")
}

func task() {
 // 模拟耗时任务
 time.Sleep(time.Second)
 log.Print("task done")
}
  • 没有 ctx 的时候, 因为主协程一直在, 子协程可以处理完任务在退出, 也就是子协程的生命周期都在主协程内
  • 有 ctx 的时候, 由于 ctx 的存在, 请求(request)中主协程需要接受 ctx 控制, 异步处理后, 请求也就结束了(上面log.Print("req done")模拟的部分), 这是 ctx 就会控制子协程一起结束掉, 也就是子协程的生命周期都在当前请求的协程内

于是, 又有了 2 种处理办法:

  • 简单做法, 就像上面一样, 没有 ctx, 就没有问题了嘛. 如果用一句话来概括这种方法: 面试官: 你可以回家等消息了
  • 既然又要执行异步任务, 又要有 trace, 那把 trace 继续传下, 用一个新的 ctx 就好了嘛

上代码:

  • 复制 ctx, 把 trace 继续传下去
package ctxkit

// Clone 复制 ctx 中对应 key 的值,移除父级 cancel。
func Clone(preCtx context.Context, keys ...interface{}) context.Context {
 newCtx := context.Background()

 // 从 pctx 开启一个子 span,来传递 traceId
 _, ospan := otel.GetTracerProvider().
  Tracer(trace_in.InstrumentationPrefix+"/ctxkit").
  Start(preCtx, "ctxkit.Clone", otel_trace.WithAttributes(
   trace_attr.AttrAsyncFlag.Int(1), // 标记为异步
  ))
 defer ospan.End()
 newCtx = trace.ContextWithSpan(newCtx, ospan)

 return ctxClone(newCtx, preCtx, keys...)
}

// CloneWithoutSpan  功能同 Clone,但不会创建 trace span,建议在大批数据 for 循环之前使用,避免 span 链路过长。
func CloneWithoutSpan(preCtx context.Context, keys ...interface{}) context.Context {
 tid := trace_in.GetOtelTraceId(preCtx)
 if tid == "" {
  tid = trace_in.FakeTraceId()
 }
 newCtx := context.WithValue(context.Background(), ictx.CtxKeyFakeTraceId, tid)
 return ctxClone(newCtx, preCtx, keys...)
}

func ctxClone(baseCtx, preCtx context.Context, keys ...interface{}) context.Context {
 for _, key := range _ctxKeys {
  if v := preCtx.Value(key); v != nil {
   baseCtx = context.WithValue(baseCtx, key, v)
  }
 }

 keys = append(keys, _strKeys...)
 for _, key := range keys {
  if v := preCtx.Value(key); v != nil {
   baseCtx = context.WithValue(baseCtx, key, v) //nolint
  }
 }

 return baseCtx
}
  • 实际使用
func TestTask(t *testing.T) {
 nexCtx := ctxkit.Clone(ctx)
 go task(newCtx)

 log.Print("req done")
}

func task() {
 // 模拟耗时任务
 time.Sleep(time.Second)
 log.Print("task done")
}

异步任务: 能否更优雅点

如果是从请求过来的, 请求中自带 trace, 并会在请求(request)的初始化的时候建 trace 写入到请求的 ctx 中, 那如果直接执行一个异步任务呢?

那就需要手动初始化 trace 了.

上代码:

  • 封装异步任务(job): 封装trace -> clone ctx -> 指标收集(jobMetricsWrap) -> 野生Goroutine捕获
package job

// AsyncJob 异步任务。
// name: 任务名。
// return: waitFunc,调用可以等待任务完成。
func AsyncJob(ctx context.Context, name string, fn func(ctx context.Context) error, opts ...Option) func() {
 ctx = tel_in.CtxAdjuster(ctx) // 初始化 trace
 newCtx := ctxkit.Clone(ctx)

wg := sync.WaitGroup{}
 wg.Add(1)
 go func() {
  defer wg.Done()

  // 指标收集
  jobMetricsWrap(newCtx, fn, applyOption(name, true, opts...))
 }()
 return wg.Wait
}
  • 实际使用:
func TestJob(t *testing.T) {
 ctx := context.Background()

 // 异步任务
 // 逻辑在协程中执行,已包装 recover 逻辑
 wait := job.AsyncJob(ctx, "your_task_name", func(ctx context.Context) error {
  // 内部处理使用传入的 ctx,已经执行过 citkit.Clone
  return doAsyncTask(ctx)
 })
 wait() // 如果需要等待任务结束则调用 wait,不需要则忽略返回值
}

func doAsyncTask(ctx context.Context) error {
 logs.InfoCtx(ctx, "async task done")
 return nil
}
=== RUN   TestJob
2022-11-18T10:18:39.014+0800 INFO tests/async_job_test.go:250 async task done {"traceId": "..."}
--- PASS: TestJob (0.00s)
PASS

PS: 这里需要查看效果, 所以调用了 wait() 等待异步任务结束, 实际使用可以直接使用 job.AsyncJob() 或者 _ = job.AsyncJob()

最后一起来看看 trace 使用的效果:

todo: img

Asynq: 专业异步任务框架

如果只是 异步一下, 上面讲解的内容也基本够用了; 如果有重度异步任务使用, 就得考虑专业的异步任务队列框架了, Go 中可以选择 Async

Features

整体架构图

todo

实际使用

使用的 demo 就不贴了, asynq 的文档很详细, 说一下具体实践中遇到的 2个 case:

  • 使用 web UI: 处于安全考虑, 设置了 ReadOnly
h := asynqmon.New(asynqmon.Options{
   RootPath:     "/monitoring", // RootPath specifies the root for asynqmon app
   RedisConnOpt: tasks.GetRedis(),
   ReadOnly:     true, // admin web can't operation
})

r := mux.NewRouter()
r.PathPrefix(h.RootPath()).Handler(h)

srv := &http.Server{
   Handler: r,
   Addr:    ":8080",
}

PS: 使用 web UI 由于涉及到使用新的端口, 而应用部署已经上 k8s 了, 如何顺利访问就需要一系列运维操作, 留个坑, 以后有机会再填

  • 测试环境OK, 线上报错: recoverer: could not move task to archive: INTERNAL_ERROR: redis eval error: ERR 'asynq:{}:t:' and 'asynq:{}:active' not in the same slot

对比发现, 是测试和线上使用的不同类型的 redis 实例导致的, 搜索云服务的帮助文档:

Redis实例类型差异

todo

集群架构实例的命令限制: 如需在集群架构实例中执行下述受限制的命令,请使用hash tag确保命令所要操作的key都分布在1个hash slot中

但是查看 asqnq 源码: 以 enqueue 操作为例, lua 操作中的部分 key 无法通过外部添加 hash tag

// github.com/hibiken/asynq/internal/rdb/rdb.go
// enqueueCmd enqueues a given task message.
//
// Input:
// KEYS[1] -> asynq:{<qname>}:t:<task_id>
// KEYS[2] -> asynq:{<qname>}:pending
// --
// ARGV[1] -> task message data
// ARGV[2] -> task ID
// ARGV[3] -> current unix time in nsec
//
// Output:
// Returns 1 if successfully enqueued
// Returns 0 if task ID already exists
var enqueueCmd = redis.NewScript(`
if redis.call("EXISTS", KEYS[1]) == 1 then
 return 0
end
redis.call("HSET", KEYS[1],
           "msg", ARGV[1],
           "state", "pending",
           "pending_since", ARGV[3])
redis.call("LPUSH", KEYS[2], ARGV[2])
return 1
`)

最终, 通过使用线上另一台主从版redis解决问题

写在最后

到这里, 工作用Go: 异步任务怎么写 就暂时告一段落了, 这个过程中:

  • 一些计算机基础概念的理解: 同步与异步, 异步与任务编排, 协程与异步, 协程与生命周期
  • 一些 Go 语言的基础知识以及基础不牢地动山摇的坑: 野生Goroutine, panic&recover
  • 可观测的实践之一: trace
  • 专业的异步任务框架 Asynq 以及踩坑记

一起拥抱变化, 直面问题和挑战, 不断精进, 我们下个话题再见👋🏻.

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,772评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,458评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,610评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,640评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,657评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,590评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,962评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,631评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,870评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,611评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,704评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,386评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,969评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,944评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,179评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,742评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,440评论 2 342

推荐阅读更多精彩内容

  • 目录 1.go 各种代码运行 2.go 在线编辑代码运行 3.通过 Gob 包序列化二进制数据 4.使用 ...
    杨言锡阅读 1,112评论 0 1
  • 前言 go 的 goroutine 提供了一种较线程而言更廉价的方式处理并发场景, go 使用二级线程的模式, 将...
    MrQ被抢注了阅读 976评论 1 9
  • 1、进程/线程/协程基本概念 一个进程可以有多个线程,一般情况下固定2MB内存块来做栈,用来保存当前被调用/挂起的...
    ddu_sw阅读 699评论 0 5
  • 协程机制 Golang 线程和协程的区别 备注:需要区分进程、线程(内核级线程)、协程(用户级线程)三个概念。 进...
    Jabir_Zhang阅读 521评论 0 1
  • 转载自:超详细的讲解Go中如何实现一个协程池 并发(并行),一直以来都是一个编程语言里的核心主题之一,也是被开发者...
    紫云02阅读 1,025评论 0 1