在响应应用请求的过程中, 有时候会遇到比较耗时的任务, 比如给用户发送邮件, 耗时任务时间不可能控, 很可能超过 1s, 为了给用户比较好的体验, 一般会控制请求响应时间(RT, response time)在300ms内(不考虑网络波动), 甚至在 200ms 内. 面对这样的工作场景, 就需要使用异步任务进行处理.
Go协程与异步
从一段简单的代码开始:
func TestTask(t *testing.T) {
task()
log.Print("req done")
}
func task() {
// 模拟耗时任务
time.Sleep(time.Second)
log.Print("task done")
}
- 代码在 Goland 中编写, 同时也推荐使用 Goland 进行 Go 开发
- 这里使用单测(
test
)演示代码:- 输入
test
就可以快速生成代码(Goland 中称之为live templates
, 其实就是预设好的代码片段) - 在单测点击可以执行: 1. 点击左侧(gutter icon)的运行图标; 2. 函数上右键菜单键; 3. 快捷键
ctl-shift-R
- 输入
上面使用 task()
模拟耗时 1s 的任务, 整个test代表一次请求, 执行如下:
=== RUN TestTask
2022/11/17 20:11:15 task done
2022/11/17 20:11:15 req done
--- PASS: TestTask (1.00s)
PASS
Go基础知识: 天生并发, 使用
go
关键字就可以开新协程, 将代码放到新协程中执行
func TestTask(t *testing.T) {
go task()
log.Print("req done")
}
func task() {
// 模拟耗时任务
time.Sleep(time.Second)
log.Print("task done")
}
- 只需要在
task()
前添加go
关键字, 就可以新开一个协程, 将task()
在新协程中执行
不过在这里, 并没有得到预期的结果:
=== RUN TestTask
2022/11/17 20:16:08 req done
--- PASS: TestTask (0.00s)
PASS
- 输出显示:
task()
中的日志没有输出, 看起来像没有执行
Go基础知识: Go的代码都在协程中执行, 入口
main()
函数是主协程, 之后使用go
关键词开的协程都是子协程, 主协程退出后, 程序会终止(exit)
也就是说上面的 TestTask()
(主协程) 和 go task()
(子协程)都执行了, 但是主协程执行完, 程序退出了, 子协程没执行完(或者没调度到), 就被强制退出了
简单 Go 并发: 任务编排
上面的例子, 常见有 3 种解决方案:
- 方案1: 等子协程执行完
func TestTask(t *testing.T) {
go task()
time.Sleep(time.Second) // 等待子协程执行完
log.Print("req done")
}
func task() {
// 模拟耗时任务
time.Sleep(time.Second)
log.Print("task done")
}
- 方案2: 使用
WaitGroup
func TestTask(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
go func() {
task()
wg.Done()
}()
wg.Wait()
log.Print("req done")
}
func task() {
// 模拟耗时任务
time.Sleep(time.Second)
log.Print("task done")
}
WaitGroup
其实很好理解, 就是同时等待一组任务完成, 它分为 3 步: 1. Add
: 总共有多少任务; 2. Done()
: 表示任务执行完; 3. Wait()
: 等待所有任务完成
- 方案3: 使用 Go 的并发语言
chan
func TestTask(t *testing.T) {
ch := make(chan struct{}) // 初始化 chan
go func() {
task()
ch <- struct{}{} // 发送到 chan
}()
<-ch // 从 chan 获取
log.Print("req done")
}
func task() {
// 模拟耗时任务
time.Sleep(time.Second)
log.Print("task done")
}
Go基础知识: 通过
chan T
就可以申明T
类型的 chan, 供协程间进行通信;struct{}
是 Go 中0 memory use
(0内存占用)类型, 适合上面使用 chan 进行 控制 而不需要 数据 进行通信的情况
虽然只是3个简单的 demo code, Go 提供的 2 种并发能力都有展示:
- 传统并发原语: 大部分集中在
sync
包下, 上面案例2中的sync.WaitGroup
就是其中之一 - Go 基于 CSP 的并发编程范式: 包括
go chan select
, 上面的案例3中展示了go+chan
的基本用法
简单 Go 并发讲完了, 那任务编排又是啥? 其实, 某等程度上, 任务编排=异步, 任务需要 分工 完成时, 也就是一个任务相对于另一个任务需要 异步处理. 而任务编排, 恰恰是 Go 语言中基于 chan 进行并发编程的强项.
Go 中有一个大的方向,就是任务编排用 Channel,共享资源保护用传统并发原语。
回到最初的代码, 在实际使用的使用, 到底使用的是哪种方案呢? 答案是 方案1. 看看接近真实场景的代码
func TestTrace(t *testing.T) {
for { // 服务以 daemon 的方式持续运行
// 不断处理用户的请求
{
go task()
log.Print("req done")
}
}
}
func task() {
// 模拟耗时任务
time.Sleep(time.Second)
log.Print("task done")
}
也就是真实场景下, 主协程所在的 server 会一直常驻, 请求(request)所有的子协程不用担心还没执行完就被强制退出了
避坑: 野生 Goroutine
在继续讲解之前, 一定要提一下使用 go
开协程的一个坑, 或者说一个非常重要的基础知识:
Go基础知识: panic只对当前goroutine的defer有效
Go中出现 panic()
, 程序会立即终止:
func TestPanic(t *testing.T) {
panic("panic")
log.Print("end")
}
=== RUN TestPanic
--- FAIL: TestPanic (0.00s)
panic: panic [recovered]
panic: panic
goroutine 118 [running]:
testing.tRunner.func1.2({0x103e15940, 0x10405c208})
/opt/homebrew/opt/go/libexec/src/testing/testing.go:1396 +0x1c8
testing.tRunner.func1()
/opt/homebrew/opt/go/libexec/src/testing/testing.go:1399 +0x378
panic({0x103e15940, 0x10405c208})
/opt/homebrew/opt/go/libexec/src/runtime/panic.go:884 +0x204
...
...
testing.tRunner(0x14000603040, 0x104058678)
/opt/homebrew/opt/go/libexec/src/testing/testing.go:1446 +0x10c
created by testing.(*T).Run
/opt/homebrew/opt/go/libexec/src/testing/testing.go:1493 +0x300
Process finished with the exit code 1
- 可以看到,
panic
后程序直接退出,panic
后的log.Print("end")
并没有执行
当然, 想要程序健壮一些, panic
是可以 吃掉
的:
func TestPanic(t *testing.T) {
defer func() {
if r := recover(); r != nil {
log.Print(r)
}
}()
panic("panic")
log.Print("end")
}
=== RUN TestPanic
2022/11/17 22:25:08 panic
--- PASS: TestPanic (0.00s)
PASS
使用 recover()
对 panic()
进行恢复, 程序就不会崩掉(exit)
但是, 一定要注意
panic只对当前goroutine的defer有效!
panic只对当前goroutine的defer有效!
panic只对当前goroutine的defer有效!
重要的事情说三遍.
func TestPanic(t *testing.T) {
defer func() {
if r := recover(); r != nil {
log.Print(r)
}
}()
go func() {
panic("panic")
}()
log.Print("end")
}
=== RUN TestPanic
panic: panic
goroutine 88 [running]:
...
...
...
Process finished with the exit code 1
而 go 里面开协程又是如此的方便, 简单一个 go
关键字即可, 所以大家给这种情况起了个外号: 野生 Goroutine. 最简单的做法就是对协程进行一次封装, 比如这样:
package gox
// Run start with a goroutine
func Run(fn func()) {
go func() {
defer func() {
if r := recover(); r != nil {
log.Print(r)
}
}()
fn()
}()
}
原本的 go task()
, 使用 gox.Run(task)
进行替换, 就可以 task 出现 panic 的时候, 程序还能恢复
Trace: 异步任务还能进行链路追踪么?
随着可观测技术的不断演进, 基建上的不断提升, 链路追踪技术也进行了演进
- trace1.0: opentracing jaeger
- trace2.0: otel
当用户请求进来时, 可以通过 traceId
串联起用户的完成调用链, 监控和排查问题能力大大增强!
{
"code": 200,
"status": 200,
"msg": "成功",
"errors": null,
"data": "env-t0",
"timestamp": 1668696256,
"traceId": "..."
}
trace 通过请求(request)中的 context
, 不断向下传递, 从而将当前请求的所用调用通过同一个 traceId 串联起来
func TestTrace(t *testing.T) {
Op1(ctx) // 比如操作了 DB
Op2(ctx) // 比如操作了 cache
Task(ctx)
log.Print("req done")
}
func Task(ctx context.Context) {
// 使用自定义span, 将当前操作上报到trace
_, span := otel.GetTracerProvider().Tracer("task").Start(ctx, "xxxTask")
defer span.End()
// 模拟耗时任务
time.Sleep(time.Second)
log.Print("task done")
}
如同上面演示的 demo code 演示:
- 通过
ctx
, 将当前请求(request)的所有操作使用同一个 traceId 串起来 - otel 默认了很多操作的 trace 上报, 比如 mysql/redis/kafka 等等, 也可以使用自定义
span
的方式进行新增
如果要进行耗时任务异步处理, 直觉上直接 go
一下:
func TestTrace(t *testing.T) {
Op1(ctx) // 比如操作了 DB
Op2(ctx) // 比如操作了 cache
go Task(ctx)
log.Print("req done")
}
这时候脑海中陡然蹦出一个声音: 野生Goroutine
func TestTrace(t *testing.T) {
Op1(ctx) // 比如操作了 DB
Op2(ctx) // 比如操作了 cache
gox.RunCtx(ctx, Task) // 在 gox.Run 的基础上, 添加 ctx 支持
log.Print("req done")
}
可是等测试一下, 就会发现, task()
并没有执行!
细心的小伙伴就会发现, 这和开始的例子有点像呀, 而且对比下就会知道, 此处多了一个 ctx
:
func TestTask(t *testing.T) {
go task(ctx)
log.Print("req done")
}
func task() {
// 模拟耗时任务
time.Sleep(time.Second)
log.Print("task done")
}
- 没有 ctx 的时候, 因为主协程一直在, 子协程可以处理完任务在退出, 也就是子协程的生命周期都在主协程内
- 有 ctx 的时候, 由于 ctx 的存在, 请求(request)中主协程需要接受 ctx 控制, 异步处理后, 请求也就结束了(上面
log.Print("req done")
模拟的部分), 这是 ctx 就会控制子协程一起结束掉, 也就是子协程的生命周期都在当前请求的协程内
于是, 又有了 2 种处理办法:
- 简单做法, 就像上面一样, 没有 ctx, 就没有问题了嘛. 如果用一句话来概括这种方法:
面试官: 你可以回家等消息了
- 既然又要执行异步任务, 又要有 trace, 那把 trace 继续传下, 用一个新的 ctx 就好了嘛
上代码:
- 复制 ctx, 把 trace 继续传下去
package ctxkit
// Clone 复制 ctx 中对应 key 的值,移除父级 cancel。
func Clone(preCtx context.Context, keys ...interface{}) context.Context {
newCtx := context.Background()
// 从 pctx 开启一个子 span,来传递 traceId
_, ospan := otel.GetTracerProvider().
Tracer(trace_in.InstrumentationPrefix+"/ctxkit").
Start(preCtx, "ctxkit.Clone", otel_trace.WithAttributes(
trace_attr.AttrAsyncFlag.Int(1), // 标记为异步
))
defer ospan.End()
newCtx = trace.ContextWithSpan(newCtx, ospan)
return ctxClone(newCtx, preCtx, keys...)
}
// CloneWithoutSpan 功能同 Clone,但不会创建 trace span,建议在大批数据 for 循环之前使用,避免 span 链路过长。
func CloneWithoutSpan(preCtx context.Context, keys ...interface{}) context.Context {
tid := trace_in.GetOtelTraceId(preCtx)
if tid == "" {
tid = trace_in.FakeTraceId()
}
newCtx := context.WithValue(context.Background(), ictx.CtxKeyFakeTraceId, tid)
return ctxClone(newCtx, preCtx, keys...)
}
func ctxClone(baseCtx, preCtx context.Context, keys ...interface{}) context.Context {
for _, key := range _ctxKeys {
if v := preCtx.Value(key); v != nil {
baseCtx = context.WithValue(baseCtx, key, v)
}
}
keys = append(keys, _strKeys...)
for _, key := range keys {
if v := preCtx.Value(key); v != nil {
baseCtx = context.WithValue(baseCtx, key, v) //nolint
}
}
return baseCtx
}
- 实际使用
func TestTask(t *testing.T) {
nexCtx := ctxkit.Clone(ctx)
go task(newCtx)
log.Print("req done")
}
func task() {
// 模拟耗时任务
time.Sleep(time.Second)
log.Print("task done")
}
异步任务: 能否更优雅点
如果是从请求过来的, 请求中自带 trace, 并会在请求(request)的初始化的时候建 trace 写入到请求的 ctx 中, 那如果直接执行一个异步任务呢?
那就需要手动初始化 trace 了.
上代码:
- 封装异步任务(job): 封装trace -> clone ctx -> 指标收集(jobMetricsWrap) -> 野生Goroutine捕获
package job
// AsyncJob 异步任务。
// name: 任务名。
// return: waitFunc,调用可以等待任务完成。
func AsyncJob(ctx context.Context, name string, fn func(ctx context.Context) error, opts ...Option) func() {
ctx = tel_in.CtxAdjuster(ctx) // 初始化 trace
newCtx := ctxkit.Clone(ctx)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
// 指标收集
jobMetricsWrap(newCtx, fn, applyOption(name, true, opts...))
}()
return wg.Wait
}
- 实际使用:
func TestJob(t *testing.T) {
ctx := context.Background()
// 异步任务
// 逻辑在协程中执行,已包装 recover 逻辑
wait := job.AsyncJob(ctx, "your_task_name", func(ctx context.Context) error {
// 内部处理使用传入的 ctx,已经执行过 citkit.Clone
return doAsyncTask(ctx)
})
wait() // 如果需要等待任务结束则调用 wait,不需要则忽略返回值
}
func doAsyncTask(ctx context.Context) error {
logs.InfoCtx(ctx, "async task done")
return nil
}
=== RUN TestJob
2022-11-18T10:18:39.014+0800 INFO tests/async_job_test.go:250 async task done {"traceId": "..."}
--- PASS: TestJob (0.00s)
PASS
PS: 这里需要查看效果, 所以调用了
wait()
等待异步任务结束, 实际使用可以直接使用job.AsyncJob()
或者_ = job.AsyncJob()
最后一起来看看 trace 使用的效果:
todo: img
Asynq: 专业异步任务框架
如果只是 异步一下, 上面讲解的内容也基本够用了; 如果有重度异步任务使用, 就得考虑专业的异步任务队列框架了, Go 中可以选择 Async
Features
- Guaranteed at least one execution of a task
- Scheduling of tasks
- Retries of failed tasks
- Automatic recovery of tasks in the event of a worker crash
- Weighted priority queues
- Strict priority queues
- Low latency to add a task since writes are fast in Redis
- De-duplication of tasks using unique option
- Allow timeout and deadline per task
- Allow aggregating group of tasks to batch multiple successive operations
- Flexible handler interface with support for middlewares
-
Ability to pause queue
to stop processing tasks from the queue - Periodic Tasks
- Support Redis Cluster for automatic sharding and high availability
- Support Redis Sentinels for high availability
- Integration with Prometheus to collect and visualize queue metrics
-
Web UI
to inspect and remote-control queues and tasks -
CLI
to inspect and remote-control queues and tasks
整体架构图
todo
实际使用
使用的 demo 就不贴了, asynq 的文档很详细, 说一下具体实践中遇到的 2个 case:
- 使用
web UI
: 处于安全考虑, 设置了ReadOnly
h := asynqmon.New(asynqmon.Options{
RootPath: "/monitoring", // RootPath specifies the root for asynqmon app
RedisConnOpt: tasks.GetRedis(),
ReadOnly: true, // admin web can't operation
})
r := mux.NewRouter()
r.PathPrefix(h.RootPath()).Handler(h)
srv := &http.Server{
Handler: r,
Addr: ":8080",
}
PS: 使用
web UI
由于涉及到使用新的端口, 而应用部署已经上 k8s 了, 如何顺利访问就需要一系列运维操作, 留个坑, 以后有机会再填
- 测试环境OK, 线上报错:
recoverer: could not move task to archive: INTERNAL_ERROR: redis eval error: ERR 'asynq:{}:t:' and 'asynq:{}:active' not in the same slot
对比发现, 是测试和线上使用的不同类型的 redis 实例导致的, 搜索云服务的帮助文档:
todo
集群架构实例的命令限制: 如需在集群架构实例中执行下述受限制的命令,请使用hash tag确保命令所要操作的key都分布在1个hash slot中
但是查看 asqnq 源码: 以 enqueue
操作为例, lua 操作中的部分 key 无法通过外部添加 hash tag
// github.com/hibiken/asynq/internal/rdb/rdb.go
// enqueueCmd enqueues a given task message.
//
// Input:
// KEYS[1] -> asynq:{<qname>}:t:<task_id>
// KEYS[2] -> asynq:{<qname>}:pending
// --
// ARGV[1] -> task message data
// ARGV[2] -> task ID
// ARGV[3] -> current unix time in nsec
//
// Output:
// Returns 1 if successfully enqueued
// Returns 0 if task ID already exists
var enqueueCmd = redis.NewScript(`
if redis.call("EXISTS", KEYS[1]) == 1 then
return 0
end
redis.call("HSET", KEYS[1],
"msg", ARGV[1],
"state", "pending",
"pending_since", ARGV[3])
redis.call("LPUSH", KEYS[2], ARGV[2])
return 1
`)
最终, 通过使用线上另一台主从版redis解决问题
写在最后
到这里, 工作用Go: 异步任务怎么写
就暂时告一段落了, 这个过程中:
- 一些计算机基础概念的理解: 同步与异步, 异步与任务编排, 协程与异步, 协程与生命周期
- 一些 Go 语言的基础知识以及基础不牢地动山摇的坑: 野生Goroutine, panic&recover
- 可观测的实践之一: trace
- 专业的异步任务框架 Asynq 以及踩坑记
一起拥抱变化, 直面问题和挑战, 不断精进, 我们下个话题再见👋🏻.