微服务开发中的问题和解决方法

问题

目标:

归纳一些常见的微服务开发问题, 分享一些通用设计方案, 并希望使用一些通用库解决微服务开发中的问题。

微服务中遇到的问题:

缓存
  • 缓存击穿造成雪崩:

    • 并发量大,同时访问数据库:

      缓存失效,同时击穿数据库,造成数据库雪崩(上游服务雪崩)

    • 空值击穿:

      查询缓存或者数据返回空值(key 对应数据不存在 或者访问失败),造成下次再次访问数据库,数据库不断击穿。

    • 缓存命中率的监控,异常监控。

  • 数据降级:

    • 在缓存访问空值或者失败的情况下,希望返回默认数据或者允许返回缓存超时数据。
异步任务

场景:特定条件触发爬虫。定时聚合各个服务的数据,轻量的计算任务。跨多个服务的分布式服务的事务。

异步任务需要满足的条件:

  • 异步执行,不同于请求的生命周期,需要在后台执行数十秒/分钟

  • 延时(定时)执行

  • 任务管理,可以重试恢复失败任务,即使服务器节点奔溃依然能保证任务。

  • 分布式调度,一个任务只需要跑在一台机器上

消息推送:

保证数据入库和消息发送的一致性:

  • 分布式系统中的消息发送通常情况下追求的是最少一次。

  • 先发送,再入库,可能数据存储失败,但是更新事件已经发送。

  • 数据先入库,再发送,但是发送如果失败,或者服务重启服务重试,消息可能丢失。

客户端(外部服务调用)
  • 数据库客户端数据监控,tracing, logging, metrics

  • 客户端熔断处理。

  • 客户端并发控制。

通用的简单的监控方式

  • 需要繁琐得添加日志,普罗米修斯监控,tracing. 每次hardcode效率低

goroutine 管理

  • 对于常驻后台的goroutine 缺失感知和管理的手段。
go func() {
    // long time run code ....
}
  • 对于go rounte 并行数量有控制的场景缺少并发/回收的控制。
分区管理

在很多情况下需要分区管理

  • 监听kafka 不同partition的消息。

  • 异步任务的调度,按照分区的特性。

  • 消息推送,需要分区执行,平衡吞吐。

  • 希望减少对外部组件的依赖。

代码式例

过程监控

对应需要监控一个特定的过程,使用一个通用的接口提供 log tracing metric 三种方式的监控。

// 对操作行为的记录,在操作之前生成一个未提交的记录,完成后提交记录
func ExampleBasicUse() {
    factory := EasyRecorders("test-record")

    func(ctx context.Context) {
        var err error
        recorder, ctx := factory.ActionRecorder(ctx, "do-something") // 生成一个记录
        defer func() {
            recorder.Commit(err, BoolField("remote err", true)) // 提交这个记录
        }()
        // err = doSomething(ctx)
    }(context.Background())
}

更加易于在拦截器中使用

func ExampleWrap() {
    factory := EasyRecorders("test-record")

    type operation func (ctx context.Context) error

    wrap := func(oper operation) operation {
        return func (ctx context.Context) error {
            var err error
            recorder, ctx := factory.ActionRecorder(ctx, "test-function") // 生成一个记录
            defer func() {
                recorder.Commit(err) // 提交这个记录
            }()
            err = oper(ctx)
            return err
        }
    }

    wrapped := wrap(func(ctx context.Context) error {
        // 真正的业务逻辑
        return errors.New("err occur")
    })

    wrapped(context.Background())
    return
}
缓存

缓存主要是围绕防止缓存击穿设计的,

  • 主要的模式是获取缓存数据 如果没有缓存数据/获取失败,则击穿缓存从数据库获取。
  • 行为上在从数据库获取数据的时候会block 所有相同key的请求,只有一个携程可以读取数据库内容,防止并行查询击穿。
  • 使用一个全局的击穿限流器(可选),当全局的击穿限流达到上限的时候会停止查询(会block 在限流器上,可以通过context控制超时)
  • 在限流或者访问缓存或者存储失败的情况下可以进行数据降级,返回默认的数据或者已经超时的数据。
