DataStream同时支持批处理和流处理。流模式用增量修改的方法处理无界流,批模式处理有界流或有固定输入的不会持续的流并将结果一次性输出。Flink保证流模式和批模式处理有界流时结果是一样的,但批模式在聚合策略,任务调度和失败恢复方面做了优化,效率更高。
When to use BATCH mode
有界即知道数据是否已经全部到达或是否还会有新数据出现。
批模式只能用在有界流。
流模式既能用于有界也能用于无界流。
配置批模式
DataStream默认采用流模式。可通过配置改为流模式:
- 代码配置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH); - 命令行配置
$ bin/flink run -Dexecution.runtime-mode=BATCH examples/streaming/WordCount.jar
推荐使用命令行方式,因为代码与配置分离能提高灵活性
流模式与批模式执行的差异
任务调度和网络交换
Flink job包含不同的操作,系统决定各个操作执行在哪个TaskManager上以及他们之间的数据交换传输。一个或一组operator作为一个调度的整体称为task. 一个task可以分为多个相同的subtask在多个TaskManager上并发执行。
BATCH和STREAMING下任务调度和网络交换的处理是不同的。举个例子:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> source = env.fromElements(...);
source.name("source")
.map(...).name("map1")
.map(...).name("map2")
.rebalance()
.map(...).name("map3")
.map(...).name("map4")
.keyBy((value) -> value)
.map(...).name("map5")
.map(...).name("map6")
.sinkTo(...).name("sink");
像map, flatMap, filter这种两个operation之间是一对一连接模式的可以直接将数据转发给下一个operator, 也就可以chain成一个task,而无需网络交换。
像keyBy, rebalance这种两个并行实例之间需要互相交换数据的就需要网络传输。
所以本例中可以将operator组合成3个task,
- STREAMING
所有task同时工作, 网络交换是pipeline的,即所有记录立即发送到下游task。 - BATCH
task一个接一个分阶段工作,即一个task将一批数据都处理完之后将结果输入给下一个task。
State Backend
流模式下,Flink使用State Backend控制state存储和checkpointing。批模式下不需要存储state
处理顺序
流模式下,数据到达即被处理。
批模式下某些操作Flink需要保证顺序:
我们可以简单把输入分成三种类型:
- 广播输入
- 普通输入
- 分组输入
三种输入的处理顺序是:
先处理广播输入,再处理普通输入,最后处理分组输入。如果同时有多个相同广播输入或普通输入,则Flink决定其处理顺序。如果有多个分组输入,则先处理完其中一个分组,再处理下一个分组。
事件时间和水印
Stream需要处理event time和watermark, batch因为已知输入所以无需处理event time和watermark.
处理时间
基于处理时间计算的结果是不可复现的,因为同一记录处理两次的时间戳是不一样的。STREAMING需要处理事件时间和处理时间的关系,BATCH不需要。
失败恢复
STREAMING使用checkpoint用于失败恢复。BATCH直接重新执行失败的task。