flink学习之八-keyby&reduce

上文学习了简单的map、flatmap、filter,在这里开始继续看keyBy及reduce

keyBy

先看定义,通过keyBy,DataStream→KeyedStream。

逻辑上将流分区为不相交的分区。具有相同Keys的所有记录都分配给同一分区。在内部,keyBy()是使用散列分区实现的。指定键有不同的方法。

此转换返回KeyedStream,其中包括使用被Keys化状态所需的KeyedStream

dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple(数组)
    

注意 如果出现以下情况,则类型不能成为关键

  1. 它是POJO类型但不覆盖hashCode()方法并依赖于Object.hashCode()实现。
  2. 它是任何类型的数组。

看段代码:

public class KeyByTestJob {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
        env.fromElements(Tuple2.of(2L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(2L, 4L), Tuple2.of(1L, 2L))
                .keyBy(0) // 以数组的第一个元素作为key
                .map((MapFunction<Tuple2<Long, Long>, String>) longLongTuple2 -> "key:" + longLongTuple2.f0 + ",value:" + longLongTuple2.f1)
                .print();

        env.execute("execute");
    }
}

运行后,结果如下:

3> key:1,value:5
3> key:1,value:7
3> key:1,value:2
4> key:2,value:3
4> key:2,value:4

可以看到,前面的 3> 和 4> 输出 本身是个分组,而且顺序是从先输出key=1的tuple数组,再输出key=2的数组。

也就是说,keyby类似于sql中的group by,将数据进行了分组。后面基于keyedSteam的操作,都是组内操作。

断点看了下keyedStream的结构:

keyedStream.png

可以看到,包含了keyType、keySelector,以及转换后的PartitionTransformation,也就是已经做了分区了。后续的所有操作都是按照分区内数据来处理的。

reduce

reduce表示将数据合并成一个新的数据,返回单个的结果值,并且 reduce 操作每处理一个元素总是创建一个新值。而且reduce方法不能直接应用于SingleOutputStreamOperator对象,也好理解,因为这个对象是个无限的流,对无限的数据做合并,没有任何意义哈!

所以reduce需要针对分组或者一个window(窗口)来执行,也就是分别对应于keyBy、window/timeWindow 处理后的数据,根据ReduceFunction将元素与上一个reduce后的结果合并,产出合并之后的结果。

在上面代码的基础上修改:

public class KeyByTestJob {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
        env.fromElements(Tuple2.of(2L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(2L, 4L), Tuple2.of(1L, 2L))
                .keyBy(0) // 以数组的第一个元素作为key
                .reduce((ReduceFunction<Tuple2<Long, Long>>) (t2, t1) -> new Tuple2<>(t1.f0, t2.f1 + t1.f1)) // value做累加
                .print();

        env.execute("execute");
    }
}
3> (1,5)
3> (1,12)
3> (1,14)
4> (2,3)
4> (2,7)

可以看到,分组后,每次有一个数组进来,都会产生新的数据,依然是按照分组来输出的。

如果改下reduce中的实现:

ReduceFunction<Tuple2<Long, Long>>) (t2, t1) -> new Tuple2<>(t1.f0 + t2.f0, t2.f1 + t1.f1)

那么输出就是:

2019-01-22 12:04:56.083 [Keyed Reduce -> Sink: Print to Std. Out (2/4)] INFO  org.apache.flink.runtime.taskmanager.Task - Keyed Reduce -> Sink: Print to Std. Out (2/4) (7117b0831e59cae2201e6f7097356214) switched from RUNNING to FINISHED.
2019-01-22 12:04:56.083 [Keyed Reduce -> Sink: Print to Std. Out (2/4)] INFO  org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Keyed Reduce -> Sink: Print to Std. Out (2/4) (7117b0831e59cae2201e6f7097356214).
2019-01-22 12:04:56.083 [Keyed Reduce -> Sink: Print to Std. Out (2/4)] INFO  org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task Keyed Reduce -> Sink: Print to Std. Out (2/4) (7117b0831e59cae2201e6f7097356214) [FINISHED]
4> (2,3)
4> (4,7)

...

