flink解析之state

一。 概述

在流计算场景中,数据没有边界源源不断的流入的,每条数据流入都可能会触发计算,比如在进行count或sum这些操作,是选择每次触发计算将所有流入的历史数据重新计算一边还是每次计算都基于上次计算结果进行增量计算呢? 从综合考虑角度,很多人都会 选择增量计算,那么问题就产生了:上一次的中间计算结果保存在哪里?内存?这其中会由于本身的网络,硬件或软件等问题造成某个计算节点失败,对应的上次计算结果就会丢失,在节点恢复时,是需要将所有历史数据重新计算一遍的,对于这样的结果大家是很难接受的。

二。flink中state

而在flink中提出了state用来存放计算过程的节点中间结果或元数据等,并提供Exactly-Once语义,例如:执行aggregation时在state中记录中间聚合结果,再如从kafka中摄取记录时,是需要记录对应的partition的offset,而这些state数据在计算过程中会进行持久化的。state就变成了与时间相关的是对flink任务内部数据的快照。
由于流计算大多数场景下都是增量计算的,数据逐条被处理,每次当前结果都是基于上一次计算结果之上进行处理的,这也势必要将上一次的计算结果进行存储持久化,无论是机器,网络,脏数据等原因导致的程序错误,都能在job进行任务恢复时提供支持。基于这些已被持久化的state,而非将历史的数据重新计算一遍。
在flink内部提供三种state存储实现

  • 内存HeapStateBackend:存放数据量小,用于开发测试使用;生产不建议使用
  • HDFS的FsStateBackend :分布式文件持久化,每次都会产生网络io,可用于大state,不支持增量;可用于生产
  • RocksDB的RocksDBStateBackend:本地文件 + 异步hdfs持久化,也可用于大state数据量,唯一支持增量,可用于生产;
    比如使用RocksDB + HDFS进行state存储:首先state先在本地存储到RocksDB,然后异步写入到HDFS中,这样可以突破HeapStateBackend受单节点资源限制(物理内存,机器故障数据丢失等),也减少了分布式过程写入带来的网络io开销。
    (state详情)https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/state.html
    state

三。operator state与keyedstate相关

3.1 state 分类

从operator和data两个角度可将state划分为2类

  • KeyedState:一般groupby或PartitionBy组成的内容,即为key,每个key都有自己的state,并且key与key间的state是不可见的
  • OperatorState:Source Connector的实现中就会用OperatorState来记录source数据读取的offset。

3.2 state扩容重新分配

在flink中每一个并行运算操作实例就是一个独立的任务,可以在机器上调度到网络中其他的机器;并且flink能够进行大规模的有状态流处理,在逻辑上将这些分割成不同operator graph,同时operator也将被物理分解成多个操作实例。在flink的DAG图中随着data流向,垂直方向存在网络io,而水平方向的stateful节点间是没有网络通信的,这样每个operator维护一份自己本地的state,并保存在本地磁盘。
比如source有5个partition,将source并行度1->2,中间stateful operator并行度2->3,结果如下图:


扩容分布图

分析结果如下:在flink中不同的state有不同的扩容方法

  • 关于operatorstate的扩容
    这种state分配结合operator对应并行度parallelism有关联,比如FlinkKafkaConsumerBase的定义
public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements
        CheckpointListener,
        ResultTypeQueryable<T>,
        CheckpointedFunction

看到在代码内部使用ListState:ListState<Tuple2<KafkaTopicPartition, Long>>

@PublicEvolving
public interface ListState<T> extends MergingState<T, Iterable<T>> {
    void update(List<T> values) throws Exception;

    void addAll(List<T> values) throws Exception;
}

通过源码可以看到ListState具体定义,T是Tuple2<KafkaTopicPartition,Long>说明了state存储了当前partition及其offset信息的列表,KafkaTopicPartition代表一个partition,Long代表当前partition的offset,
当source并行度=1,代表所有的partition都在同一个线程中读取,对应所有的partition的state也在同一个state维护:如下图


state存储信息

当把source并行度=2,对应的operator并行度=3,先看下parition与subtask之间的映射方法:
首先根据topic的hash值得到当前的index开始点,进行对齐,接着对当前operator的subtasks进行取模,得到的结果即为当前partition分配的subtask的index