func ExampleBasicUse() {
    var m = &sync.Map{}

    handler := ResourceHandler{
        ThroughLimit: rate.NewLimiter(rate.Every(100 *time.Millisecond), 10), // 全局的缓存击穿限流
        // 主要的功能
        FindFromCacheFunc: func(ctx context.Context, request Request) (*Resource, error) {
            // 从缓存中获取数据
            v, ok := m.Load(request.ID)
            if !ok { // 如果数据不存在则返回空
                return nil, nil
            }
            return &Resource{
                Data: v,
            }, nil
        },
        FetchFromStoreFunc: func(ctx context.Context, request Request) (*Resource, error) {
            // 从数据库(上游获取数据)
            // data = database.query()
            var data interface{}
            return &Resource{
                Data: data,
            }, nil
        },
        UpdateCacheFunc: func(ctx context.Context, request Request, res Resource) (*Resource, error) {
            // 跟新缓存数据
            m.Store(request.ID, res.Data)
            return &res, nil
        },

        // 辅助的工作函数
        ThroughOnCacheErrFunc: func(ctx context.Context, resource Request, err error) (goThrough bool) {
            // 缓存失效的时候是否需要击穿,全局的击穿限流依然保持
            if err == errors.New("good err") {
                return true
            }
            return false
        },
        Downgrade: func(ctx context.Context, resource Request, err error) (*Resource, error) {
            // 缓存失效的时候是否启用降级的数据
            if err == errors.New("good err") {
                return &Resource{
                    Data: "default value",
                }, nil
            }
            return nil, err
        },
    }

    factory := record.EasyRecorders("test-cache-record") // 记录器

    mid := NewRecorderMid(factory) // 插件 就是一个拦截器

    repository := NewRepository(handler, zap.L(), mid) // 生成一个repository对象

    ctx, _ := context.WithTimeout(context.Background(), time.Second)

    res, err := rep.Find(ctx, Request{
        ID: "1",
    })

    if err != nil {
        log.Println("find res is err")
        return
    }

    if res == nil {
        log.Println("can not found resource")
        return
    }

    log.Println("did get data:", res.Data)
}

任务管理
  • 任务管理核心需要解决的是任务的可恢复。
  • 核心的业务逻辑是由调度器接口实现的 Scheduler 负责触发任务,任务存储,在任务没有更新的情况下多次触发。
  • 调度器需要“持久化”任务,目前实现的是mongodb 版本和 memory 版本。mongodb 版本支持持久化任务。
  • 如果执行长时间没有响应,调度器会根据配置强制重新执行任务,这个机制确保任务的可恢复。如果使用mongodb 版本的调度器,服务重启之后依然能确保任务的再次执行。
  • 使用可以覆盖已经存在的任务,如果任务在运行,任务会被设法停止(cancel context),再执行。如果超过停止时间则直接执行覆盖任务流程。
func ExampleBasicUse() {
    // 任务调度器,负责触发任务,任务存储
    scheduler := NewMemoScheduler(time.Minute) 

    // 执行器,执行具体的业务逻辑
    executor := ExecutorFunc(func(ctx Context, res *Result) {
        // 执行业务逻辑的代码
        var err error
        // doSomething async
        if err == nil {
            res.Finish() //标记为结束, 不会再执行。
        } else {
            res.WaitAndReDo(3 * time.Minute) // 可能执行失败,等三分钟再执行。
        }
        return
    })

    mid := NewRecorderMiddle(record.EasyRecorders("task-record")) // 插件,用于监控任务变化。

    // 构建一个taskManager 任务管理器
    taskManager := NewManager(scheduler, executor, ManagerOption{
        MaxBusterTask: 10, // 同时并发执行的任务
        DefaultExecMaxDuration: time.Minute, //最大执行的任务的时间
        DefaultRecoverCount: 10, // 任务可以从失败中恢复的次数
        Middle: []Middle{mid}, // record mid
    })

    //具体如何执行一个任务的流程

    opt := Option{}. // 一个任务具体的选项
        SetOverLap(true). // 如果任务已经存在则可以覆盖这个任务. 会先尽量停止已有的任务,再开始新的任务
        SetExpectStartTime(time.Now().Add(time.Minute)) //定时执行,这个任务可以被延后执行

    // 声明一个新的任务
    err := taskManager.ApplyNewTask(context.Background(), "task-key-1", opt)
    if err != nil {
        log.Println("apply new task err:", err)
    }
}

