背景知识
低延迟 vs 高吞吐
流处理系统与批处理系统最大不同在于节点间的数据传输方式。
对于一个流处理系统,当一条数据被处理完成后,序列化到缓存中,然后立刻通过网络传输到下一个节点,由下一个节点继续处理。而对于一个批处理系统,其节点间数据传输的标准模型是:当一条数据被处理完成后,序列化到缓存中,并不会立刻通过网络传输到下一个节点,当缓存写满,就持久化到本地硬盘上,当所有数据都被处理完成后,才开始将处理后的数据通过网络传输到下一个节点。
这两种数据传输模式是两个极端,对应的是流处理系统对低延迟的要求和批处理系统对高吞吐量的要求。
Flink以固定的缓存块为单位进行网络数据传输,用户可以通过缓存块超时值指定缓存块的传输时机。如果缓存块的超时值为0,则Flink的数据传输方式类似上文所提到流处理系统的标准模型,此时系统可以获得最低的处理延迟。如果缓存块的超时值为无限大,则Flink的数据传输方式类似上文所提到批处理系统的标准模型,此时系统可以获得最高的吞吐量。同时缓存块的超时值也可以设置为0到无限大之间的任意值。缓存块的超时阈值越小,则Flink流处理执行引擎的数据处理延迟越低,但吞吐量也会降低,反之亦然。通过调整缓存块的超时阈值,用户可根据需求灵活地权衡系统延迟和吞吐量
Stream & Transformation & Operator
用户实现的Flink程序是由Stream和Transformation这两个基本构建块组成,其中Stream是一个中间结果数据,而Transformation是一个操作,它对一个或多个输入Stream进行计算处理,输出一个或多个结果Stream。当一个Flink程序被执行的时候,它会被映射为Streaming Dataflow。一个Streaming Dataflow是由一组Stream和Transformation Operator组成,它类似于一个DAG图,在启动的时候从一个或多个Source Operator开始,结束于一个或多个Sink Operator。
下面是一个由Flink程序映射为Streaming Dataflow的示意图:
Parallel Dataflow
在Flink中,程序天生是并行和分布式的:一个Stream可以被分成多个Stream分区(Stream Partitions),一个Operator可以被分成多个Operator Subtask,每一个Operator Subtask是在不同的线程中独立执行的。一个Operator的并行度,等于Operator Subtask的个数,一个Stream的并行度总是等于生成它的Operator的并行度。
在Flink分布式执行环境中,会将多个Operator Subtask串起来组成一个Operator Chain,实际上就是一个执行链,每个执行链会在TaskManager上一个独立的线程中执行
组件栈
Flink是一个分层架构的系统,每一层所包含的组件都提供了特定的抽象,用来服务于上层组件。Flink分层的组件栈如下图所示:
Flink on YARN的部署模式
Flink YARN Client负责与YARN RM(Resouce manager)通信协商资源请求,Flink JobManager和Flink TaskManager分别申请到Container去运行各自的进程。通过上图可以看到,YARN AM(application master)与Flink JobManager在同一个Container中,这样AM可以知道Flink JobManager的地址,从而AM可以申请Container去启动Flink TaskManager。待Flink成功运行在YARN集群上,Flink YARN Client就可以提交Flink Job到Flink JobManager,并进行后续的映射、调度和计算处理
Runtime层: 提供了支持Flink计算的全部核心实现,比如:支持分布式Stream处理、JobGraph到ExecutionGraph的映射、调度等等,为上层API层提供基础服务。
API层: 主要实现了面向无界Stream的流处理和面向Batch的批处理API,其中面向流处理对应DataStream API,面向批处理对应DataSet API。
Libraries层: 该层也可以称为Flink应用框架层,根据API层的划分,在API层之上构建的满足特定应用的实现计算框架,也分别对应于面向流处理和面向批处理两类。面向流处理支持:CEP(复杂事件处理)、基于SQL-like的操作(基于Table的关系操作);面向批处理支持:FlinkML(机器学习库)、Gelly(图处理)。
基本架构
Flink系统的架构与Spark类似,是一个基于Master-Slave风格的架构,如下图所示:
如上图所示,Flink系统主要包含如下3个主要的进程:
JobManager: 系统的协调者,它负责接收Flink Job,调度组成Job的多个Task的执行。同时,JobManager还负责收集Job的状态信息,并管理Flink集群中从节点TaskManager
TaskManager: 是一个Actor,它是实际负责执行计算的Worker,在其上执行Flink Job的一组Task。每个TaskManager负责管理其所在节点上的资源信息,如内存、磁盘、网络,在启动的时候将资源的状态向JobManager汇报。TaskManager端可以分成两个阶段:
注册阶段: TaskManager会向JobManager注册,发送RegisterTaskManager消息,等待JobManager返回AcknowledgeRegistration,然后TaskManager就可以进行初始化过程。
可操作阶段: 该阶段TaskManager可以接收并处理与Task有关的消息,如SubmitTask、CancelTask、FailTask。如果TaskManager无法连接到JobManager,这是TaskManager就失去了与JobManager的联系,会自动进入“注册阶段”,只有完成注册才能继续处理Task相关的消息。
Client 当用户提交一个Flink程序时,会首先创建一个Client,该Client首先会对用户提交的Flink程序进行预处理,并提交到Flink集群中处理,所以Client需要从用户提交的Flink程序配置中获取JobManager的地址,并建立到JobManager的连接,将Flink Job提交给JobManager。Client会将用户提交的Flink程序组装一个JobGraph, 并且是以JobGraph的形式提交的。一个JobGraph是一个Flink Dataflow,它由多个JobVertex组成的DAG。其中,一个JobGraph包含了一个Flink程序的如下信息:JobID、Job名称、配置信息、一组JobVertex等。
调度机制
在JobManager, 会将一个JobGraph转换映射为一个ExecutionGraph. ExecutionGraph是JobGraph的并行表示.
Execution,是一个ExecutionVertex的一次运行Attempt,一个ExecutionVertex可能对应多个运行状态的Execution. 每个Execution通过ExecutionAttemptID来唯一标识,在TaskManager和JobManager之间进行Task状态的交换都是通过ExecutionAttemptID来实现的
迭代机制
在机器学习和图计算应用中,都会使用到迭代计算,Flink通过在迭代Operator中定义Step函数来实现迭代算法,这种迭代算法包括
Iterate (全量更新) : 每次迭代输入完整的上次迭代的数据集
和Delta Iterate (只更新增量)
Backpressure监控
一个Stream上进行处理的多个Operator之间,它们处理速度和方式可能非常不同,所以就存在上游Operator如果处理速度过快,下游Operator处可能机会堆积Stream记录.如果下游Operator能够将自己处理状态传播给上游Operator,使得上游Operator处理速度慢下来就会缓解上述问题
Flink Web界面上提供了对运行Job的Backpressure行为的监控,它通过使用Sampling线程对正在运行的Task进行堆栈跟踪采样来实现
Flink流处理容错
批处理系统比较容易实现容错机制,由于文件可以重复访问,当某个任务失败后,重启该任务即可。但是到了流处理系统,Flink基于分布式快照与可部分重发的数据源实现了容错。 主要的核心为Asynchronous Barrier Snapshots (即Chandy Lamport Algorithm 算法的变种https://yq.aliyun.com/articles/448900)
按照用户自定义的分布式快照间隔时间,Flink会定时在所有数据源中插入一种特殊的快照标记(barrier)消息,这些快照标记消息和其他消息一样在DAG中流动,每一个快照标记消息都将其所在的数据流分成两部分:本次快照(snapshot)数据和下次快照数据。快照标记消息沿着DAG流经各个操作符,当操作符处理到快照标记消息时,会对自己的状态进行快照,并存储起来。当一个操作符有多个输入的时候,Flink会将先抵达的快照标记消息及其之后的消息缓存起来(stream aligning),当所有的输入中对应该次快照的快照标记消息全部抵达后,操作符对自己的状态快照并存储,之后处理所有快照标记消息之后的已缓存消息。操作符对自己的状态快照并存储可以是异步与增量的操作,并不需要阻塞消息的处理. 一旦最后一个Stream接收到Barrier n,Operator会emit所有暂存在Buffer中的记录,然后向Checkpoint Coordinator发送Snapshot n
当然,因为Stream Aligning 为了排列对齐Barrier,会暂时缓存一部分Stream的记录到Buffer中,会造成一部分延迟。在Flink中,提供了一个开关,选择是否使用Stream Aligning,如果关掉则Exactly Once会变成At least once
这种实现拥有如下的优势:
1. 低延迟。由于操作符状态的存储可以异步,所以进行快照的过程基本上不会阻塞消息的处理,因此不会对消息延迟产生负面影响。
2. 高吞吐量。当操作符状态较少时,对吞吐量基本没有影响。当操作符状态较多时,相对于其他的容错机制,分布式快照的时间间隔是用户自定义的,所以用户可以权衡错误恢复时间和吞吐量要求来调整分布式快照的时间间隔。
错误恢复代价。分布式快照的时间间隔越短,错误恢复的时间越少,与吞吐量负相关。
3. 与业务逻辑的隔离。Flink的分布式快照机制与用户的业务逻辑是完全隔离的,用户的业务逻辑不会依赖或是对分布式快照产生任何影响。
将所有task的状态做一个快照(snapshot), 即为checkpoint,然后存储到memory/file syste, with a directory with (typically large) binary files on stable storage (e.g. HDFS, S3, …) and a (relatively small) meta data file
Snapshot并不仅仅是对数据流做了一个状态的Checkpoint(一个Flink Job,在一个特定时刻的一份全局状态快照),它也包含了一个Operator内部所持有的状态,这样才能够在保证在流处理系统失败时能够正确地恢复数据流处理。也就是说,如果一个Operator包含任何形式的状态,这种状态必须是Snapshot的一部分
State(state. Streaming应用的状态, Element in window is state, 一个具体的task/operator的状态) 会被存储到一个可配置的存储系统中,例如HDFS。在一个Checkpoint执行过程中,存储的状态信息及其交互
SavePoint
Checkpoints make state in Flink fault tolerant by allowing state and the corresponding stream positions to be recovered
A Savepoint is a consistent image of the execution state of a streaming job, created via Flink’s checkpointing mechanism. You can use Savepoints to stop-and-resume, fork, or update your Flink jobs. Savepoints consist of two parts: a directory with (typically large) binary files on stable storage (e.g. HDFS, S3, …) and a (relatively small) meta data file
Flink’s Savepoints are different from Checkpoints in a similar way that backups are different from recovery logs in traditional database systems. The primary purpose of Checkpoints is to provide a recovery mechanism in case of unexpected job failures. A Checkpoint’s lifecycle is managed by Flink
Savepoints are created, owned, and deleted by the user. Their use-case is for planned, manual backup and resume
Out of the box, Flink bundles these state backends: MemoryStateBackend, FsStateBackend, RocksDBStateBackend, If nothing else is configured, the system will use the MemoryStateBackend.
How to do something with the data? Windowing
How does the system handle large windows? Managed state
How do operate such a system 24x7? Savepoints
How to ensure correct results across failures? Checkpoints, Master HA
对于流处理系统来说,流入的消息不存在上限,所以对于聚合或是连接等操作,流处理系统需要对流入的消息进行分段,然后基于每一段数据进行聚合或是连接。消息的分段即称为窗口(window),流处理系统支持的窗口有很多类型,最常见的就是时间窗口,基于时间间隔对消息进行分段处理.
有关窗口操作的不同类型,可以分为如下几种:倾斜窗口(Tumbling Windows,记录没有重叠)、滑动窗口(Slide Windows,记录有重叠)、会话窗口(Session Windows)
由于不同节点的时钟可能不同,以及消息在流经各个节点的延迟不同,在某个节点属于同一个时间窗口处理的消息,流到下一个节点时可能被切分到不同的时间窗口中,从而产生不符合预期的结果。
Flink支持3种类型的时间窗口,分别适用于用户对于时间窗口不同类型的要求:
1. Operator Time。根据Task所在节点的本地时钟来切分的时间窗口。
2. Event Time。消息自带时间戳,根据消息的时间戳进行处理,确保时间戳在同一个时间窗口的所有消息一定会被正确处理。由于消息可能乱序流入Task,所以Task需要缓存当前时间窗口消息处理的状态,直到确认属于该时间窗口的所有消息都被处理,才可以释放,如果乱序的消息延迟很高会影响分布式系统的吞吐量和延迟。
3. Ingress Time。有时消息本身并不带有时间戳信息,但用户依然希望按照消息而不是节点时钟划分时间窗口,例如避免上面提到的第二个问题,此时可以在消息源流入Flink时由Flink自动生成时间戳,之后处理的流程与Event Time相同。Ingress Time可以看成是Event Time的一个特例,由于其在消息源处时间戳一定是有序的,所以在流处理系统中,相对于Event Time,其乱序的消息延迟不会很高,因此对Flink分布式系统的吞吐量和延迟的影响也会更小。
Flink借鉴了Google的MillWheel项目,通过WaterMark来支持基于Event Time的时间窗口。
但是由于消息可能是乱序的,所以操作符无法直接确认何时所有属于该时间窗口的消息全部流入此操作符。WaterMark包含一个时间戳,Flink使用WaterMark标记所有小于该时间戳的消息都已流入,
Flink的数据源在确认所有小于某个时间戳的消息都已输出到Flink流处理系统后,会生成一个包含该时间戳的WaterMark,插入到消息流中输出到Flink流处理系统中,Flink操作符按照时间窗口缓存所有流入的消息,当操作符处理到WaterMark时,它对所有小于该WaterMark时间戳的时间窗口数据进行处理并发送到下一个操作符节点
为了保证能够处理所有属于某个时间窗口的消息,操作符必须等到大于这个时间窗口的WaterMark之后才能开始对该时间窗口的消息进行处理,相对于基于Operator Time的时间窗口,Flink需要占用更多内存,且会直接影响消息处理的延迟时间。对此,一个可能的优化措施是,对于聚合类的操作符,可以提前对部分消息进行聚合操作,当有属于该时间窗口的新消息流入时,基于之前的部分聚合结果继续计算
在Flink流处理系统中,基于WaterMark,Flink实现了基于时间戳的全局排序。排序的实现思路如下:排序操作符缓存所有流入的消息,当其接收到WaterMark时,对时间戳小于该WaterMark的消息进行排序
由于WaterMark保证了在其之后不会出现时间戳比它小的消息,所以可以保证排序的正确性。需要注意的是,如果排序操作符有多个节点,只能保证每个节点的流出消息是有序的,节点之间的消息不能保证有序,要实现全局有序,则只能有一个排序操作符节点。
Flink内存管理
flink 基于大数据计算场景设计的特殊内存管理。 而不是向JAVA 这种面向多种目的的编程语言。与之类似的为Spark Tungsten project 也在逐步实现采用类似的技术
显式内存管理的前提步骤就是序列化,将Java对象序列化成二进制数据存储在内存上。
on heap:数据存储在 java heap 上, 会被GC
或是off-heap:不在JVM但还在内存,不会被GC
1. 所有的JVM由统一的内存管理器管理,避免内存碎片。 且数据都以二进制存储, 垃圾回收压力低
2. 只将操作相关的数据连续存储,可以最大化的利用L1/L2/L3缓存,减少Cache miss的概率。以排序为例,(1)将所有排序数据的Key与Value分开存储,排序时只需对key 和指向value的指针进行交换 ,(2) 并对Key连续存储,那么访问Key时的Cache命中率会大大提高
3. 定制化的序列工具:
a. 由于处理的数据流为同一类型, 可只保存一份对象的schema
b. 对于固定大小的类型, 可通过固定的偏移位置存取。这样在访问对象成员变量时, 只需要通过偏移量反序列化特定的对象成员变量。并不需要反序列化整个Java对象, 这样能够大大减少Java对象的创建开销,以及内存数据的拷贝大小
c. 对数据集的类型信息进行分析,然后自动生成定制的序列化工具类,.同时自动生成TypeComparator,用来辅助直接对序列化后的二进制数据直接进行compare、hash等操作. (java.io.Serializable将Java对象及其成员变量的所有元信息作为其序列化数据的一部分,序列化后的数据包含了所有反序列化所需的信息。这在某些场景中十分必要,但是对于Flink这样的分布式计算框架来说,这些元数据信息可能是冗余数据)
4. 显示内存管理
Flink implements its algorithms not against Java objects, arrays, or lists, but actually against a data structure similar to java.nio.ByteBuffer. Flink uses its own specialized version, called MemorySegment on which algorithms put and get at specific positions ints, longs, byte arrays, etc, and compare and copy memory
Flink将内存分为3个部分:
(1)Network buffers: 一些以32KB Byte数组为单位的buffer,主要被网络模块用于数据的网络传输。
(2)Memory Manager pool 大量以32KB (默认,相当大的空间) Byte数组为单位的内存池,所有的运行时算法(例如Sort/Shuffle/Join)都从这个内存池申请内存,并将序列化后的数据存储其中,结束后释放回内存池。
(3)Remaining (Free) Heap主要留给UDF中用户自己创建的Java对象,由JVM管理。
所有的运行时数据结构和算法只能通过内存池申请内存,保证了其使用的内存大小是固定的, 进行对象大小检查和阈值设置(spillling to disk), 不会因为运行时数据结构和算法而发生OOM。
off-heap的内存管理支持。好处有:(http://sungsoo.github.io/2015/09/24/fllink-off-heap.html)
启动分配了大内存(例如100G)的JVM很耗费时间,垃圾回收也很慢。如果采用off-heap,剩下的Network buffer和Remaining heap都会很小,垃圾回收也不用考虑MemorySegment中的Java对象了。
更有效率的IO操作。在off-heap下,将MemorySegment写到磁盘或是网络可以支持zeor-copy技术,而on-heap的话则至少需要一次内存拷贝. (Zero-copy 指CPU不需要先将数据从某处内存复制到另一个特定区域。 在Java 7 中新加入的零拷贝机制,使原来将数据从磁盘写入网卡需要经过四次拷贝,缩减到了三次。零拷贝机制能够省去其中从用户端到内核端的数据拷贝过程。关于零复制的介绍,可以参照下面的资料https://www.ibm.com/developerworks/cn/java/j-zerocopy/ )
off-heap可用于错误恢复,比如JVM崩溃,在on-heap时数据也随之丢失,但在off-heap下,off-heap的数据可能还在。此外,off-heap上的数据还可以和其他程序和进程共享, 暂时Flink 还没有利用这点 (https://flink.apache.org/news/2015/09/16/off-heap-memory.html)
参考文献:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=53741525
http://sungsoo.github.io/2015/09/24/fllink-off-heap.html
https://www.jianshu.com/p/c4d6a7230973
https://qconlondon.com/london-2016/system/files/presentation-slides/robertmetzger.pdf
https://www.oreilly.com/library/view/stream-processing-with/9781491974285/ch01.html
https://www.jianshu.com/p/b74909d47fb9
http://shiyanjun.cn/archives/1508.html
https://flink.apache.org/news/2015/09/16/off-heap-memory.html
https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/iterations.html