public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
        int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;

        // here, the assumption is that the id of Kafka partitions are always ascending
        // starting from 0, and therefore can be used directly as the offset clockwise from the start index
        return (startIndex + partition.getPartition()) % numParallelSubtasks;
    }
state扩容后

可以发现通过Operator使用List<T>作为state的存储结构,是很容易解决这类state扩容的,不过有一点source扩容后的parallelism是否可以超过Source物理存储上的partition个数?这样会造成资源的浪费,超过partition的并发永远分配不到待处理的partition。

  • KeyedState扩容处理
    在flink相对OperatorState的大小来说,KeyedState还是比较大的,如果采用OperatorState进行取模方式可能会带来网络拉取的成本较大,flink直接采用key-Groups(类似range的方式分配)
    1.Key-Groups:flink对keyedstate按照key进行分组的方式,每个key-group会包含N>0个key,是keystate分配的原子单元,在flink使用KeyGroupRange代表一个key-group。
public class KeyGroupRange implements KeyGroupsList, Serializable {
        ...
        ...
        private final int startKeyGroup;
        private final int endKeyGroup;
        ...
        ...
}

在KeyGroupRange中:startKeyGroup和endKeyGroup用来定义Operator关联的Key-Group个数。
不过参考flink源码可看到key-group在job启动之前对应的数量是需要确定并且运行中是不可变的。本身Operator的最大并行度<= key-group个数,每个Operator实例都会有自己的state,每个state关联至少一个key-group

       ...
    public static int assignToKeyGroup(Object key, int maxParallelism) {
        return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
    }
        ...
    ...
    public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
        return MathUtils.murmurHash(keyHash) % maxParallelism;
    }
        ...

其实从分配key到key-group利用key的hash值与maxParallelism进行取模来完成的。比如parallelism=2 maxParallelism=10


key-state

如上图比如key=a对应hash(a)=97, hash(a) % 10 = 7则分配到KG-7,其他的以此类推。
flink源码中针对分配到key-group的操作如下:

public static KeyGroupRange computeKeyGroupRangeForOperatorIndex(
        int maxParallelism,
        int parallelism,
        int operatorIndex) {

        checkParallelismPreconditions(parallelism);
        checkParallelismPreconditions(maxParallelism);

        Preconditions.checkArgument(maxParallelism >= parallelism,
            "Maximum parallelism must not be smaller than parallelism.");
                //  当前operator实例
        int start = ((operatorIndex * maxParallelism + parallelism - 1) / parallelism);
                // 当前operator下一个实例的位置
        int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
        return new KeyGroupRange(start, end);
    }

如上代码就是用于将operator(已指定parallelism和maxparallelism)对应的key-groups进行分配,计算当前keygroup的start,end:先计算每个Operator实例至少分配的Key-Group个数,将不能整除的部分N个,平均分给前N个实例。最终每个Operator实例管理的Key-Groups会在GroupRange中表示,本质是一个区间值;实例图如下:


样例解析图

1.当parallelism=2时可得到KeyGroupRange:
operatorIndex=0,则得到start=0, end=4:如图kg-keys:0,1,2,3,4
operatorIndex=1,则得到start=5,end=9:如图kg-keys:5,6,7,8,9

2.当parallelism=3时可得到KeyGroupRange:
operatorIndex=0,则得到start=0, end=3:如图kg-keys:0,1,2,3
operatorIndex=1,则得到start=4,end=6:如图kg-keys:4,5,6
operatorIndex=2,则得到start=7, end=9:如图kg-keys:7,8,9

一旦当job修改了maxParallelism的值那么会直接影响到Key-Groups的数量和key的分配,也会打乱所有的Key-Group的分配,目前在Apache Flink系统中统一将maxParallelism的默认值调整到4096,最大程度的避免无法扩容的情况发生。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 196,200评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,526评论 2 373
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 143,321评论 0 325
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,601评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,446评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,345评论 1 273
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,753评论 3 387
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,405评论 0 255
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,712评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,743评论 2 314
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,529评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,369评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,770评论 3 300
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,026评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,301评论 1 251
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,732评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,927评论 2 336