消息投递

  • 消息投递主要需要解决的是入库和消息投递成功这个最终需要保持一周。只要broker接收之后就认为消息投递成功。
  • 解决方案,数据库更新将更新数据和推送事件在一个数据库事务/原子操作内入库,监控数据库日志(mongodb change stream)
    func (rep *Repository) saveResult(data Data, events []Event) error {}
  • 数据库日志的特点是在发送失败的情况下可以回滚,记录最后一次'logindex' 即可,使用'logindex'即可以保证数据不丢失。并且对业务代码没有侵入性。
  • 投递器会监控并拦截相关日志,获取到更新事件的具体数据。进行投递,如果投递成功则更新logindex。
func ExampleNotifierBasicUse() {
    // publisher 是一个推送的抽象接口
    var publisher Publisher = MockPublisher{
        PublishFunc: func(ctx context.Context, message []OutputMessage) error {
            log.Println("did push message")
            return nil
        },
    }

    // stream 可以中断并且重续,并从一个流节点开始,这个是内存版本的stream 也有基于mongodb change stream的
    var stream OutputStream = NewMemoMessageStream() 

    notifier := New(stream, publisher, Option{
        MaxBlockCount: 1, // 可以合并请求的发送数量
        MaxBlockDuration: time.Second, // 可以合并请求的发送时间
    })

    notifier.Start() //start 之后就会不断从 stream 对象中获取数据
    // add data into stream
    notifier.Close()
}

分区

  • 为了分区,对于无状态的服务来说,需要为每一个分区选择一个 master / leader。被选择作为主的节点就可以做这个分区的事情该做的事情。
  • 使用 Elector 抽象接口来解决竞选冲突。每个member 再一定的时间间隔内发起自己的竞选申请发送给Elector,Elector会裁决并决定哪个member是leader。并通过Elector异步返回竞选结果给所有的member,member 以此来判断自己是不是leader。
  • 如果member 确认自己被选为leader 之后,需要定时发送KeepLive 消息给Elector, Elector转交给所有的member,否则member如果在指定时间内没有收到keepLive消息就应该发起选举。
  • 目前 Elector 有mongodb 版本和内存版本,内存版本一般只用来测试。
  • 基于数据库的Elector是希望能够减少服务初期的接入门槛,一般微服务都会带一个数据库,使用数据库即可,但是这个是不建议作为分布式锁来使用的,分布式锁还是需要使用一致性协议实现的Elector。
func ExampleLeaderBasicUse() {
    var parts []*Partition
    for i := 0 ; i < 10; i ++ {
        member := newSimpleTestMember("node192.168.0.1")
        partition := NewPartition(PartitionID(i), member, nil)
        parts = append(parts, partition)
    }

    leaderGroup := NewLeaders(parts...)

    // SyncLeader 是同步leader, 当member 成为某一个分区leader的时候,会调用func匿名函数,如果失去leader 资格,context 会被关闭
    leaderGroup.SyncLeader(context.Background(), func(ctx context.Context, part *Partition) {
        // start notify kafka topic
        // start task manager
        // start push part message
    })
}

func newSimpleTestMember(nodeID string) *Member {
    // Elector 是一个选举裁决者的接口
    var elector Elector = NewMemoElector()

    // 生成选举数据
    var electionFactory ElectionFactory = &ConstElectionFactory{
        ConstID: nodeID, // 唯一表示当前节点的唯一id, 可以用ip container_id 随机数等等。
    }

    member := NewMember(elector, electionFactory, Option{
        MaxElectDuration: 2 * time.Second,
        MaxKeepLiveDuration: time.Second,
    })

    member.Start()

    // SyncLeader 是同步leader, 当member 成为leader的时候,会调用func匿名函数,如果失去leader 资格,context 会被关闭
    /*member.SyncLeader(context.Background(), func(ctx context.Context) {

    })*/
}

