Flink之数据流类型

Flink为流处理和批处理分别提供了DataStream API和DataSet API。在开发工作中这些API极大的便利了开发者开发大数据应用。DataStream 在经过Transformation之后,会随之生成相应的数据流类型。比如:KeyedStream,JoinedStreams,CoGroupedStreams等等。但这些数据流类型之间是如何通过转换联系在一起的?。下文☞☞☞☞☞

流之间转换

图1

DataStream

DataStream 是Flink 流处理API中最核心的数据结构。他代表了一个运行在多个分区上的并行流。一个DataStream可以从StreamExecutionEnvironment 通过env.addSource(SourceFunction)获得,可以但是不限于这一种方式。

DataStream上的转换操作都是逐条的,比如map(),flatMap(),filter()。DataStream 也可以执行rebalance(再平衡,用来减轻数据倾斜) 和broadcaseted(广播)等分区转换。

val stream: DataStream[MyType] = env.addSource(new FlinkKafkaConsumer011[String](...)).setParallelism(3)
val str1: DataStream[(String, MyType)] = stream.flatMap { ... }
val str2: DataStream[(String, MyType)] = stream.rebalance()
val str3: DataStream[AnotherType] = stream.map { ... }

上面给出的DataStream代码块在运行时会转换成如下执行图:

图2

如上图的执行图所示,DataStream各个算子会并行运行,算子之间是数据流分区。如Source的第一个并行实例(S1)和flatMap()的第一个并行实例(f1)之间就是一个数据流分区。而flatMap()和map()之间犹豫加了rebalance(),它们之间的数据流分区就有3个自分区(f1的数据流向map()实例)。这与Apache Kafka是很类似的。把流想象成Kafka Topic,而一个流分区就表示一个Topic Partition,流的目标并行算子实例就是Kafka Consumers。

KeyedStream

KeyedStream用来表示根据指定的key进行分组的数据流。一个KeyStream可以通过调用DataStream.keyBy()来获得。而再KeyedStream上进行任何transformation都将转变回DataStream。而实现中,KeyStream是把key的信息写入到了transformation中。每条记录只能访问所属key的状态,其上的聚合函数可以方便地操作和保存对应key的状态。

WindowedStream & AllWindowedStream

WindowedStream代表了根据key分组,并且基于WindowAssigner分割窗口的数据流。所以WindowedStream都是从KeyStream衍生而来。而在WindowedStream上进行任何transformation也都将转变回DataStream。

val stream: DataStream[MyType] = ...
val windowed: WindowedDataStream[MyType] = stream
        .keyBy("userId")
        .window(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
val result: DataStream[ResultType] = windowed.reduce(myReducer)

上述WindowedStream的样例代码在运行时会转换成如下的执行图:

图3

Flink的窗口实现中会将到达的数据缓存再对应的窗口buffer中(一个数据可能会对应多个窗口)。当到达窗口发送的条件时(由Trigger控制),Flink会对整个窗口中的数据进行处理。Flink在聚合类窗口有一定的优化,即不会保存窗口中的所有值,而是得到一个元素执行一次聚合函数,最终只会保存一份数据即可。

在key分组的流上进行窗口切分是比较常用的场景,也能够很好地并行化(不同的key上的窗口聚合可以分配到不同的task上去处理)。不过有时候我们也需要再普通流上进行窗口的操作,这就是AllWindowedStream。AllWindowedStream是直接在DataStream上进行windowAll(...)操作。AllWindowedStream的实现是基于WindowedStream的。Flink不推荐使用AllWindowedStream,因为在普通流上进行窗口操作,就势必需要将所有的分区的流都汇集到单个task中,而这个单个的Task很显然就会成为整个Job的瓶颈。

JoinedStream & CoGroupedStreams

双流Join也是一个非常常见的。在源码中可以发现JoinedStream和CoGroupedStreams的代码实现大部分都是一样的,JoinedStreams在底层又调用了CoGroupedStream来实现Join功能。除了名字不一样,一开始很难将他们区分开,而且为什么要提供两个功能类似的接口呢?

实际上着两者还是有区别的,首先co-group侧重的是group,是对同一个key上的两组集合进行操作,而join侧重的是pair,是对同一个key上的每对元素操作。co-group比join更通用一些,因为join只是co-group的一个特例,所以join是可以基于co-group来实现的(当然有优化的空间)。而在co-group之外又提供了join接口是因为用户更熟悉join,而且能够跟DataSet API保持一致,降低用户的学习成本。

JoinedStreams和CoGroupedStreams是基于Window上实现的,所以CoGroupStreams最终又调用了WindowedStream来实现。

val firstInput: DataStream[MyType] = ...
val secondInput: DataStream[AnotherType] = ...
 
val result: DataStream[(MyType, AnotherType)] = firstInput.join(secondInput)
    .where("userId").equalTo("id")
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new JoinFunction () {...})

上述JoinedStreams的样例代码在运行时会转变成如下的执行图:

图4

双流上的数据在同一个key会被分别分配到同一个Window窗口的左右两个篮子里,当window结束的时候,会对左右篮子进行笛卡尔积从而得到每一对pair,对每一对pair应用JoinFunction。目前join窗口的双流数据都是被缓存在内存中的,也就是说如果某个key上的窗口数据太多就会导致JVM OOM(然而数据倾斜是常态)。双流join的难点也正是这里,这也是社区对join操作的优化方向,列入可以借鉴Flink在批处理join中的优化方案,也可以用ManagedMemory来管理窗口中的数据,并当数据超过阀值时能spill到磁盘上。

ConnectedStreams

在DataStream上有一个union的转换 dataStream.union(otherStream1,otherStream2,...),用来合并多个流,新的流会包含所有流中的数据。union有一个限制,就是所有合并的流的类型必须是一致的。ConnectedStreams提供了和union类似的功能,用来连接两个流,但是与union转换有以下几个区别:
①.ConnectedStreams只能连接两个流,而union可以多余两个流;
②.ConnectedStreams连接的两个流类型可以不一致,而union连接的流的类型必须一致;
③.ConnectedStreams会对两个流的数据应用不同的处理方法,并且双流之间可以共享状态。这再第一个流的输入会影响第二流时,会非常有用。

如下ConnectedStreams的样例,连接input和other流,并在input流上应用map1方法,在other上应用map2方法,双流可以共享状态(比如计数)。

val input: DataStream[MyType] = ...
val other: DataStream[AnotherType] = ...
 
val connected: ConnectedStreams[MyType, AnotherType] = input.connect(other)
 
val result: DataStream[ResultType] = 
        connected.map(new CoMapFunction[MyType, AnotherType, ResultType]() {
            override def map1(value: MyType): ResultType = { ... }
            override def map2(value: AnotherType): ResultType = { ... }
        })

当并行度为2时,其执行图如下所示:

图5
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,684评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,143评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,214评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,788评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,796评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,665评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,027评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,679评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,346评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,664评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,766评论 1 331
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,412评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,015评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,974评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,073评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,501评论 2 343