1. Overview
Data locality is a key principle in Flink and strongly influences how state is stored and accessed; all state in Flink is local state. Why local state is a fundamental primitive in stream processing:
- 1、Apache Flink is a massively parallel distributed system that allows stateful stream processing at large scale. For scalability, a Flink job is logically decomposed into a graph of operators, and the execution of each operator is physically decomposed into multiple parallel operator instances. Conceptually, each parallel operator instance in Flink is an independent task that can be scheduled on its own machine in a network-connected cluster of shared-nothing machines.
- 2、For high throughput and low latency in this setting, network communications among tasks must be minimized. In Flink, network communication for stream processing only happens along the logical edges in the job’s operator graph (vertically), so that the stream data can be transferred from upstream to downstream operators.
- 3. However, there is no communication between the parallel instances of an operator (horizontally). To avoid such network communication, data locality is a key principle in Flink and strongly affects how state is stored and accessed.
2. Rescaling Stateful Stream Processing Jobs
3. Reassigning Operator State When Rescaling
Dynamic rescaling of operator state is very flexible. Three redistribution schemes are currently provided, each introduced below:
- 1. ListState: when the parallelism changes, the list on each parallel instance is taken out, all of these lists are merged into one new list, and the elements are then distributed evenly across the new tasks;
- 2. UnionListState: more flexible than ListState in that the splitting is left to the user. When the parallelism changes, the original lists are concatenated and handed to every task without any splitting, and each task picks out what it needs;
- 3. BroadcastState: for example, when joining a large table with a small table, the small table can be broadcast to every partition of the large table, so the data on every parallel instance is completely identical and receives identical updates. When the parallelism changes, this data is simply copied to the new tasks. (A code sketch of the first two schemes follows this list.)
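As a concrete illustration, here is a minimal sketch, modeled on the buffering-sink example from the Flink documentation, of how an operator declares redistributable operator state through the CheckpointedFunction interface; the class name, state name, and element type are illustrative. Swapping getListState for getUnionListState switches between the first two redistribution schemes above.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

// Illustrative sink that buffers elements in operator list state.
public class BufferingSink implements SinkFunction<Long>, CheckpointedFunction {

    private transient ListState<Long> checkpointedState;
    private final List<Long> buffer = new ArrayList<>();

    @Override
    public void invoke(Long value, Context context) {
        buffer.add(value);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedState.clear();
        // Each list element is one atomic, independently re-distributable item.
        for (Long element : buffer) {
            checkpointedState.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Long> descriptor =
                new ListStateDescriptor<>("buffered-elements", Long.class);

        // getListState      -> items are redistributed evenly on rescaling (ListState);
        // getUnionListState -> every subtask receives the full union (UnionListState).
        checkpointedState = context.getOperatorStateStore().getListState(descriptor);

        if (context.isRestored()) {
            for (Long element : checkpointedState.get()) {
                buffer.add(element);
            }
        }
    }
}
```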
1. Operator state offers only one data structure, ListState<T>, and it is global: every subtask of the operator contributes some elements T to the ListState<T>. Precisely because the state is a list, Flink can redistribute it on rescaling; a single T on its own is a black box to Flink, and Flink would have no way to split it.
2. The reason operator state provides only the ListState<T> data structure is exactly this rescaling problem:
- 1. Operator state, unlike keyed state, has no per-key global view (for keyed state, transformations likewise run independently per key). No matter how the parallelism changes, all records with the same key land on a single task; at most a key's records move to a different task. Each key effectively has a parallelism of one.
- 2. For operator state, when the parallelism changes, incoming records are re-hashed across the subtasks, so the set of records each subtask sees changes. This is why each item T in the ListState should be, in the words of the Flink blog, "considered an atomic, independently re-distributable part of the operator state."
Illustration:
- As a generalized approach to solve this black box problem, we slightly modified the checkpointing interface, called ListCheckpointed. Figure 2B shows the new checkpointing interface, which returns and receives a list of state partitions. Introducing a list instead of a single object makes the meaningful partitioning of state explicit: each item in the list still remains a black box to Flink, but is considered an atomic, independently re-distributable part of the operator state.
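A minimal sketch of that ListCheckpointed interface in use; the counting mapper is illustrative, and in later Flink versions this interface was deprecated in favor of CheckpointedFunction.

```java
import java.util.Collections;
import java.util.List;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;

// Illustrative mapper exposing its state as a list of redistributable partitions.
public class CountingMapper extends RichMapFunction<String, Long>
        implements ListCheckpointed<Long> {

    private long count;

    @Override
    public Long map(String value) {
        return ++count;
    }

    @Override
    public List<Long> snapshotState(long checkpointId, long timestamp) {
        // Return the state as a list; each item is an atomic unit that Flink
        // may reassign to any subtask on rescaling. Here: a single item.
        return Collections.singletonList(count);
    }

    @Override
    public void restoreState(List<Long> state) {
        // After rescaling, a subtask may receive zero, one, or several items.
        count = 0;
        for (Long partialCount : state) {
            count += partialCount;
        }
    }
}
```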
4. Reassigning Keyed State When Rescaling
1. Question:
- 1、While this automatically solves the problem of logically remapping the state to sub-tasks after rescaling, there is one more practical problem left to solve: how can we efficiently transfer the state to the subtasks’ local backends?
(The logical remapping is automatic because keyed state is partitioned by key.)
2. Answer
- 1. Naive approach
A naive approach might be to read all the previous subtask state from the checkpoint in all sub-tasks and filter out the matching keys for each sub-task. While this approach can benefit from a sequential read pattern, each subtask potentially reads a large fraction of irrelevant state data, and the distributed file system receives a huge number of parallel read requests.
- 2. Index
Another approach could be to build an index that tracks the location of the state for each key in the checkpoint. With this approach, all sub-tasks could locate and read the matching keys very selectively. This approach would avoid reading irrelevant data, but it has two major downsides. A materialized index for all keys, i.e. a key-to-read-offset mapping, can potentially grow very large. Furthermore, this approach can also introduce a huge amount of random I/O (when seeking to the data for individual keys, see Figure 3A), which typically entails very bad performance in distributed file systems.
- 3. Key-groups (the atomic unit of state assignment)
However, the new parallelism can be at most the previously configured max-parallelism. Once a job was started, the max-parallelism is baked into the savepoints and cannot be changed anymore.
Unless you discard all state and start the job over as a brand-new job, this limit cannot be lifted.
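For reference, a minimal sketch of how the max-parallelism, and hence the number of key-groups, is configured on the StreamExecutionEnvironment; the values 128 and 4 are illustrative.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MaxParallelismExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // max-parallelism == number of key-groups; it is baked into savepoints
        // and cannot be changed once the job has state.
        env.setMaxParallelism(128);
        // The runtime parallelism can later be rescaled from a savepoint,
        // but never above the max-parallelism chosen here.
        env.setParallelism(4);
    }
}
```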
1. Flink’s approach sits in between those two extremes by introducing key-groups as the atomic unit of state assignment. How does this work? The number of key-groups must be determined before the job is started and (currently) cannot be changed after the fact. As key-groups are the atomic unit of state assignment, this also means that the number of key-groups is the upper limit for parallelism. In a nutshell, key-groups give us a way to trade between flexibility in rescaling (by setting an upper limit for parallelism) and the maximum overhead involved in indexing and restoring the state.
2. We assign key-groups to subtasks as ranges. This makes the reads on restore not only sequential within each key-group, but often also across multiple key-groups. An additional benefit: this also keeps the metadata of key-group-to-subtask assignments very small. We do not maintain explicit lists of key-groups because it is sufficient to track the range boundaries.
3. We have illustrated rescaling from parallelism 3 to 4 with 10 key-groups in Figure 3B. As we can see, introducing key-groups and assigning them as ranges greatly improves the access pattern over the naive approach. Equations 2 and 3 in Figure 3B also detail how we compute key-groups and the range assignment.
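A minimal sketch of that key-group and range-assignment math, mirroring the formulas behind Flink’s KeyGroupRangeAssignment; note that real Flink additionally murmur-hashes key.hashCode() before taking the modulo, which this sketch omits.

```java
// Sketch of the key-group math behind Equations 2 and 3 (cf. Flink's
// KeyGroupRangeAssignment); Flink additionally murmur-hashes key.hashCode().
public final class KeyGroupSketch {

    // Equation 2: keyGroup(key) = hash(key) mod maxParallelism
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.abs(key.hashCode() % maxParallelism);
    }

    // Equation 3: subtask i owns the contiguous key-group range
    // [ceil(i * maxParallelism / p), ceil((i + 1) * maxParallelism / p) - 1]
    static int[] keyGroupRangeFor(int maxParallelism, int parallelism, int operatorIndex) {
        int start = (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    public static void main(String[] args) {
        // Rescaling from parallelism 3 to 4 with 10 key-groups, as in Figure 3B:
        // p=3 -> [0..3] [4..6] [7..9];  p=4 -> [0..2] [3..4] [5..7] [8..9].
        for (int p : new int[] {3, 4}) {
            System.out.println("parallelism " + p + ":");
            for (int i = 0; i < p; i++) {
                int[] r = keyGroupRangeFor(10, p, i);
                System.out.println("  subtask " + i + " -> key-groups [" + r[0] + ".." + r[1] + "]");
            }
        }
    }
}
```

Because each subtask owns a contiguous range of key-groups, a restore after rescaling reads a few sequential runs from the checkpoint instead of performing per-key random I/O.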