客户端代理
  • 主要解决调用外部服务中的一些问题。
  • 使用插件/拦截器的形式来实现熔断,过程监控等功能呢。
func ExampleBasicUse() {
    // agent 是一个接口,其对应的即是真实的client。
    var agent RecoverableAgent // = .....

    // 监控用插件
    recordMid := NewRecorderMiddle(record.EasyRecorders("client-test"))

    // 基础的限流插件
    breakerMid := NewBasicBreakerMiddle(
        rate.NewLimiter(rate.Every(time.Second), 10),
        agent,
        time.Second, // 错误限流后等待的最少时间
        3 * time.Second, // 错误限流后最长的等待时间
    )

    opt := Option{}.
        SetParallelCount(10). // 并发数量
        AddMiddle(breakerMid, recordMid)

    client := New(agent, opt)

    err := client.Do(context.Background(), func(ctx context.Context, agent Agent) error {
        var err error
        // dbClient := agent.(*DbClient)
        // dbClient.query() ...
        // dbClient.Update() ...
        // return err
        return err
    }, ActionOption{}.
        SetPartition(1)) // 设置分区id, 分区能保证的是相同的分区id同时只有一个在执行。

    if err != nil {
        log.Println("client do err:", err)
    }
}

未解决以上问题提供了一些基础库

https://github.com/feynman-go/workshop

  • 特点,抽象部分和外部服务相关的功能为接口,提高扩展性。
  • 功能点普遍可以使用非侵入式的方式添加 tracing / logging / metric 监控。
  • 尽量会考虑部署无状态服务,减少外部依赖。
  • 使用代理插件/拦截器 实现对现有代码的非侵入修改,提高可扩展性。
野gorounte管理

使用一个探针对象去控制和管理 goroutine

func ExampleBasicUse() {
    // 创建一个探针
    pb := New(func(ctx context.Context) {
        // do something cost long time
        select {
        case <- ctx.Done():
        case <- time.After(time.Minute):
        }
    })

    pb.Start()
    // do something else

    // stop will cancel context
    pb.Stop()

    // wait goroutine stop
    <- pb.Stopped()
}

可以使用 context 推出的互斥量
func ExampleBasicUse() {
    mx := &Mutex{}
    ctx, _ := context.WithTimeout(context.Background(), time.Second)
    if mx.Hold(ctx) { // other invoke will block in this method
        // do something
        mx.Release()
    }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,491评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,856评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,745评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,196评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,073评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,112评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,531评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,215评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,485评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,578评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,356评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,215评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,583评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,898评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,174评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,497评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,697评论 2 335

推荐阅读更多精彩内容

  • ORA-00001: 违反唯一约束条件 (.) 错误说明:当在唯一索引所对应的列上键入重复值时,会触发此异常。 O...
    我想起个好名字阅读 4,970评论 0 9
  • Zookeeper用于集群主备切换。 YARN让集群具备更好的扩展性。 Spark没有存储能力。 Spark的Ma...
    Yobhel阅读 7,217评论 0 34
  • 今天看到一位朋友写的mysql笔记总结,觉得写的很详细很用心,这里转载一下,供大家参考下,也希望大家能关注他原文地...
    信仰与初衷阅读 4,720评论 0 30
  • Swift1> Swift和OC的区别1.1> Swift没有地址/指针的概念1.2> 泛型1.3> 类型严谨 对...
    cosWriter阅读 11,067评论 1 32
  • 关于《优势教练》,您最关心的话题新鲜出炉啦! 1. 什么是优势教练? 优势教练是以专业教练技术体系为基础,结合积极...
    AHa雅涵Ani阅读 3,808评论 0 4