Golang-配置化 pipeline

Unix 中的 pipeline,用于程序间的统一接口,示例:

tail -f biz.log | grep "ERROR" | grep "error_no=1234" | wc -l

借鉴这个设计思路,我们想设计一个 pipeline 模块,需要有以下功能:

  1. 抽象数据处理器(Handler)
  2. 一组 Handler 可以串行
  3. 一组 Handler 可以并行
  4. Handler 可配置化
  5. Handler 可被引用


    pipeline.png

下面我们就以 Golang 为例,尝试将 pipeline 的基本模型搭建出来:

接口 Handler

pipeline 中数据处理器的对外接口

// Args args for handler
type Args struct {
    InValue interface{}
    Params  map[string]interface{}
}

// Resp response for handler
type Resp struct {
    OutValue interface{}
    Params   map[string]interface{}
}

// Handler defines some one who can handle something
type Handler interface {
    Handle(ctx context.Context, args Args) (resp *Resp, err error)
}

Handler Builder

一个 Handler 需要一个 Builder 去实例化

// HandlerBuilder build a Handler with JSON conf
type HandlerBuilder interface {
    BuildHandlerByJSON(id string, confJSON string) (Handler, error)
}

Handler Option

用 JSON 配置文件实例化一个 Handler:

  1. 可以引用一个已存在的 Handler
  2. 也可以用一个 HandlerBuilder 构建一个 Handler
  3. 需要额外配置 Required, Timeout, DefaultValue
type Option struct {
    // ID ref of a existing handler
    ID string `json:"id"`

    // create a handler from pipe
    PipeName string          `json:"pipe_name"`
    PipeConf json.RawMessage `json:"pipe_conf"`

    // handler conf
    TimeOutMillisecond int64       `json:"time_out_millisecond"`
    Required           bool        `json:"required"`
    DefaultValue       interface{} `json:"default_value"`

    // Handler underlying handler
    Handler Handler `json:"handler"`
}

并行 Handler

  1. 一组 Handler,为每个 Handler 起一个 goroutine 并发执行。
  2. 每个 Handler 都配置了 Required/Timeout/DefaultValue,当一个必要的(Requried=true) Handler 超时或报错了,整体处理报错,反之则以 DefaultValue 作为响应。
  3. 这个 并发执行 本身也是一种处理器,即实现了 Handler 接口,可用被当做一个 Handler 用于其他处理流中。
type Handlers []pipeline.Option

func (handlers Handlers) Handle(ctx context.Context, args pipeline.Args) (resp *pipeline.Resp, err error) {
    // prepare params
    var (
        wg       sync.WaitGroup
        fatalErr error
        hValChan = make(chan struct {
            idx int
            val interface{}
            err error
        })
        respData = make([]interface{}, len(handlers))
    )
    // set wait number
    wg.Add(len(handlers))

    // start goroutines to handle
    for i, h := range handlers {
        go func(index int, handler pipeline.Option) {
            defer wg.Done()         
            // do handle ...
            // push response
            hValChan <- struct {
                idx int
                val interface{}
                err error
            }{idx: index, val: respResult.val, err: respResult.err}
            return
        }(i, h)
    }

    // wait for response
    wg.Wait()
    close(hValChan)

    // handle responses
    for resp := range hValChan {
        if resp.err != nil {
            item := handlers[resp.idx]
            if item.Required {
                fatalErr = resp.err
                break
            }

            log.Printf("handle err: handler_id=%v, err=%v", item.ID, err)
            respData[resp.idx] = item.DefaultValue
            continue
        }

        respData[resp.idx] = resp.val
    }

    // build response
    return &pipeline.Resp{
        OutValue: respData,
        Params:   args.Params,
    }, fatalErr
}

串行 Handler

  1. 一组 Handler,一个接一个地执行。
  2. 每个 Handler 都配置了 Required/Timeout/DefaultValue,即当一个必要的(Requried=true) Handler 超时或报错了,整体处理报错,反之则以 DefaultValue 作为响应。
  3. 这个 串行执行 本身也是一个处理器,即实现了 Handler 接口,可用作其他数据流的一个环节。
type Handlers struct {
    ID               string            `json:"id"`
    Handlers         []pipeline.Option `json:"handlers"`
}

func (handlers *Handlers) Handle(ctx context.Context, args pipeline.Args) (resp *pipeline.Resp, err error) {
    for step, h := range handlers.Handlers {
        inArgs := args

        if h.TimeOutMillisecond <= 0 {
            resp, err = h.Handler.Handle(ctx, args)
        } else {
            resp, err = handlers.stepWithTimeout(ctx, h, args)
        }

        if err != nil {
            if h.Required {
                return
            }

            log.Printf("line-handler failed: id=%v, step=%v, err=%v", handlers.ID, step, err)
            args = pipeline.Args{
                InValue: h.DefaultValue,
                Params:  inArgs.Params,
            }
            continue
        }

        args = pipeline.Args{
            InValue: resp.OutValue,
            Params:  resp.Params,
        }
    }

    return
}

// stepWithTimeout handles the args with timeout
func (handlers *Handlers) stepWithTimeout(ctx context.Context, h pipeline.Option, args pipeline.Args) (*pipeline.Resp, error) {
    hValChan := make(chan struct {
        resp *pipeline.Resp
        err  error
    })

    go func() {
        resp, err := h.Handler.Handle(ctx, args)
        hValChan <- struct {
            resp *pipeline.Resp
            err  error
        }{resp: resp, err: err}
    }()

    select {
    case <-time.After(time.Millisecond * time.Duration(h.TimeOutMillisecond)):
        return &pipeline.Resp{Params: args.Params}, errors.New("timeout: handler_id=" + h.ID)
    case r := <-hValChan:
        return r.resp, r.err
    }
}

源码

github.com/Focinfi/pipeline

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,033评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,725评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,473评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,846评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,848评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,691评论 1 282
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,053评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,700评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,856评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,676评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,787评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,430评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,034评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,990评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,218评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,174评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,526评论 2 343

推荐阅读更多精彩内容

  • 前奏 https://tech.meituan.com/2016/11/04/nio.html 综述 netty通...
    jiangmo阅读 5,842评论 0 13
  • 人不满足,才能成长 书籍:书都不会读,你还想成功 字数:748字 前两年,知道了一种植物,它的名字叫做多肉,这个...
    红木姑娘阅读 379评论 1 2
  • 如果树上挂满幸福 随手摘取 会有多少心酸凋落 给予希望经幡 一世阳光 苦痛 清澈湖水蓝天 云朵倒挂树梢 表白柔甜 ...
    2016冰山来客阅读 222评论 0 4
  • Dear baby: 原本想临近预产期再对你在肚纸里的9个多月做个总结,但今天的学校放假,让妈咪的心也好像...
    海笑空灵阅读 211评论 1 1
  • 大环境影响,门庭冷清,忙而无利烦郁。 烦郁不如转去。罢罢罢,自个㩕清, 索性偷懒闲书中,攀谈周孔寻庄公。书罢养花植...
    nomoreoo阅读 237评论 0 1