2019-01-22 12:04:56.118 [flink-akka.actor.default-dispatcher-4] INFO  o.a.flink.runtime.executiongraph.ExecutionGraph - Keyed Reduce -> Sink: Print to Std. Out (2/4) (7117b0831e59cae2201e6f7097356214) switched from RUNNING to FINISHED.
2019-01-22 12:04:56.122 [flink-akka.actor.default-dispatcher-4] INFO  o.a.flink.runtime.executiongraph.ExecutionGraph - Keyed Reduce -> Sink: Print to Std. Out (1/4) (0fdc49eb18050efa3acec361978f3e93) switched from RUNNING to FINISHED.
2019-01-22 12:04:56.125 [flink-akka.actor.default-dispatcher-4] INFO  o.a.flink.runtime.executiongraph.ExecutionGraph - Keyed Reduce -> Sink: Print to Std. Out (4/4) (1607b502ab2791f2f567c61da214bd82) switched from RUNNING to FINISHED.
3> (1,5)
3> (2,12)
3> (3,14)

可以看到输出结果,一方面是是key-reduce的状态,从RUNNING迁移到FINISHED;另一方面是按组输出了最终的reduce值。

聚合

KeyedStream→DataStream

在被Keys化数据流上滚动聚合。min和minBy之间的差异是min返回最小值,而minBy返回该字段中具有最小值的数据元(max和maxBy类似)。

---TODO 这里存疑,因为返回的数据始终是数据源,难道是我写错了什么?SingleOutputStreamOperator<Tuple2>改成SingleOutputStreamOperator<Long> 也是一样的结果,等待后续继续验证。

keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");

继续在上面代码的基础上做实验:

sum
public class KeyByTestJob {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
        KeyedStream keyedStream =  env.fromElements(Tuple2.of(2L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(2L, 4L), Tuple2.of(1L, 2L))
                .keyBy(0) // 以数组的第一个元素作为key
                ;

        SingleOutputStreamOperator<Tuple2> sumStream = keyedStream.sum(0);
        sumStream.addSink(new PrintSinkFunction<>());

        env.execute("execute");
    }

对第一个元素(位置0)做sum,结果如下:

3> (1,5)
3> (2,5)
3> (3,5)
...
4> (2,3)
2019-01-22 21:27:07.401 [flink-akka.actor.default-dispatcher-3] INFO  o.a.flink.runtime.executiongraph.ExecutionGraph - Source: Collection Source (1/1) (f3368fedb9805b1e59f4443252a2fb2b) switched from RUNNING to FINISHED.
4> (4,3)

可以看到,对第一个数据(也就是key)做了累加,然后value以第一个进来的数据为准。

如过改成keyedStream.sum(1); 也就是针对第二个元素求和,得到的结果如下:

4> (2,3)
4> (2,7)
...
3> (1,5)
3> (1,12)
2019-01-23 10:50:47.498 [flink-akka.actor.default-dispatcher-5] INFO  o.a.flink.runtime.executiongraph.ExecutionGraph - Source: Collection Source (1/1) (df09751c6722a5942b058a1300ae9fb3) switched from RUNNING to FINISHED.
3> (1,14)
min
SingleOutputStreamOperator<Tuple2> sumStream = keyedStream.min(1);

得到的输出结果是:

3> (1,5)  -- 第一组 第一个数据到的结果
3> (1,5)  -- 第一组 第二个数据到的结果
4> (2,3)  -- 第二组 第一个数据到的结果
4> (2,3)  -- 第二组 第二个数据到的结果
3> (1,2)  -- 第一组 第三个数据到的结果

这里顺序有点乱,不过没问题,数据按照顺序一个一个的过来,然后计算当前数据过来时有最小value的数据。

minBy
SingleOutputStreamOperator<Tuple2> sumStream = keyedStream.minBy(1);
3> (1,5)
3> (1,5)
4> (2,3)
3> (1,2)
4> (2,3)

类似的,只是组间打印的顺序有区别而已。

max
SingleOutputStreamOperator<Tuple2> sumStream = keyedStream.max(1);
3> (1,5)
4> (2,3)
3> (1,7)
4> (2,4)
3> (1,7)

按照顺序,取最大的数据

maxBy
SingleOutputStreamOperator<Tuple2> sumStream = keyedStream.maxBy(1);

3> (1,5)
4> (2,3)
3> (1,7)
4> (2,4)
3> (1,7)

有一点要牢记,数据是一直流过来的,这些聚合方法都是在每次收到新的数据之后,重新计算/比较得出来的结果,而不是只有一个最终结果。

PS:有人评论说哪儿来的f0、f1,只能这里贴个图了...


image.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,684评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,143评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,214评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,788评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,796评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,665评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,027评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,679评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,346评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,664评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,766评论 1 331
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,412评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,015评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,974评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,073评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,501评论 2 343

推荐阅读更多精彩内容