一、基础信息
1.1 简介
分布式的大数据计算引擎。支持 无界数据流 和 无限数据流 进行有状态计算。
对于flink来说所有数据都是流数据,有界数据流其实是特殊状态的无界数据流
PS:这一点和spark正好相反,对于spark来说,流数据是特殊状态的批数据。
1.2 架构
[图片上传失败...(image-d0f54f-1705742431289)]
flink主要有四层架构:
-
分别为Depoly(flink部署):即flink部署层,flink部署目前有三种模式。
本地模式(Local)
单机模式(Standalone)
以及集群模式(Cluster)
-
Flink Runtime(包括flink批、流处理两套API)
DataSet
DataStream
-
flink Sql,CEP
Table API:Table API是用于流和批处理的统一关系API。Table API查询不是像SQL一样将字符串值指定为SQL,而是在Java或Scala中以嵌入语言的样式定义,并支持自动完成和语法验证等IDE支持.
Flink SQL:SQL查询是使用
sqlQuery()
方法指定的TableEnvironment
。该方法返回SQL查询的结果为Table
。可以转换为DataSet或DataStream,也可以写入TableSink。SQL和 Table API查询可以无缝混合,并进行整体优化并转换为单个程序。FlinkCEP - Flink的复杂事件处理。FlinkCEP是在Flink之上实现的复杂事件处理(CEP)库。它允许您在无休止的事件流中检测事件模式,让您有机会掌握数据中重要的事项。
以及最上层的机器学习
1.3 集群架构
1.3.1 任务提交过程
[图片上传失败...(image-62a873-1705742431288)]
1.3.2 运行架构
[图片上传失败...(image-17af03-1705742431288)]
运行时有两个主要的进程
-
JobManager:负责协调Flink程序的执行,包括:任务的调度、任务运行完成与失败的处理,协调检查点与恢复等,主要包括以下项职能
ResourceManager:负责资源分配,管理任务slot, 这个是flink集群资源管理的单位。
Dispatcher:提供应用程序提交的REST接口,对每一个提交的作业启动JobMaster,并运行Flink WebUI提供作业执行的信息。
JobMaster:负责单个作业图(JobGraph)的执行。一个集群可以同时运行多个作业,每个作业都有自己的JobMaster。
TaskManager
client不是Flink程序运行时和程序执行的一部分,它主要负责准备和提交dataFlow到JobManager, 并接收JobManager返回的程序执行结果。
1.3.3 任务和算子链
Flink将不同算子的子任务(subtask)链接到一个任务里,每一个任务在一个线程中执行
[图片上传失败...(image-d1a7b6-1705742431288)]
1.3.4 任务Slot和资源
每个worker(TaskManager)都是一个JVM进程,可以执行一个或多个子任务(subtask)。任务槽(task slot)就是为了控制一个worker能同时运行多少个任务的(至少一个)。
每个任务槽(task slot)代表TaskManager一个设定的资源子集。比如, 一个TaskManager有3个槽,会将其管理的1/3的内存分给每个槽位。将资源分成不同的槽位意味着一个子任务(subtask)不会跟其他作业的子任务竞争资源,而是会拥有一定量的保留资源。需要注意的是,这里不涉及CPU隔离,目前任务槽仅仅分割task管理的内存。
为了适配任务槽(task slot)的数量,用户可以定义子任务(subtask)是如何隔离的。如果每个TaskManager有一个槽,就意味着task组运行在不同的GJVM里。如果每个TaskManager有多个槽意味着多个字任务(subtask)共享同一个JVM。任务在同一个JVM运行可以共享TCP链接和心跳信息。它们可以共享数据集和数据结构,因此可以减少每个任务的开销。
[图片上传失败...(image-4444a4-1705742431288)]
二、DataStream API
Flink中的DataStream程序是实现数据流转换的常规程序(例如,Filter,更新状态,定义窗口,聚合)
2.1 数据源
2.1.1 基于文件
readTextFile(path)
-TextInputFormat
逐行读取文本文件,即符合规范的文件,并将它们作为字符串返回。readFile(fileInputFormat, path)
- 按指定的文件输入格式指定读取(一次)文件。readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)
- 这是前两个内部调用的方法。它path
根据给定的内容读取文件fileInputFormat
。根据提供的内容watchType
,此源可以定期监视(每interval
ms)新数据(FileProcessingMode.PROCESS_CONTINUOUSLY
)的路径,或者处理当前在路径中的数据并退出(FileProcessingMode.PROCESS_ONCE
)。使用该pathFilter
,用户可以进一步排除正在处理的文件。
notes:
如果
watchType
设置为FileProcessingMode.PROCESS_CONTINUOUSLY
,则在修改文件时,将完全重新处理其内容。这可以打破“完全一次”的语义,因为在文件末尾追加数据将导致其所有内容被重新处理。如果
watchType
设置为FileProcessingMode.PROCESS_ONCE
,则源扫描路径一次并退出,而不等待读者完成读取文件内容。当然读者将继续阅读,直到读取所有文件内容。在该点之后关闭源将导致不再有检查点。这可能会导致节点发生故障后恢复速度变慢,因为作业将从上一个检查点恢复读取。
2.1.2 socket接收字符串
-
socketTextStream
- 从套接字读取。数据元可以用分隔符分隔。
2.1.3 基于集合
fromCollection(Collection)
- 从Java Java.util.Collection创建数据流。集合中的所有数据元必须属于同一类型。fromCollection(Iterator, Class)
- 从迭代器创建数据流。该类指定迭代器返回的数据元的数据类型。fromElements(T ...)
- 从给定的对象序列创建数据流。所有对象必须属于同一类型。fromParallelCollection(SplittableIterator, Class)
- 并行地从迭代器创建数据流。该类指定迭代器返回的数据元的数据类型。generateSequence(from, to)
- 并行生成给定间隔中的数字序列。
2.1.4 自定义
addSource
- 附加新的源函数。例如,要从Apache Kafka中读取,您可以使用addSource(new FlinkKafkaConsumer08<>(...))
2.2 操作
2.2.1 迭代
IterativeStream<Integer> iteration = input.iterate();
iteration.closeWith(iterationBody.filter(/* one part of the stream */));
DataStream<Integer> output = iterationBody.filter(/* some other part of the stream */);
要关闭迭代并定义迭代尾部,请调用closeWith(feedbackStream)
方法IterativeStream
。赋予closeWith
函数的DataStream 将反馈给迭代头。常见的模式是使用过滤器来分离反馈的流的部分和向前传播的流的部分。这些滤波器可以例如定义“终止”逻辑,其中允许元件向下游传播而不是反馈。
2.2.2 执行参数
该StreamExecutionEnvironment
包含ExecutionConfig
允许为运行时设置工作的具体配置值。
- 容错
State&Checkpointing
描述了如何启用和配置Flink的检查点机制。
- 控制延迟
默认情况下,数据元不会逐个传输到网络上(这会导致不必要的网络流量),但会被缓冲。可以在Flink配置文件中设置缓冲区的大小(实际在计算机之间传输).虽然此方法适用于优化吞吐量,但当传入流速度不够快时,可能会导致延迟问题。要控制吞吐量和延迟,您可以env.setBufferTimeout(timeoutMillis)
在运行环境(或单个 算子)上使用以设置缓冲区填充的最长等待时间。在此之后,即使缓冲区未满,也会自动发送缓冲区。此超时的默认值为100毫秒。为了最大化吞吐量,设置setBufferTimeout(-1)
哪个将删除超时和缓冲区只有在它们已满时才会被刷新。要最小化延迟,请将超时设置为接近0的值(例如5或10 ms)。应避免缓冲区超时为0,因为它可能导致严重的性能下降。
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setBufferTimeout(timeoutMillis);
env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
2.2.3 调试
- 本地运行环境
LocalStreamEnvironment
在创建它的同一JVM进程中启动Flink系统。如果从IDE启动LocalEnvironment,则可以在代码中设置断点并轻松调试程序。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
DataStream<String> lines = env.addSource(/* some source */);
// build your program
env.execute();
- 收集数据源
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
DataStream<Integer> myInts = env.fromElements(1, 2, 3, 4, 5);
List<Tuple2<String, Integer>> data = ...
DataStream<Tuple2<String, Integer>> myTuples = env.fromCollection(data);
Iterator<Long> longIt = ...
DataStream<Long> myLongs = env.fromCollection(longIt, Long.class);
- 迭代器数据接收器
DataStreamUtils
DataStream<Tuple2<String, Integer>> myResult = ...
Iterator<Tuple2<String, Integer>> myOutput = DataStreamUtils.collect(myResult)
2.3 活动时间
2.3.1 时间概念
处理时间:处理时间是指执行相应 算子操作的机器的系统时间
时间时间:事件时间是每个事件在其生产设备上发生的时间。此时间通常在进入Flink之前嵌入记录中,并且 可以从每个记录中提取该事件时间戳。
提取时间:摄取时间是事件进入Flink的时间。
设定时间特征
KeyedStream.timeWindow(Time.seconds(30))
2.3.2 Event Time和WaterMark
Source传入的数据大多数情况下大都是有序的,但是避免不了因为网络等原因造成数据的乱序发送,无法知道乱序的数据到底会滞后多长时间,需要有一种机制来保证在指定的时间后,必须触发窗口进行计算,这个机制就是水印(Watermark)
顺序数据流中的watermark
数据流有序的情况,并不能很好的发挥watermark的作用,反而会增加应用的延迟
乱序数据流中的watermark
只有当watermark大于窗口结束时间时才会进行窗口操作,watermark一般都是根据event time计算的。
并行数据流中的watermark
对应并行度大于1的source task,它每个独立的subtask都会生成各自的watermark。这些watermark会随着流数据一起分发到下游算子,并覆盖掉之前的watermark。当有多个watermark同时到达下游算子的时候,flink会选择较小的watermark进行更新。当一个task的watermark大于窗口结束时间时,就会立马触发窗口操作。
[图片上传失败...(image-eef4a5-1705742431288)]
2.3.3 生成时间戳/水印
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//Watermark自动更新间隔
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//或手动指定Watermark自动更新间隔
env.getConfig().setAutoWatermarkInterval(300L);
setStreamTimeCharacteristic()方法实现源码:
public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
this.timeCharacteristic = Preconditions.checkNotNull(characteristic);
if (characteristic == TimeCharacteristic.ProcessingTime) {
getConfig().setAutoWatermarkInterval(0);
} else {
getConfig().setAutoWatermarkInterval(200); //设置时间类型为EventTime时,会默认设置自动Watermark更新间隔为200毫秒
}
}
分配时间戳
直接在数据流源中
-
通过时间戳分配器/水印生成器:在Flink中,时间戳分配器还定义要发出的水印
notes:
注意自1970-01-01T00:00:00Z的Java纪元以来,时间戳和水印都指定为毫秒。
带时间戳/水印的源函数
流源可以直接为它们生成的数据元分配时间戳,也可以发出水印。完成此 算子操作后,不需要时间戳分配器。请注意,如果使用时间戳分配器,则源将提供的任何时间戳和水印都将被覆盖。
要直接为源中的数据元分配时间戳,源必须使用该collectWithTimestamp(...)
方法SourceContext
。要生成水印,源必须调用该emitWatermark(Watermark)
函数。
@Override
public void run(SourceContext<MyType> ctx) throws Exception {
while (/* condition */) {
MyType next = getNext();
ctx.collectWithTimestamp(next, next.getEventTimestamp());
if (next.hasWatermarkTime()) {
ctx.emitWatermark(new Watermark(next.getWatermarkTime()));
}
}
}
时间戳分配器/水印生成器
如果我们使用的是Flink自带的外部数据源,那我们就不可以通过SourceFunction来生成数据的timestamp和watermark。
-
Periodic Watermarks 根据设定的时间间隔周期性地生成watermark,通过AssignerWithPeriodicWatermarks接口定义
s1.assignTimestampsAndWatermarks(new PeriodicWatermarkAssignerWrapper())
-
Punctuated Watermarks 根据特定的数据生成watermark,通过AssignerWithPunctuatedWatermarks接口定义。
s1.assignTimestampsAndWatermarks(new PunctuatedWatermarkAssignerWrapper())
每个Kafka分区的时间戳
当使用Apache Kafka作为数据源时,每个Kafka分区可能具有简单的事件时间模式(升序时间戳或有界无序)。但是,当从Kafka消费流时,多个分区通常并行消耗,交错来自分区的事件并破坏每个分区模式(这是Kafka的消费者客户端工作的固有方式)。
2.3.4 预定义的时间戳提取器/水印发射器
Flink提供抽象,允许程序员分配他们自己的时间戳并发出他们自己的水印。更具体地说,可以通过实现其中一个AssignerWithPeriodicWatermarks
和AssignerWithPunctuatedWatermarks
接口来实现,具体取决于用例
为了进一步简化此类任务的编程工作,Flink附带了一些预先实现的时间戳分配器。本节提供了它们的列表。除了开箱即用的函数外,它们的实现还可以作为自定义实现的示例
具有递增时间戳的分发者
定期水印生成的最简单的特殊情况是给定源任务看到的时间戳按升序发生的情况。
DataStream<MyEvent> stream = ...
DataStream<MyEvent> withTimestampsAndWatermarks =
stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyEvent>() {
@Override
public long extractAscendingTimestamp(MyEvent element) {
return element.getCreationTime();
}
});
允许固定数量的迟到的分配者
定期水印生成的另一个例子是当水印滞后于在流中看到的最大(事件 - 时间)时间戳一段固定的时间。
DataStream<MyEvent> stream = ...
DataStream<MyEvent> withTimestampsAndWatermarks =
stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {
@Override
public long extractTimestamp(MyEvent element) {
return element.getCreationTime();
}
});
2.4 状态与容错
有状态函数和 算子在各个数据元/事件的处理中存储数据,使状态成为任何类型的更复杂 算子操作的关键构建块。
2.4.1 广播状态模式
使用State描述 算子状态,其在恢复时均匀分布在 算子的并行任务中,或者联合,整个状态用于初始化已恢复的并行任务。
- 它有一个Map格式,
- 它仅适用于具有广播流和非广播流的输入的特定算子
- 这样的算子可以具有不同名称的多个广播状态
被Keys化或非被Keys化
- 如果Keys化,则函数为
KeyedBroadcastProcessFunction
。 - 如果它是被非Keys化的,则函数是
BroadcastProcessFunction
。
这些函数有两种实现方法;
public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {
public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;
public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
}
public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> {
public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;
public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception;
}
processBroadcastElement()
用于处理广播侧
processElement()
数据元和非广播侧数据元的方法
onTimer()
当使用TimerService设置的计时器触发时调用
ReadOnlyContext
只读访问权
Context
读写访问权限
这两个上下文(ctx
在下面的枚举中):
- 允许访问广播状态:
ctx.getBroadcastState(MapStateDescriptor<K,V> stateDescriptor)
- 允许查询数据元的时间戳:
ctx.timestamp()
, - 获取当前水印:
ctx.currentWatermark()
- 得到当前的处理时间:
ctx.currentProcessingTime()
和 - 向旁路输出发射数据元:
ctx.output(OutputTag<X> outputTag, X value)
。
2.4.2 状态运行
被Keys化状态和算子状态
Flink有两种基本的状态
被Keys化状态(
Keyed State
)算子状态(
Operator State
)
原始和管理状态
被Keys化状态和算子状态有两种形式
托管状态。托管状态由Flink运行时控制的数据结构表示,例如内部哈希表或RocksDB
原始状态。原始状态是算子保存在自己的数据结构中的状态。检查点时,它们只会将一个字节序列写入检查点。Flink对状态的数据结构一无所知,只看到原始字节。
所有数据流函数都可以使用托管状态,但原始状态接口只能在实现 算子时使用。建议使用托管状态(而不是原始状态),因为在托管状态下,Flink能够在并行性更改时自动重新分配状态,并且还可以进行更好的内存管理。
使用托管算子状态
要使用托管算子状态,有状态函数可以实现更通用的CheckpointedFunction
接口或ListCheckpointed<T extends Serializable>
接口。
CheckpointedFunction
void snapshotState(FunctionSnapshotContext context) throws Exception;
void initializeState(FunctionInitializationContext context) throws Exception;
ListCheckpointed
该ListCheckpointed
接口是比较有限的变体CheckpointedFunction
,它仅支持与恢复甚至分裂的再分配方案列表式的状态。它还需要实现两种方法:
List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
void restoreState(List<T> state) throws Exception;
使用托管被Keys化状态
用于 KeyedStream
,可以通过创建stream.keyBy(…)
ValueState<T>
ListState<T>
ReducingState<T>
AggregatingState<IN, OUT>
FoldingState<T, ACC>
MapState<UK, UV>
有状态源函数
使状态和输出集合的更新成为原子(在故障/恢复时精确一次的语义所需),用户需要从源的上下文中获取锁定。
2.4.3 检查点
为了使状态容错,Flink需要检查状态。检查点允许Flink-recovery流中的状态和位置,从而为应用程序提供与无故障执行相同的语义。
先决条件
持久化的数据源。持久消息队列(例如,Apache Kafka,RabbitMQ,Amazon Kinesis,Google PubSub)或文件系统(例如,HDFS,S3,GFS,NFS,Ceph,……)
状态的持久存储,通常是分布式文件系统(例如,HDFS,S3,GFS,NFS,Ceph ……)
启用和配置检查点
默认情况下,禁用检查点。开启,调用enableCheckpointing(n)
上StreamExecutionEnvironment
,其中 N 是以毫秒为单位的检查点间隔。
其它参数:
// start a checkpoint every 1000 ms
env.enableCheckpointing(1000);
// advanced options:
// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);
// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
默认情况下,检查点不会保存,仅用于从失败中恢复作业。取消程序时会删除它们。但是,您可以配置要保存的定期检查点。根据配置 ,当作业失败或取消时,不会自动清除这些保存的检查点。这样,如果您的工作失败,您将有一个检查点可以从中恢复。
// enable externalized checkpoints which are retained after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
更多参数和/或默认值conf/flink-conf.yaml
state.backend:
state.backend.async:
state.backend.fs.memory-threshold:
state.backend.incremental:
state.backend.local-recovery:
state.checkpoints.dir:
state.checkpoints.num-retained:
state.savepoints.dir:
taskmanager.state.local.root-dirs:
选择状态后台
Flink的检查点机制存储定时器和有状态算子中所有状态的一致SNAPSHOT,包括连接器,窗口和任何用户定义的状态。存储检查点的位置(例如,JobManager内存,文件系统,数据库)取决于配置的 状态后台。
默认情况下,状态保存在TaskManagers的内存中,检查点存储在JobManager的内存中。为了适当持久化大状态,Flink支持在其他状态后台中存储和检查点状态的各种方法。可以通过配置状态后台的选择
env.setStateBackend(new RocksDBStateBackend("hdfs:///checkpoints-data/");
迭代作业中的状态检查点
Flink目前仅为没有迭代的作业提供处理保证。在迭代作业上启用检查点会导致异常。为了强制对迭代程序进行检查点,用户在启用检查点时需要设置一个特殊标志:
env.enableCheckpointing(interval, force = true)
2.4.4 保存点
保存点是外部存储的自包含检查点,可用于停止和恢复或更新Flink程序
分配算子ID
更改是通过该uid(String)
方法手动指定算子ID
DataStream<String> stream = env.
// Stateful source (e.g. Kafka) with ID
.addSource(new StatefulSource())
.uid("source-id") // ID for the source operator
.shuffle()
// Stateful mapper with ID
.map(new StatefulMapper())
.uid("mapper-id") // ID for the mapper
// Stateless printing sink
.print(); // Auto-generated ID
如果您未手动指定ID,则会自动生成这些ID。只要这些ID不变,您就可以从保存点自动恢复。生成的ID取决于程序的结构,并且对程序更改很敏感。因此,强烈建议手动分配这些ID。
保存点状态
算子操作
-
触发保存点触发保存点时,会创建一个新的保存点目录,其中将存储数据和元数据。注意 目标目录必须是JobManager(s)和TaskManager(例如分布式文件系统上的位置)可访问的位置。
触发保存点
# 触发保存点 $ bin/flink savepoint :jobId [:targetDirectory] # 使用YARN触发保存点 $ bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId #使用Savepoint取消作业 $ bin/flink cancel -s [:targetDirectory] :jobId # 从Savepoints恢复 $ bin/flink run -s :savepointPath [:runArgs]
从Savepoints恢复
# 这将提交作业并指定要从中恢复的保存点。您可以指定保存点目录或_metadata文件的路径。 $ bin/flink run -s :savepointPath [:runArgs] # 允许未恢复状态 $ bin/flink run -s :savepointPath -n [:runArgs]
处理保存点
$ bin/flink savepoint -d :savepointPath
配置
state.savepoints.dir: hdfs:///flink/savepoints
2.4.5 状态后台
用Data Stream API
编写的程序通常以各种形式保存状态:
- Windows会在触发数据元或聚合之前收集数据元或聚合
- 转换函数可以使用键/值状态接口来存储值
- 转换函数可以实现
CheckpointedFunction
接口以使其局部变量具有容错能力
可用的状态后台
开箱即用,Flink捆绑了这些状态后台:
- MemoryStateBackend
- FsStateBackend
- RocksDBStateBackend
如果没有配置其他任何内容,系统将使用MemoryStateBackend。
MemoryStateBackend
该MemoryStateBackend保存数据在内部作为Java堆的对象。键/值状态和窗口 算子包含存储值,触发器等的哈希表。
在检查点时,此状态后台将对状态进行SNAPSHOT,并将其作为检查点确认消息的一部分发送到JobManager(主服务器),JobManager也将其存储在其堆上。
以将MemoryStateBackend配置为使用异步SNAPSHOT。虽然我们强烈建议使用异步SNAPSHOT来避免阻塞管道,但请注意,默认情况下,此函数目前处于启用状态。要禁用此函数,用户可以MemoryStateBackend
在构造函数中将相应的布尔标志实例化为false
(这应该仅用于调试),例如:
new MemoryStateBackend(MAX_MEM_STATE_SIZE, false);
MemoryStateBackend的局限性:
- 默认情况下,每个状态的大小限制为5 MB。可以在MemoryStateBackend的构造函数中增加此值。
- 无论配置的最大状态大小如何,状态都不能大于akka帧大小(请参阅配置)。
- 聚合状态必须适合JobManager内存。
鼓励MemoryStateBackend用于:
- 本地开发和调试
- 几乎没有状态的作业,例如仅包含一次记录函数的作业(Map,FlatMap,Filter,…)。Kafka消费者需要很少的状态。
FsStateBackend
所述FsStateBackend配置有文件系统URL(类型,地址,路径),如“HDFS://名称节点:40010 /Flink/检查点”或“文件:///数据/Flink/检查点”。
FsStateBackend将正在运行的数据保存在TaskManager的内存中。在检查点时,它将状态SNAPSHOT写入配置的文件系统和目录中的文件。最小元数据存储在JobManager的内存中(或者,在高可用性模式下,存储在元数据检查点中)。
FsStateBackend 默认使用异步SNAPSHOT,以避免在编写状态检查点时阻塞处理管道。要禁用此函数,用户可以FsStateBackend
在构造函数集中使用相应的布尔标志来实例化false
,例如:
new FsStateBackend(path, false);
鼓励FsStateBackend:
- 具有大状态,长窗口,大键/值状态的作业。
- 所有高可用性设置。
RocksDBStateBackend
所述RocksDBStateBackend配置有文件系统URL(类型,地址,路径),如“HDFS://名称节点:40010 /Flink/检查点”或“文件:///数据/Flink/检查点”。
RocksDBStateBackend将RocksDB
数据库中的飞行中数据保存在(默认情况下)存储在TaskManager数据目录中。在检查点时,整个RocksDB数据库将被检查点到配置的文件系统和目录中。最小元数据存储在JobManager的内存中(或者,在高可用性模式下,存储在元数据检查点中)。
RocksDBStateBackend始终执行异步SNAPSHOT。
RocksDBStateBackend的局限性:
- 由于RocksDB的JNI桥接API基于byte [],因此每个Keys和每个值的最大支持大小为2 ^ 31个字节。重要提示:在RocksDB中使用合并 算子操作的状态(例如ListState)可以静默地累积> 2 ^ 31字节的值大小,然后在下次检索时失败。这是目前RocksDB JNI的一个限制。
我们鼓励RocksDBStateBackend:
- 具有非常大的状态,长窗口,大键/值状态的作业。
- 所有高可用性设置。
- RocksDBStateBackend是目前唯一提供增量检查点的后台
配置状态后台
如果您不指定任何内容,则默认状态后台是JobManager。如果要为群集上的所有作业建立不同的默认值,可以通过在flink-conf.yaml中定义新的默认状态后台来实现
-
设置每个作业状态后台
每个作业状态后台
StreamExecutionEnvironment
在作业上设置StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
-
设置默认状态后台
可以在
flink-conf.yaml
使用配置Keys在配置中配置默认状态后台state.backend
config条目的可能值包括jobmanager(MemoryStateBackend),filesystem(FsStateBackend),rocksdb
该
state.checkpoints.dir
选项定义所有后台写入检查点数据和元数据文件的目录。# The backend that will be used to store operator state checkpoints state.backend: filesystem state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints
2.4.6 可查询状态Beta
此函数将Flink的托管Keys(分区)状态暴露给外部世界,并允许用户从Flink外部查询作业的状态。
notes: 不稳定,还在迭代。
QueryableStateClient
QueryableStateClientProxy
QueryableStateServer
2.4.7 管理状态的自定义序列化
使用Flink的托管状态,则可能需要为特殊用例实现自定义序列化逻辑。
使用自定义序列化器
public class CustomTypeSerializer extends TypeSerializer<Tuple2<String, Integer>> {...};
ListStateDescriptor<Tuple2<String, Integer>> descriptor =
new ListStateDescriptor<>(
"state-name",
new CustomTypeSerializer());
checkpointedState = getRuntimeContext().getListState(descriptor);
2.5 算子
2.5.1 DataStream转换
Map DataStream→DataStream
FlatMap DataStream→DataStream
Filter DataStream→DataStream
KeyBy DataStream→KeyedStream
Reduce KeyedStream→DataStream
Flod KeyedStream→DataStream
聚合 KeyedStream→DataStream
Window KeyedStream→WindowedStream
WindowAll DataStream→AllWindowedStream
Window Apply WindowedStream→DataStream AllWindowedStream→DataStream
Window Reduce WindowedStream→DataStream
Window Fold WindowedStream→DataStream
Windows上的聚合 WindowedStream→DataStream
Union DataStream *→DataStream
Window Join DataStream,DataStream→DataStream
Interval Join KeyedStream,KeyedStream→DataStream
Window CoGroup DataStream,DataStream→DataStream
连接 DataStream,DataStream→ConnectedStreams
-
CoMap,CoFlatMap ConnectedStreams→DataStream。类似于连接数据流上的map和flatMap。
connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() { @Override public Boolean map1(Integer value) { return true; } @Override public Boolean map2(String value) { return false; } }); connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() { @Override public void flatMap1(Integer value, Collector<String> out) { out.collect(value.toString()); } @Override public void flatMap2(String value, Collector<String> out) { for (String word: value.split(" ")) { out.collect(word); } } });
拆分 DataStream→SplitStream选择 SplitStream→DataStream迭代 DataStream→IterativeStream→DataStream
-
提取时间戳 DataStream→DataStream
stream.assignTimestamps (new TimeStampExtractor() {...});
2.5.2 物理分区
函数对转换后的精确流分区进行低级控制
-
自定义分区 DataStream→DataStream
dataStream.partitionCustom(partitioner, "someKey"); dataStream.partitionCustom(partitioner, 0);
-
随机分区 DataStream→DataStream
dataStream.shuffle();
-
Rebalance (循环分区) DataStream→DataStream
dataStream.rebalance();
Rescaling DataStream → DataStream
Broadcasting DataStream → DataStream。Flink DataStream基于Broadcast State动态更新配置以实现实时过滤数据并增加字段_flink 如何对流中的已有数据新增字段-CSDN博客
2.5.3 任务链和资源组
开始新的链条
someStream.filter(...).map(...).startNewChain().map(...);
禁用链接
someStream.map(...).disableChaining();
设置插槽共享组
someStream.filter(...).slotSharingGroup("name");
2.5.4 视窗
Windows是处理无限流的核心。Windows将流拆分为有限大小的“桶”,我们可以在其上应用计算。
被Keys化Windows
stream
.keyBy(...) <- keyed versus non-keyed windows
.window(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/fold/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
非被Keys化Windows
stream
.windowAll(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/fold/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
窗口分配器
-
翻滚的Windows:翻滚窗口分配器的每个数据元分配给指定的窗口的窗口大小。翻滚窗具有固定的尺寸,不重叠。例如,如果指定大小为5分钟的翻滚窗口,则将评估当前窗口,并且每五分钟将启动一个新窗口。
DataStream<T> input = ...; // tumbling event-time windows input .keyBy(<key selector>) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .<windowed transformation>(<window function>); // tumbling processing-time windows input .keyBy(<key selector>) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .<windowed transformation>(<window function>); // daily tumbling event-time windows offset by -8 hours. input .keyBy(<key selector>) .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))) .<windowed transformation>(<window function>);
notes:在中国,您必须指定偏移量
Time.hours(-8)
-
滑动窗口:配器分配元件以固定长度的窗口。如果幻灯片小于窗口大小,则滑动窗口可以重叠。在这种情况下,数据元被分配给多个窗口。
DataStream<T> input = ...; // sliding event-time windows input .keyBy(<key selector>) .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) .<windowed transformation>(<window function>); // sliding processing-time windows input .keyBy(<key selector>) .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))) .<windowed transformation>(<window function>); // sliding processing-time windows offset by -8 hours input .keyBy(<key selector>) .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8))) .<windowed transformation>(<window function>);
-
会话窗口: 按活动会话分配器组中的数据元。会话窗口不重叠并且没有固定的开始和结束时间。相反,当会话窗口在一段时间内没有接收到数据元时,即当发生不活动的间隙时,会关闭会话窗口。
DataStream<T> input = ...; // event-time session windows with static gap input .keyBy(<key selector>) .window(EventTimeSessionWindows.withGap(Time.minutes(10))) .<windowed transformation>(<window function>); // event-time session windows with dynamic gap input .keyBy(<key selector>) .window(EventTimeSessionWindows.withDynamicGap((element) -> { // determine and return session gap })) .<windowed transformation>(<window function>); // processing-time session windows with static gap input .keyBy(<key selector>) .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10))) .<windowed transformation>(<window function>); // processing-time session windows with dynamic gap input .keyBy(<key selector>) .window(ProcessingTimeSessionWindows.withDynamicGap((element) -> { // determine and return session gap })) .<windowed transformation>(<window function>);
-
全局Windows:全局性的窗口分配器分配使用相同的Keys相同的单个的所有数据元全局窗
DataStream<T> input = ...; input .keyBy(<key selector>) .window(GlobalWindows.create()) .<windowed transformation>(<window function>);
窗口函数
ReduceFunction
聚合函数
FoldFunction
或其他可合并窗口一起使用。
ProcessWindowFunction
ProcessWindowFunction with Incremental Aggregation
在ProcessWindowFunction中使用每窗口状态
WindowFunction(留存)
触发器
确定何时窗口函数准备好处理窗口
-
onElement()
为添加到窗口的每个数据元调用该方法。 -
onEventTime()
在注册的事件时间计时器触发时调用该方法。 -
onProcessingTime()
在注册的处理时间计时器触发时调用该方法。 - 该
onMerge()
方法与状态触发器相关,并且当它们的相应窗口合并时合并两个触发器的状态,例如当使用会话窗口时。 - 最后,该
clear()
方法在移除相应窗口时执行所需的任何动作。
Flink附带了一些内置触发器。
-
EventTimeTrigger
基于水印测量的事件时间的进展而触发。 - 在
ProcessingTimeTrigger
基于处理时间的火灾。 -
CountTrigger
一旦窗口中的数据元数量超过给定限制,就会触发。 - 将
PurgingTrigger
另一个触发器作为参数作为参数并将其转换为清除触发器。
逐出器
逐出器必须从一个窗口中删除数据元之前和或之后被施加的窗口函数。
/**
* Optionally evicts elements. Called before windowing function.
*
* @param elements The elements currently in the pane.
* @param size The current number of elements in the pane.
* @param window The {@link Window}
* @param evictorContext The context for the Evictor
*/
void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
/**
* Optionally evicts elements. Called after windowing function.
*
* @param elements The elements currently in the pane.
* @param size The current number of elements in the pane.
* @param window The {@link Window}
* @param evictorContext The context for the Evictor
*/
void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
Flink带有预先实施的驱逐者
-
CountEvictor
:保持窗口中用户指定数量的数据元,并从窗口缓冲区的开头丢弃剩余的数据元。 -
DeltaEvictor
:取DeltaFunction
和athreshold
,计算窗口缓冲区中最后一个数据元与其余每个数据元之间的差值,并删除delta大于或等于阈值的值。 -
TimeEvictor
:以interval
毫秒为单位作为参数,对于给定窗口,它查找max_ts
其数据元中的最大时间戳,并删除时间戳小于的所有数据元max_ts - interval
。
允许迟到
使用事件时间窗口时,可能会发生数据元迟到的情况。默认默认情况下,允许的延迟设置为 0
。也就是说,到达水印后面的数据元将被丢弃。
注意使用GlobalWindows
窗口分配器时,由于全局窗口的结束时间戳为,因此没有数据被认为是迟到的Long.MAX_VALUE
-
将后期数据作为副输出
final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){}; DataStream<T> input = ...; SingleOutputStreamOperator<T> result = input .keyBy(<key selector>) .window(<window assigner>) .allowedLateness(<time>) .sideOutputLateData(lateOutputTag) .<windowed transformation>(<window function>); DataStream<T> lateStream = result.getSideOutput(lateOutputTag);
迟到数据元考虑因素
使用窗口结果
-
水印和窗口的互动。当水印到达窗口 算子时,会触发两件事:
水印触发所有窗口的计算,其中最大时间戳(即 结束时间戳-1)小于新水印
水印被转发(按原样)到下游 算子操作
连续窗口算子操作。
有用的状态规模考虑因素(windows可优化项)
2.5.5 Join
窗口关联
窗口连接连接两个共享公共Keys并位于同一窗口中的流的数据元。然后将来自双方的数据元传递给用户定义的,JoinFunction
或者FlatJoinFunction
用户可以发出满足连接条件的结果。
-
翻滚窗口关联。行为类似于内连接
DataStream<Integer> orangeStream = ... DataStream<Integer> greenStream = ... orangeStream.join(greenStream) .where(<KeySelector>) .equalTo(<KeySelector>) .window(TumblingEventTimeWindows.of(Time.seconds(2))) .apply (new JoinFunction<Integer, Integer, String> (){ @Override public String join(Integer first, Integer second) { return first + "," + second; } });
滑动窗口关联
会话窗口关联
间隔关联
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
...
DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...
orangeStream
.keyBy(<KeySelector>)
.intervalJoin(greenStream.keyBy(<KeySelector>))
.between(Time.milliseconds(-2), Time.milliseconds(1))
.process (new ProcessJoinFunction<Integer, Integer, String(){
@Override
public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
out.collect(first + "," + second);
}
});
2.5.6 过程函数(低级 算子操作)
ProcessFunction
是一个低级流处理 算子操作,可以访问所有(非循环)流应用程序的基本构建块
- 事件(流数据元)
- state(容错,一致,仅在被Key化的数据流上)
- 定时器(事件时间和处理时间,仅限被Key化的数据流)
stream.keyBy(...).process(new MyProcessFunction())
低级联接
两个输入上实现低级算子操作,应用程序可以使用CoProcessFunction
s1.connect(s2).process(new CoProcessFunction())
KeyedProcessFunction
KeyedProcessFunction
作为其扩展ProcessFunction
,可以在其onTimer(...)
方法中访问计时器的键。
计时器
两种类型的计时器(处理时间和事件时间)都由内部维护TimerService
并排队执行。
容错
定时器合并
2.5.7 外部数据访问的异步I / O
假设有一个目标数据库的异步客户端,则需要三个部分来实现对数据库的异步I / O流转换:
-
AsyncFunction
实现 - 一个回调,它接受 算子操作的结果并将其交给
ResultFuture
- 在DataStream上应用异步I / O 算子操作作为转换
// This example implements the asynchronous request and callback with Futures that have the
// interface of Java 8's futures (which is the same one followed by Flink's Future)
/**
* An implementation of the 'AsyncFunction' that sends requests and sets the callback.
*/
class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {
/** The database specific client that can issue concurrent requests with callbacks */
private transient DatabaseClient client;
@Override
public void open(Configuration parameters) throws Exception {
client = new DatabaseClient(host, post, credentials);
}
@Override
public void close() throws Exception {
client.close();
}
@Override
public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {
// issue the asynchronous request, receive a future for result
final Future<String> result = client.query(key);
// set the callback to be executed once the request by the client is complete
// the callback simply forwards the result to the result future
CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
try {
return result.get();
} catch (InterruptedException | ExecutionException e) {
// Normally handled explicitly.
return null;
}
}
}).thenAccept( (String dbResult) -> {
resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));
});
}
}
// create the original stream
DataStream<String> stream = ...;
// apply the async I/O transformation
DataStream<Tuple2<String, String>> resultStream =
AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);
超时处理
结果顺序
活动时间
容错保证
实施技巧
2.6 流连接器(Streaming Connectors)
2.7 旁路输出
除了DataStream
算子操作产生的主流之外,您还可以生成任意数量的附加旁路输出结果流。结果流中的数据类型不必与主流中的数据类型匹配,并且不同旁路输出的类型也可以不同。
// this needs to be an anonymous inner class, so that we can analyze the type
OutputTag<String> outputTag = new OutputTag<String>("side-output") {};
可以通过以下函数将数据发送到旁路输出:
ProcessFunction
CoProcessFunction
ProcessWindowFunction
ProcessAllWindowFunction
使用Context
在上述函数中向用户公开的参数将数据发送到由a标识的旁路输出OutputTag
。
DataStream<Integer> input = ...;
final OutputTag<String> outputTag = new OutputTag<String>("side-output"){};
SingleOutputStreamOperator<Integer> mainDataStream = input
.process(new ProcessFunction<Integer, Integer>() {
@Override
public void processElement(
Integer value,
Context ctx,
Collector<Integer> out) throws Exception {
// emit data to regular output
out.collect(value);
// emit data to side output
ctx.output(outputTag, "sideout-" + String.valueOf(value));
}
});
用getSideOutput(OutputTag)
检索在DataStream
算子操作结果上使用的旁路输出流。
final OutputTag<String> outputTag = new OutputTag<String>("side-output"){};
SingleOutputStreamOperator<Integer> mainDataStream = ...;
DataStream<String> sideOutputStream = mainDataStream.getSideOutput(outputTag);
2.8 实验特性
2.8.1 将预分区数据流重新解释为被Key化的数据流
三、Table API和SQL
3.1 概念和通用API
Table API和SQL集成在一个联合API中。此API的核心概念是Table
用作查询的输入和输出。
3.1.1 Table API和SQL程序的结构
// for batch programs use ExecutionEnvironment instead of StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// create a TableEnvironment
// for batch programs use BatchTableEnvironment instead of StreamTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// register a Table
tableEnv.registerTable("table1", ...) // or
tableEnv.registerTableSource("table2", ...); // or
tableEnv.registerExternalCatalog("extCat", ...);
// create a Table from a Table API query
Table tapiResult = tableEnv.scan("table1").select(...);
// create a Table from a SQL query
Table sqlResult = tableEnv.sqlQuery("SELECT ... FROM table2 ... ");
// emit a Table API result Table to a TableSink, same for SQL result
tapiResult.writeToSink(...);
// execute
env.execute();
3.1.2 创建一个TableEnvironment
TableEnvironment
是 Table API和SQL集成的核心概念
-
Table
在内部目录中注册 - 注册外部目录
- 执行SQL查询
- 注册用户定义的(标量,表或聚合)函数
- 转换
DataStream
或DataSet
转换为aTable
- 持有对
ExecutionEnvironment
或的引用StreamExecutionEnvironment
// ***************
// STREAMING QUERY
// ***************
StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
// create a TableEnvironment for streaming queries
StreamTableEnvironment sTableEnv = TableEnvironment.getTableEnvironment(sEnv);
// ***********
// BATCH QUERY
// ***********
ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
// create a TableEnvironment for batch queries
BatchTableEnvironment bTableEnv = TableEnvironment.getTableEnvironment(bEnv);
3.1.3 在目录中注册表
TableEnvironment
维护按名称注册的表的目录。有两种类型的表,输入表和输出表。输入表可以在 Table API和SQL查询中引用,并提供输入数据。输出表可用于将 Table API或SQL查询的结果发送到外部系统。
- 现有
Table
对象,通常是 Table API或SQL查询的结果。 -
TableSource
,访问外部数据,例如文件,数据库或消息传递系统。 -
DataStream
或DataSet
来自DataStream或DataSet程序
注册表
在TableEnvironment
以下注册Table
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// Table is the result of a simple projection query
Table projTable = tableEnv.scan("X").select(...);
// register the Table projTable as table "projectedX"
tableEnv.registerTable("projectedTable", projTable);
如果多个查询引用相同的注册Table
,这将被内联的每个引用的查询和执行多次。
注册TableSource
TableSource
提供对外部数据的访问,存储在存储系统中,例如数据库(MySQL,HBase,…),具有特定编码的文件(CSV,Apache [Parquet,Avro,ORC] ……)或消息系统(Apache Kafka,RabbitMQ,……)
TableSource
在TableEnvironment
以下注册:
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// create a TableSource
TableSource csvSource = new CsvTableSource("/path/to/file", ...);
// register the TableSource as table "CsvTable"
tableEnv.registerTableSource("CsvTable", csvSource);
注册TableSink
已注册TableSink
可用于将 Table API或SQL查询的结果发送到外部存储系统,例如数据库,键值存储,消息队列或文件系统(在不同的编码中,例如,CSV,Apache [Parquet] ,Avro,ORC],……)。
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// create a TableSink
TableSink csvSink = new CsvTableSink("/path/to/file", ...);
// define the field names and types
String[] fieldNames = {"a", "b", "c"};
TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG};
// register the TableSink as table "CsvSinkTable"
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink);
3.1.4 注册外部目录
外部目录可以提供有关外部数据库和表的信息,例如其名称,架构,统计信息以及有关如何访问存储在外部数据库,表或文件中的数据的信息。
可以通过实现ExternalCatalog
接口创建外部目录,并在TableEnvironment
以下内容中注册:
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// create an external catalog
ExternalCatalog catalog = new InMemoryExternalCatalog();
// register the ExternalCatalog catalog
tableEnv.registerExternalCatalog("InMemCatalog", catalog);
注册后,可以指定完整路径
Table API
Table orders = tableEnv.from("Orders");
orders.select($("1"), $("2"))
.where($("1").isEqual("1")).groupBy($("1"));
SQL
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// register Orders table
// compute revenue for all customers from France
Table revenue = tableEnv.sqlQuery(
"SELECT cID, cName, SUM(revenue) AS revSum " +
"FROM Orders " +
"WHERE cCountry = 'FRANCE' " +
"GROUP BY cID, cName"
);
// emit or convert Table
// execute query
如何指定将其结果插入已注册表的更新查询
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// register "Orders" table
// register "RevenueFrance" output table
// compute revenue for all customers from France and emit to "RevenueFrance"
tableEnv.executeSql(
"INSERT INTO RevenueFrance " +
"SELECT cID, cName, SUM(revenue) AS revSum " +
"FROM Orders " +
"WHERE cCountry = 'FRANCE' " +
"GROUP BY cID, cName"
);
// execute query
混合 Table API和SQL
Table API和SQL查询可以轻松混合,因为它们都返回Table
对象:
- 可以在
Table
SQL查询返回的对象上定义 Table API 查询。 - 通过在SQL查询的子句中注册生成的表并在其中
TableEnvironment
引用它,可以在 Table API查询的结果上定义FROM
SQL查询。
3.1.5 Table Sink
TableSink
是支持各种文件格式(例如CSV,Apache Parquet,Apache Avro),存储系统(例如,JDBC,Apache HBase,Apache Cassandra,Elasticsearch)或消息传递系统(例如,Apache Kafka,RabbitMQ)的通用接口)。
批处理Table
只能写入 BatchTableSink
,而流式处理Table
需要 AppendStreamTableSink
, RetractStreamTableSink
或 UpsertStreamTableSink
。
有两种方法可以发出表格:
该Table.writeToSink(TableSink sink)
方法使用提供的方法发出表,TableSink
并使用要发出的表的模式自动配置接收器。- 该
Table.insertInto(String sinkTable)
方法查找TableSink
在TableEnvironment
目录中提供的名称下使用特定模式注册的方法。要发出的表的模式根据已注册的模式进行验证TableSink
。
3.1.6 翻译并执行查询
Table API和SQL查询将转换为DataStream
或DataSet
程序,具体取决于它们的输入是流式还是批量输入。查询在内部表示为逻辑查询计划,并分为两个阶段:
- 优化逻辑计划,
- 转换为DataStream或DataSet程序。
在以下情况下转换 Table API或SQL查询:
何时被
Table.writeToSink()
或Table.insertInto()
指定了SQL更新查询,即
TableEnvironment.execute()
调用时Table
转换为DataStream
或DataSet
一旦翻译,Table API或SQL查询像一个普通的数据流中或数据集处理程序,当被执行StreamExecutionEnvironment.execute()
或者ExecutionEnvironment.execute()
被调用。
3.1.7 与DataStream和DataSet API集成
Table API和SQL查询可以轻松集成并嵌入到DataStream
和DataSet
程序中
将DataStream或DataSet注册为表
// get StreamTableEnvironment
// registration of a DataSet in a BatchTableEnvironment is equivalent
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
DataStream<Tuple2<Long, String>> stream = ...
// register the DataStream as Table "myTable" with fields "f0", "f1"
tableEnv.registerDataStream("myTable", stream);
// register the DataStream as table "myTable2" with fields "myLong", "myString"
tableEnv.registerDataStream("myTable2", stream, "myLong, myString");
将DataStream或DataSet转换为表
// get StreamTableEnvironment
// registration of a DataSet in a BatchTableEnvironment is equivalent
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
DataStream<Tuple2<Long, String>> stream = ...
// Convert the DataStream into a Table with default fields "f0", "f1"
Table table1 = tableEnv.fromDataStream(stream);
// Convert the DataStream into a Table with fields "myLong", "myString"
Table table2 = tableEnv.fromDataStream(stream, "myLong, myString");
当转换一个Table
成DataStream
或DataSet
,需要指定将所得的数据类型DataStream
或DataSet
,即,数据类型到其中的行Table
是要被转换。通常最方便的转换类型是Row
。以下列表概述了不同选项的函数:
-
Row:字段按位置,任意数量的字段映射,支持
null
值,无类型安全访问。 -
POJO:字段按名称映射(POJO字段必须命名为
Table
字段),任意数量的字段,支持null
值,类型安全访问。 -
Case Class:字段按位置映射,不支持
null
值,类型安全访问。 -
元组:字段按位置映射,限制为22(Scala)或25(Java)字段,不支持
null
值,类型安全访问。 -
原子类型:
Table
必须具有单个字段,不支持null
值,类型安全访问。
将表转换为DataStream
一个Table
是流处理查询的结果将动态更新,即它正在改变,因为新记录的查询的输入流到达。因此,DataStream
转换这种动态查询需要对表的更新进行编码。
将a转换Table
为a 有两种模式DataStream
:
-
追加模式:只有在动态
Table
仅通过INSERT
更改修改时才能使用此模式,即它仅附加并且以前发出的结果永远不会更新。 -
缩进模式:始终可以使用此模式。它用标志编码
INSERT
和DELETE
改变boolean
// get StreamTableEnvironment.
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// Table with two fields (String name, Integer age)
Table table = ...
// convert the Table into an append DataStream of Row by specifying the class
DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class);
// convert the Table into an append DataStream of Tuple2<String, Integer>
// via a TypeInformation
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
Types.STRING(),
Types.INT());
DataStream<Tuple2<String, Integer>> dsTuple =
tableEnv.toAppendStream(table, tupleType);
// convert the Table into a retract DataStream of Row.
// A retract stream of type X is a DataStream<Tuple2<Boolean, X>>.
// The boolean field indicates the type of the change.
// True is INSERT, false is DELETE.
DataStream<Tuple2<Boolean, Row>> retractStream =
tableEnv.toRetractStream(table, Row.class);
将数据类型映射到表模式
Flink的DataStream和DataSet API支持各种类型。复合类型(如Tuples(内置Scala和Flink Java元组),POJO,Scala案例类和Flink的Row类型)允许嵌套数据结构具有可在表表达式中访问的多个字段。其他类型被视为原子类型。
数据类型到表模式的映射可以以两种方式发生:基于字段位置或基于字段名称。
基于位置的映射
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
DataStream<Tuple2<Long, Integer>> stream = ...
// convert DataStream into Table with default field names "f0" and "f1"
Table table = tableEnv.fromDataStream(stream);
// convert DataStream into Table with field names "myLong" and "myInt"
Table table = tableEnv.fromDataStream(stream, "myLong, myInt");
基于名称的映射
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
DataStream<Tuple2<Long, Integer>> stream = ...
// convert DataStream into Table with default field names "f0" and "f1"
Table table = tableEnv.fromDataStream(stream);
// convert DataStream into Table with field "f1" only
Table table = tableEnv.fromDataStream(stream, "f1");
// convert DataStream into Table with swapped fields
Table table = tableEnv.fromDataStream(stream, "f1, f0");
// convert DataStream into Table with swapped fields and field names "myInt" and "myLong"
Table table = tableEnv.fromDataStream(stream, "f1 as myInt, f0 as myLong");
-
原子类型
Flink认为原语(
Integer
,Double
,String
)或通用类型作为原子类型(无法进行分析和分解类型)。 -
元组(Scala和Java)和案例类(仅限Scala)
Flink支持Scala的内置元组,并为Java提供自己的元组类。两种元组的DataStream和DataSet都可以转换为表。可以通过为所有字段提供名称(基于位置的映射)来重命名字段。如果未指定字段名称,则使用默认字段名称。如果原始字段名(
f0
,f1
,…为Flink元组和_1
,_2
…Scala元组)被引用时,API假设映射,而不是基于位置的基于域名的。基于名称的映射允许使用别名(as
)重新排序字段和Projection。// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); DataStream<Tuple2<Long, String>> stream = ... // convert DataStream into Table with default field names "f0", "f1" Table table = tableEnv.fromDataStream(stream); // convert DataStream into Table with renamed field names "myLong", "myString" (position-based) Table table = tableEnv.fromDataStream(stream, "myLong, myString"); // convert DataStream into Table with reordered fields "f1", "f0" (name-based) Table table = tableEnv.fromDataStream(stream, "f1, f0"); // convert DataStream into Table with projected field "f1" (name-based) Table table = tableEnv.fromDataStream(stream, "f1"); // convert DataStream into Table with reordered and aliased fields "myString", "myLong" (name-based) Table table = tableEnv.fromDataStream(stream, "f1 as 'myString', f0 as 'myLong'");
-
POJO(Java和Scala)
Flink支持POJO作为复合类型。
当转换一个POJO
DataStream
或DataSet
成Table
没有指定字段名,则使用原始POJO字段的名称。名称映射需要原始名称,不能通过位置来完成。可以使用别名(使用as
关键字),重新排序和Projection来重命名字段。// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); // Person is a POJO with fields "name" and "age" DataStream<Person> stream = ... // convert DataStream into Table with default field names "age", "name" (fields are ordered by name!) Table table = tableEnv.fromDataStream(stream); // convert DataStream into Table with renamed fields "myAge", "myName" (name-based) Table table = tableEnv.fromDataStream(stream, "age as myAge, name as myName"); // convert DataStream into Table with projected field "name" (name-based) Table table = tableEnv.fromDataStream(stream, "name"); // convert DataStream into Table with projected and renamed field "myName" (name-based) Table table = tableEnv.fromDataStream(stream, "name as myName");
-
Row
该
Row
数据类型支持字段和字段与任意数量的null
值。字段名称可以通过指定RowTypeInfo
。行类型支持按位置和名称映射字段。可以通过为所有字段提供名称(基于位置的映射)或为Projection/排序/重命名(基于名称的映射)单独选择字段来重命名字段。// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); // DataStream of Row with two fields "name" and "age" specified in `RowTypeInfo` DataStream<Row> stream = ... // convert DataStream into Table with default field names "name", "age" Table table = tableEnv.fromDataStream(stream); // convert DataStream into Table with renamed field names "myName", "myAge" (position-based) Table table = tableEnv.fromDataStream(stream, "myName, myAge"); // convert DataStream into Table with renamed fields "myName", "myAge" (name-based) Table table = tableEnv.fromDataStream(stream, "name as myName, age as myAge"); // convert DataStream into Table with projected field "name" (name-based) Table table = tableEnv.fromDataStream(stream, "name"); // convert DataStream into Table with projected and renamed field "myName" (name-based) Table table = tableEnv.fromDataStream(stream, "name as myName");
3.1.8 查询优化
Apache Flink利用Apache Calcite优化和翻译查询。
可以通过提供CalciteConfig
对象来调整在不同阶段应用的优化规则集。这可以通过调用构建器创建,并通过调用CalciteConfig.createBuilder())
提供给TableEnvironment tableEnv.getConfig.setCalciteConfig(calciteConfig)
。
解释表
Table API提供了一种机制来解释计算的逻辑和优化查询计划Table
。这是通过该TableEnvironment.explain(table)
方法完成的。它返回一个描述三个计划的String:
- 关系查询的抽象语法树,即未优化的逻辑查询计划,
- 优化的逻辑查询计划,以及
- 物理执行计划。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
DataStream<Tuple2<Integer, String>> stream1 = env.fromElements(new Tuple2<>(1, "hello"));
DataStream<Tuple2<Integer, String>> stream2 = env.fromElements(new Tuple2<>(1, "hello"));
Table table1 = tEnv.fromDataStream(stream1, "count, word");
Table table2 = tEnv.fromDataStream(stream2, "count, word");
Table table = table1
.where("LIKE(word, 'F%')")
.unionAll(table2);
String explanation = tEnv.explain(table);
System.out.println(explanation);
四、数据类型和序列化
常见类型交互
-
注册子类型:
.registerType(clazz)
-
注册自定义序列化程序:
.getConfig().addDefaultKryoSerializer(clazz, serializer)
添加类型提示:
手动创建
TypeInformation
:
4.1 Flink的TypeInformation类
基本类型:所有的Java原语及其盒装形式,加
void
,String
,Date
,BigDecimal
,和BigInteger
。基元数组和对象数组
-
复合类型
Flink Java Tuples(Flink Java API的一部分):最多25个字段,不支持空字段
Scala Case Class(包括Scala元组):最多22个字段,不支持空字段
Row:具有任意数量字段的元组并支持空字段
POJO:遵循某种类似bean的模式的类
辅助类型(选项,任一,列表,Map,……)
通用类型:这些不会被Flink本身序列化,而是由Kryo序列化。
4.1.1 POJO类型的规则
dataSet.join(another).where("name").equalTo("personName")
如果满足以下条件,Flink会将数据类型识别为POJO类型(并允许“按名称”字段引用):
- 该类是公共的和独立的(没有非静态内部类)
- 该类有一个公共的无参数构造函数
- 类(以及所有超类)中的所有非静态非瞬态字段都是公共的(和非最终的)或者具有公共getter和setter方法,该方法遵循getter和setter的Java bean命名约定。
4.1.2 创建TypeInformation或TypeSerializer
因为Java通常会擦除泛型类型信息,所以需要将类型传递给TypeInformation构造:
对于非泛型类型,您可以传递类:
TypeInformation<String> info = TypeInformation.of(String.class);
对于泛型类型,您需要通过以下方式“捕获”泛型类型信息TypeHint
:
TypeInformation<Tuple2<String, Double>> info = TypeInformation.of(new TypeHint<Tuple2<String, Double>>(){});
要创建一个TypeSerializer
,只需调用typeInfo.createSerializer(config)
该TypeInformation
对象即可。
该config
参数是类型ExecutionConfig
和保存有关该计划的注册的自定义序列化的信息。尽可能尝试将程序传递给ExecutionConfig。您通常可以通过电话DataStream
或DataSet
通过电话获取getExecutionConfig()
。在内部函数(如MapFunction
)中,您可以通过使函数成为和调用来获得它getRuntimeContext().getExecutionConfig()
4.2 在Java API中类型信息
Java 会类型擦除,Flink尝试使用Java保存的少量位(主要是函数签名和子类信息)通过反射重建尽可能多的类型信息。
4.2.1 在Java API中类型提示
在Flink无法重建已擦除的泛型类型信息的情况下,Java API提供所谓的类型提示。类型提示告诉系统函数生成的数据流或数据集的类型:
DataSet<SomeType> result = dataSet
.map(new MyGenericNonInferrableFunction<Long, SomeType>())
.returns(SomeType.class);
该returns
语句指定生成的类型,在本例中通过类。提示支持通过类型定义
- 用于非参数化类型的类(无泛型)
- TypeHints的形式
returns(new TypeHint<Tuple2<Integer, SomeType>>(){})
。该TypeHint
班可以捕获泛型类型信息,并保存它运行时(通过一个匿名子类)
4.2.2 Java 8 lambdas的类型提取
4.2.3 POJO类型的序列化
TypeInformation正在为POJO中的所有字段创建序列化器。标准类型(如int,long,String等)由我们随Flink提供的序列化程序处理。对于所有其他类型,我们回到Kryo。
如果Kryo无法处理该类型,使用Avro序列化POJO。
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableForceAvro();
希望Kryo序列化程序处理整个POJO类型
env.getConfig().addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass)
程序可能希望明确避免将Kryo用作泛型类型的回退
env.getConfig().disableGenericTypes();
五、管理执行
5.1 执行配置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ExecutionConfig executionConfig = env.getConfig();
可以使用以下配置选项:(默认为粗体)
enableClosureCleaner()
/disableClosureCleaner()
。默认情况下启用闭包清理器。getParallelism()
/setParallelism(int parallelism)
设置作业的默认并行度。getMaxParallelism()
/setMaxParallelism(int parallelism)
设置作业的默认最大并行度。getNumberOfExecutionRetries()
/setNumberOfExecutionRetries(int numberOfExecutionRetries)
设置重新执行失败任务的次数。值为零可有效禁用容错。值为-1
表示应使用系统默认值。getExecutionRetryDelay()
/setExecutionRetryDelay(long executionRetryDelay)
设置在重新执行作业之前系统在作业失败后等待的延迟(以毫秒为单位)。在TaskManagers上成功停止所有任务后,延迟开始,一旦延迟过去,任务就会重新启动。此参数对于延迟重新执行非常有用,以便在尝试重新执行之前让某些超时相关故障完全浮出水面(例如尚未完全超时的断开连接),并且由于同样的问题而再次立即失败。仅当执行重试次数为一次或多次时,此参数才有效。getExecutionMode()
/setExecutionMode()
。默认执行模式为PIPELINED。设置执行模式以执行程序。执行模式定义数据交换是以批处理还是以流水线方式执行。enableForceKryo()
/disableForceKryo
。Kryo默认不会被迫强制GenericTypeInformation将Pryo序列化程序用于POJO,即使我们可以将它们分析为POJO。enableForceAvro()
/disableForceAvro()
。默认情况下不会强制使用Avro。强制Flink AvroTypeInformation使用Avro序列化程序而不是Kryo来序列化Avro POJO。enableObjectReuse()
/disableObjectReuse()
默认情况下,对象不会在Flink中重复使用。启用对象重用模式将指示运行时重用用户对象以获得更好的性能。请记住,当 算子操作的用户代码函数不知道此行为时,这可能会导致错误。enableSysoutLogging()
/disableSysoutLogging()
JobManager状态更新System.out
默认打印到。此设置允许禁用此行为。getGlobalJobParameters()
/setGlobalJobParameters()
此方法允许用户将自定义对象设置为作业的全局配置。由于ExecutionConfig
可以在所有用户定义的函数中访问,因此这是一种在作业中全局可用的配置的简单方法。addDefaultKryoSerializer(Class<?> type, Serializer<?> serializer)
为给定的注册Kryo序列化程序实例type
。addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass)
为给定的注册Kryo序列化程序类type
。registerTypeWithKryoSerializer(Class<?> type, Serializer<?> serializer)
使用Kryo注册给定类型并为其指定序列化程序。通过使用Kryo注册类型,类型的序列化将更加高效。registerKryoType(Class<?> type)
如果类型最终被Kryo序列化,那么它将在Kryo注册以确保只写入标签(整数ID)。如果某个类型未在Kryo中注册,则其整个类名将与每个实例序列化,从而导致更高的I / O成本。registerPojoType(Class<?> type)
使用序列化堆栈注册给定类型。如果类型最终被序列化为POJO,则该类型将在POJO序列化程序中注册。如果类型最终被Kryo序列化,那么它将在Kryo注册以确保只写入标签。如果某个类型未在Kryo中注册,则其整个类名将与每个实例序列化,从而导致更高的I / O成本。请注意,注册的类型registerKryoType()
不适用于Flink的Kryo序列化程序实例。disableAutoTypeRegistration()
默认情况下启用自动类型注册。自动类型注册是使用Kryo和POJO序列化器注册用户代码使用的所有类型(包括子类型)。setTaskCancellationInterval(long interval)
设置在连续尝试取消正在运行的任务之间等待的间隔(以毫秒为单位)。取消interrupt()
任务时,如果任务线程未在特定时间内终止,则创建新线程,该线程定期调用任务线程。此参数是指连续呼叫之间的时间interrupt()
,默认设置为30000毫秒或30秒。
5.2 程序打包和分布式执行
5.2.1 打包程序
只需将所有涉及的类导出为JAR文件即可。JAR文件的清单必须指向包含程序入口点的类(具有public main
方法的类 )。最简单的方法是将main-class条目放入清单
5.2.2 通过计划打包程序
实现 org.apache.flink.api.common.Program
接口,定义getPlan(String...)
方法。传递给该方法的字符串是命令行参数。可以通过该ExecutionEnvironment#createProgramPlan()
方法从环境创建程序的计划。打包程序的计划时,JAR清单必须指向实现 org.apache.flink.api.common.Program
接口的类,而不是使用main方法的类。
5.2.3 概要
调用打包程序的整个过程如下:
搜索JAR的清单以查找主类或程序类属性。如果找到这两个属性,则program-class属性优先于main-class属性。对于JAR清单既不包含属性的情况,命令行和Web界面都支持手动传递入口点类名的参数。
如果入口点类实现了
org.apache.flink.api.common.Program
,则系统调用该getPlan(String...)
方法以获取要执行的程序计划。如果入口点类没有实现
org.apache.flink.api.common.Program
接口,系统将调用该类的main方法。
5.3 并行执行
Flink程序由多个任务(转换/ 算子,数据源和接收器)组成。任务被分成几个并行实例以供执行,每个并行实例处理任务输入数据的子集。任务的并行实例数称为并行性
如果要使用保存点,还应考虑设置最大并行度(或max parallelism
)。从保存点恢复时,您可以更改特定 算子或整个程序的并行度,此设置指定并行度的上限。这是必需的,因为Flink在内部将状态划分为Keys组,并且我们不能拥有+Inf
多个Keys组,因为这会对性能产生不利影响。
5.3.1 设置并行性
算子级别
通过调用setParallelism()
方法来定义单个算子
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = text
.flatMap(new LineSplitter())
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1).setParallelism(5);
wordCounts.print();
env.execute("Word Count Example");
运行环境级别
Flink程序在运行环境的上下文中执行。运行环境为其执行的所有算子,数据源和数据接收器定义默认并行性。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = [...]
wordCounts.print();
env.execute("Word Count Example");
客户级别
在向Flink提交作业时,可以在客户端设置并行性
./bin/flink run -p 10 ../examples/*WordCount-java*.jar
系统级别
可以通过设置parallelism.default
属性来定义所有运行环境的系统范围默认并行度 ./conf/flink-conf.yaml
5.3.2 设置最大并行度
可以设置并行度的位置设置最大并行度(客户端级别和系统级别除外)
用 setMaxParallelism()
设置最大并行度
最大并行度的默认设置大致operatorParallelism + (operatorParallelism / 2)
为下限127
和上限32768
5.4 执行计划
根据各种参数(如数据大小或群集中的计算机数量),Flink的优化程序会自动为您的程序选择执行策略
5.4.1 计划可视化工具
Flink附带了一个用于执行计划的可视化工具。包含可视化工具的HTML文档位于tools/planVisualizer.html
。它采用作业执行计划的JSON表示,并将其可视化为具有执行策略的完整注释的图形。
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
...
System.out.println(env.getExecutionPlan());
要可视化执行计划,请执行以下 算子操作:
-
planVisualizer.html
使用Web浏览器打开 - 将JSON字符串 粘贴 到文本字段中,然后
- 按下绘图按钮。
Flink提供用于提交和执行作业的Web界面。该接口是JobManager的Web界面监测的一部分,每默认情况下,通过此接口端口8081作业提交运行要求您已经设置web.submit.enable: true
在flink-conf.yaml
。
5.5 重启策略
Flink支持不同的重启策略,可以控制在发生故障时如何重新启动作业。
5.5.1 概览
认重启策略是通过Flink的配置文件设置的flink-conf.yaml
。配置参数restart-strategy定义采用的策略。如果未启用检查点,则使用“无重启”策略。如果激活了检查点并且尚未配置重启策略,则固定延迟策略将用于 Integer.MAX_VALUE
重启尝试。
固定延迟重启策略
固定延迟重启策略尝试给定次数重新启动作业。如果超过最大尝试次数,则作业最终会失败。在两次连续重启尝试之间,重启策略等待一段固定的时间。
通过在中设置以下配置参数,此策略默认启用flink-conf.yaml
。
restart-strategy: fixed-delay
配置参数 | 描述 | 默认值 |
---|---|---|
restart-strategy.fixed-delay.attempts |
Flink在作业声明失败之前重试执行的次数。 | 1,或者Integer.MAX_VALUE 如果通过检查点激活 |
restart-strategy.fixed-delay.delay |
延迟重试意味着在执行失败后,重新执行不会立即开始,而是仅在一定延迟之后。当程序与外部系统交互时,延迟重试可能会有所帮助,例如,在尝试重新执行之前,连接或挂起的事务应该达到超时。 |
akka.ask.timeout ,如果通过检查点激活,则为10秒 |
例如:
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // number of restart attempts
Time.of(10, TimeUnit.SECONDS) // delay
));
故障率重启策略
故障率重启策略在故障后重新启动作业,但是当failure rate
超过(每个时间间隔的故障)时,作业最终会失败。在两次连续重启尝试之间,重启策略等待一段固定的时间。
通过在中设置以下配置参数,此策略默认启用flink-conf.yaml
。
restart-strategy: failure-rate
配置参数 | 描述 | 默认值 |
---|---|---|
<it>重新启动-strategy.failure-rate.max-故障每间隔</it> | 失败作业之前给定时间间隔内的最大重启次数 | 1 |
<it>重启strategy.failure-rate.failure速率间隔</it> | 测量故障率的时间间隔。 | 1分钟 |
<it>重启strategy.failure-rate.delay</it> | 两次连续重启尝试之间的延迟 | <it>akka.ask.timeout</it> |
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, // max failures per interval
Time.of(5, TimeUnit.MINUTES), //time interval for measuring failure rate
Time.of(10, TimeUnit.SECONDS) // delay
));
不重启策略
作业直接失败,不尝试重启。
restart-strategy: none
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.noRestart());