【译文】原文地址
在流处理当中,数据被分解成序列元素可以后续对大数据集进行处理。本文是关于Automi这个项目是一个Go包,支持开发者创建项目使用流处理方法来处理数据。
Automi Github地址https://github.com/vladimirvivien/automi
总体上,Automi提供如下特性:
- 使用Go语义来组合多个阶段数据流处理器
- 支持函数编程风格的操作
- 使用Go原生包作为数据source和sink(比如channel、slice等)。
- 自带数据转换和过滤操作
-
支持批处理操作(Sort、GroupBy、Sum等)
在开始之前,我们思考一个例子。数据源会产生字符到数据流中。随着时间推移,每个字符会随着流转移并被处理。首先用一个过滤器将不需要的字符移除掉。然后将字符映射成字符串,最后收集每个字符串。下图用一个工厂流水线来说明流及其相关操作。
接下来,我们看看如何使用Automi来实现上面的例子,用代码将字符导入到流中并处理每个字符。下面代码从切片中创建了一个数据流,然后将过滤和转换操作函数添加到流处理中。
func main() {
// 使用切片来创建一个流到数据源
strm := stream.New(
[]rune("B世!ぽ[@opqDQRS](http://twitter.com/opqDQRS)#$%^...O6PTnVWXѬYZbcef7ghijCklrAstvw"),
) // 过滤非大写字母字符
strm.Filter(func(item rune) bool {
return item >= 65 && item < (65+26)
}) // 字符转换为字符串
strm.Map(func(item rune) string {
return string(item)
}) //使用collectors将每个字符打印到输出台
strm.Into(collectors.Writer(os.Stdout)) // Open the stream
if err := <-strm.Open(); err != nil {
log.Fatal(err)
}
}
我们详细分析上面代码每一步都做了什么。
- str流对象实例是根据切片[]rune创建的。每个元素在流当中按顺序流出。
- str.Filter方法使用用户定义的匿名函数func(item rune) bool处理流中的各字符。当接收到大写字母就返回true,非大写字母字符就返回fasle,过滤掉不需要的字符。
- str.Map方法接收上一个过滤操作处理后的字符,并应用用户自定义的fun(item rune) string函数将字符转换成字符串,并将结果推送到下一步。
- 最后,stri.Into方法指定一个搜集器,对上一步操作的字符进行搜集。在这个例子中,字符被收集并写入到标准输出当中。
注意到目前这一步,只是将数据流申明好了,直到调用open函数才会启动流处理过程。执行流时,必须使用s t r.Open()方法来触发流启动并将定义的操作执行起来。
> go run .
BDQRSUEFGHIJKLMNOPTVWXYZCA
Automi概念
Automi使用的流水线管道,是通过Go channel来实现的,作为数据流的传递通道。如下所描述的,Automi使用了四个组件包括:stream流本身,数据发射器(emitter)产生数据到流当中,流操作器(operator)用于处理流中的每个对象,收集器(collectors)搜集流中的对象。
流
流代表可传输数据对象的管道。Automi内部使用Go channel作为管道来传送流中的数据对象。这意味着,流支持缓存、自动同步和并发安全等特性。
一个Automi流可以使用New函数来创建,需要传入一个发射器作为参数,或者一个可以封装成发射器等值。
strm := stream.New(<emitter or value>)
New函数返回的Stream类型,使用API来配置流,可不断地添加方法到这个流当中,如下所示:
func main() {
strm := stream.New([]rune(...))
strm.Filter(func(item rune) bool {
return item >= 65 && item < (65+26)
}).Map(func(item rune) string {
return string(item)
}).Into(collectors.Writer(os.Stdout))
...
}
发射器(emitter)
流是从发射器开始的,发射器可以从内存、网络或者文件中获取数据并读取当流。emitters包实现了如下发射器:
emitters.Channel
emitters.CSV
emitters.Reader
emitters.Scanner
emitters.Slice
如果上面的无法满足你的需求,你还可以创建自定义emitter,通过实现如下接口即可:
type Emitter interface {
GetOutput() <-chan interface{}
}
流操作器
一个流操作器是将流中的每个对象应用到对应的操作中。Automi使用Go支持的函数式编程特性允许开发者自定义操作器。
stream.Filter(func(row []string) bool {
count, err := strconv.Atoi(row[1])
if err != nil {
count = 0
}
return (count > 0)
})
Automi包含很多操作器函数,如下所示:
Stream.Filter
Stream.Map
Stream.FlatMap
Stream.Reduce
Stream.Process
Stream.GroupByKey
Stream.GroupByName
Stream.GroupByPos
Stream.Sort
Stream.SortByKey
Stream.SortByName
Stream.SortByPos
Stream.SortWith
Stream.Sum
Stream.SumByKey
Stream.SumByName
Stream.SumByPos
Stream.SumAllKeys
上面列出的很多操作器都支持用户自定义函数,其他的应用于配置、转换等功能到流中。
收集器(collector)
收集器可以是内存、网络、或文件资源,用于存放流中数据。Automi可以收集数据到如下定义的收集器中:
colelctors.CSV
collectors.Func
collectors.Null
collectors.Slice
collectors.Writer
如果你需要使用自定义的收集器,可以实现如下接口:
type Collector interface {
SetInput(<-chan interface{})
}
更多流处理例子
下面我们来探索更多的例子来展示Automi是如何使用的。
从channels中创建流
下面的例子展示了流数据是如何从channel中获得。如下代码所示,函数emitterFunc返回类型为<-chan time.Time对象,作为流的发射器参数传入流。然后将一个实现time.Time对象转换为字符串的流操作器应用到流中,最收集字符串到标准输出当中。
以下代码实现了上面的流处理:
func main() {
// emitterFunc returns a chan used for data
emitterFunc := func() <-chan time.Time {
times := make(chan time.Time)
go func() {
times <- time.Unix(100000, 0)
times <- time.Unix(2*100000, 0)
times <- time.Unix(4*100000, 0)
times <- time.Unix(8*100000, 0)
close(times)
}()
return times
}
strm := stream.New(emitterFunc())
strm.Map(func(item time.Time) string {
return item.String()
}).Into(collectors.Writer(os.Stdout))
if err := <-strm.Open(); err != nil {
fmt.Println(err)
return
}
}
源代码示例2
在前面的示例中,当打开流时,仅仅将收集的数据打印到标准输出。
HTTP请求流
Automi流可以从实现了io.reader接口到值当中读取数据,或使用任何实现了io.writer接口到值来收集数据。以下通过描述一个服务器通过http.Request.Body中读取字节切片到流中来说明,并使用base64转换收到的每个切片,然后将转换后的数据写入到http.Response中。
以下代码就是上面的实现:
func main() {
http.HandleFunc(
"/",
func(resp http.ResponseWriter, req *http.Request) {
resp.Header().Add("Content-Type", "text/html")
resp.WriteHeader(http.StatusOK)
strm := stream.New(req.Body)
strm.Process(func(data []byte) string {
return base64.StdEncoding.EncodeToString(data)
}).Into(resp)
if err := <-strm.Open(); err != nil {
resp.WriteHeader(http.StatusInternalServerError)
}
},
)
http.ListenAndServe(":4040", nil)
}
当服务器接收到“/”路径请求,会将req.Body内容注入到流当中,并应用用户定义操作方法来计算数据到base64编码,然后返回计算及过到resp中。
源代码示例3
使用CSVs创建流
下面的例子将展示流数据来源于CSV文件,并使用两个用户定义的操作器处理,最后将结果收集到另外一个CSV文件中。
以下代码展示上面的流功能。使用emitter.CSV来创建CSV发射器csvEmitter,将序列化每行内容到[]string并注入到流当中。Map函数应用一个操作器返回每行到前6个对象,Filter方法删除每行第二列(row[1])为0到数。
func main() {
csvEmitter := emitters.CSV("./stats.txt").
CommentChar('#').
DelimChar(',').
HasHeaders()
stream := stream.New(csvEmitter)
// select first 6 cols per row:
stream.Map(func(row []string) []string {
return row[:6]
})
// filter out rows with col[1] = 0
stream.Filter(func(row []string) bool {
count, err := strconv.Atoi(row[1])
if err != nil {
count = 0
}
return (count > 0)
})
stream.Into(collectors.CSV("./out.txt"))
if err := <-stream.Open(); err != nil {
fmt.Println(err)
return
}
}
当流被打开,会使用collects.CSV来收集流中的对象到另一个CSV文件中。
更多示例
在Github上还有很多用例:
- Batch — 批处理操作器用例(i.e. sort, sum, count, etc)
- Collectors — 不同收集器示例
- Emitters — 不同发射器组件示例
- Error handling —使用Automi流配置错误处理
- gRPC streaming — 从gRPC服务中读取数据到流中
- Logging — 在运行时记录日志时间流
- MD5 — implements the MD5 example from Sameer Ajmani pipeline post
- Network —网络流示例
- Wordcount — word count用例。