Automi一个GO流处理API


【译文】原文地址
在流处理当中,数据被分解成序列元素可以后续对大数据集进行处理。本文是关于Automi这个项目是一个Go包,支持开发者创建项目使用流处理方法来处理数据。
Automi Github地址https://github.com/vladimirvivien/automi

总体上,Automi提供如下特性:

  • 使用Go语义来组合多个阶段数据流处理器
  • 支持函数编程风格的操作
  • 使用Go原生包作为数据source和sink(比如channel、slice等)。
  • 自带数据转换和过滤操作
  • 支持批处理操作(Sort、GroupBy、Sum等)
    在开始之前,我们思考一个例子。数据源会产生字符到数据流中。随着时间推移,每个字符会随着流转移并被处理。首先用一个过滤器将不需要的字符移除掉。然后将字符映射成字符串,最后收集每个字符串。下图用一个工厂流水线来说明流及其相关操作。


    流处理说明

    接下来,我们看看如何使用Automi来实现上面的例子,用代码将字符导入到流中并处理每个字符。下面代码从切片中创建了一个数据流,然后将过滤和转换操作函数添加到流处理中。

func main() {
   // 使用切片来创建一个流到数据源
   strm := stream.New(
      []rune("B世!ぽ[@opqDQRS](http://twitter.com/opqDQRS)#$%^...O6PTnVWXѬYZbcef7ghijCklrAstvw"),
   ) // 过滤非大写字母字符
   strm.Filter(func(item rune) bool {
      return item >= 65 && item < (65+26)
   }) // 字符转换为字符串
   strm.Map(func(item rune) string {
      return string(item)
   }) //使用collectors将每个字符打印到输出台
   strm.Into(collectors.Writer(os.Stdout)) // Open the stream
  if err := <-strm.Open(); err != nil {
    log.Fatal(err)
  }
}

我们详细分析上面代码每一步都做了什么。

  • str流对象实例是根据切片[]rune创建的。每个元素在流当中按顺序流出。
  • str.Filter方法使用用户定义的匿名函数func(item rune) bool处理流中的各字符。当接收到大写字母就返回true,非大写字母字符就返回fasle,过滤掉不需要的字符。
  • str.Map方法接收上一个过滤操作处理后的字符,并应用用户自定义的fun(item rune) string函数将字符转换成字符串,并将结果推送到下一步。
  • 最后,stri.Into方法指定一个搜集器,对上一步操作的字符进行搜集。在这个例子中,字符被收集并写入到标准输出当中。
    注意到目前这一步,只是将数据流申明好了,直到调用open函数才会启动流处理过程。执行流时,必须使用s t r.Open()方法来触发流启动并将定义的操作执行起来。
> go run .
BDQRSUEFGHIJKLMNOPTVWXYZCA

Automi概念

Automi使用的流水线管道,是通过Go channel来实现的,作为数据流的传递通道。如下所描述的,Automi使用了四个组件包括:stream流本身,数据发射器(emitter)产生数据到流当中,流操作器(operator)用于处理流中的每个对象,收集器(collectors)搜集流中的对象。


Automi组件

流代表可传输数据对象的管道。Automi内部使用Go channel作为管道来传送流中的数据对象。这意味着,流支持缓存、自动同步和并发安全等特性。

一个Automi流可以使用New函数来创建,需要传入一个发射器作为参数,或者一个可以封装成发射器等值。

strm := stream.New(<emitter or value>)

New函数返回的Stream类型,使用API来配置流,可不断地添加方法到这个流当中,如下所示:

func main() {
  strm := stream.New([]rune(...))
  strm.Filter(func(item rune) bool {
     return item >= 65 && item < (65+26)
  }).Map(func(item rune) string {
    return string(item) 
  }).Into(collectors.Writer(os.Stdout))
  ...
}

发射器(emitter)

流是从发射器开始的,发射器可以从内存、网络或者文件中获取数据并读取当流。emitters包实现了如下发射器:

emitters.Channel
emitters.CSV
emitters.Reader
emitters.Scanner
emitters.Slice

如果上面的无法满足你的需求,你还可以创建自定义emitter,通过实现如下接口即可:

type Emitter interface { 
   GetOutput() <-chan interface{}
}

流操作器

一个流操作器是将流中的每个对象应用到对应的操作中。Automi使用Go支持的函数式编程特性允许开发者自定义操作器。

stream.Filter(func(row []string) bool {
   count, err := strconv.Atoi(row[1])
   if err != nil {
      count = 0
   }
   return (count > 0)
})

Automi包含很多操作器函数,如下所示:

Stream.Filter
Stream.Map
Stream.FlatMap
Stream.Reduce
Stream.Process
Stream.GroupByKey
Stream.GroupByName
Stream.GroupByPos
Stream.Sort
Stream.SortByKey
Stream.SortByName
Stream.SortByPos
Stream.SortWith
Stream.Sum
Stream.SumByKey
Stream.SumByName
Stream.SumByPos
Stream.SumAllKeys

上面列出的很多操作器都支持用户自定义函数,其他的应用于配置、转换等功能到流中。

收集器(collector)

收集器可以是内存、网络、或文件资源,用于存放流中数据。Automi可以收集数据到如下定义的收集器中:

colelctors.CSV
collectors.Func
collectors.Null
collectors.Slice
collectors.Writer

如果你需要使用自定义的收集器,可以实现如下接口:

type Collector interface { 
   SetInput(<-chan interface{})
}

更多流处理例子

下面我们来探索更多的例子来展示Automi是如何使用的。

从channels中创建流

下面的例子展示了流数据是如何从channel中获得。如下代码所示,函数emitterFunc返回类型为<-chan time.Time对象,作为流的发射器参数传入流。然后将一个实现time.Time对象转换为字符串的流操作器应用到流中,最收集字符串到标准输出当中。


从channel读数据到流

以下代码实现了上面的流处理:

func main() {
   // emitterFunc returns a chan used for data
   emitterFunc := func() <-chan time.Time {
      times := make(chan time.Time)
      go func() {
         times <- time.Unix(100000, 0)
         times <- time.Unix(2*100000, 0)
         times <- time.Unix(4*100000, 0)
         times <- time.Unix(8*100000, 0)
         close(times)
      }()
      return times
   }
   strm := stream.New(emitterFunc())
   strm.Map(func(item time.Time) string {
      return item.String()
   }).Into(collectors.Writer(os.Stdout))
   if err := <-strm.Open(); err != nil {
      fmt.Println(err)
      return
   }
}

源代码示例2
在前面的示例中,当打开流时,仅仅将收集的数据打印到标准输出。

HTTP请求流

Automi流可以从实现了io.reader接口到值当中读取数据,或使用任何实现了io.writer接口到值来收集数据。以下通过描述一个服务器通过http.Request.Body中读取字节切片到流中来说明,并使用base64转换收到的每个切片,然后将转换后的数据写入到http.Response中。


HTTP服务器从request.Body读取流数据

以下代码就是上面的实现:

func main() {
   http.HandleFunc(
      "/",
      func(resp http.ResponseWriter, req *http.Request) {
         resp.Header().Add("Content-Type", "text/html")
         resp.WriteHeader(http.StatusOK)
         strm := stream.New(req.Body)
         strm.Process(func(data []byte) string {
            return base64.StdEncoding.EncodeToString(data)
         }).Into(resp)
         if err := <-strm.Open(); err != nil {
            resp.WriteHeader(http.StatusInternalServerError)
         }
      },
   )
   http.ListenAndServe(":4040", nil)
}

当服务器接收到“/”路径请求,会将req.Body内容注入到流当中,并应用用户定义操作方法来计算数据到base64编码,然后返回计算及过到resp中。

源代码示例3

使用CSVs创建流

下面的例子将展示流数据来源于CSV文件,并使用两个用户定义的操作器处理,最后将结果收集到另外一个CSV文件中。



以下代码展示上面的流功能。使用emitter.CSV来创建CSV发射器csvEmitter,将序列化每行内容到[]string并注入到流当中。Map函数应用一个操作器返回每行到前6个对象,Filter方法删除每行第二列(row[1])为0到数。

func main() {
   csvEmitter := emitters.CSV("./stats.txt").
      CommentChar('#').
      DelimChar(',').
      HasHeaders()
   stream := stream.New(csvEmitter)
   
   // select first 6 cols per row:
   stream.Map(func(row []string) []string {
      return row[:6]
   })
 
   // filter out rows with col[1] = 0
   stream.Filter(func(row []string) bool {
      count, err := strconv.Atoi(row[1])
      if err != nil {
         count = 0
      }
      return (count > 0)
   })
   stream.Into(collectors.CSV("./out.txt"))
   if err := <-stream.Open(); err != nil {
      fmt.Println(err)
      return
   }
}

当流被打开,会使用collects.CSV来收集流中的对象到另一个CSV文件中。

更多示例

在Github上还有很多用例:

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,214评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,307评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,543评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,221评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,224评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,007评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,313评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,956评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,441评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,925评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,018评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,685评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,234评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,240评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,464评论 1 261
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,467评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,762评论 2 345

推荐阅读更多精彩内容