Flink State机制源码解析

在流计算中,事件会持续不断的产生,如果每次计算都是相互独立的,不依赖于上下游事件,则称为无状态 (stateless) 计算;如果每次的计算依赖于之前或者后续的事件,则称为有状态 (stateful) 计算。Flink 中的状态用 State 抽象,用来保存中间计算结果或者缓存数据,State 是 Flink 内部实现 Exactly-Once的基础。

状态类型

和 redis 类似,Flink 按照数据类型的不同,定义了多种 State 接口,具体如下:

  1. ValueState<T>

    单值状态,与数据的 key 绑定;提供了 update(T value) 方法更新值,value() 方法获取状态值。

  2. ListState<T>

    Key 上的状态值为一个 List;提供了 add ,get 方法来分别增加和获取数据。

  3. MapState<UK, UV>

    Key 上的状态值为一个 Map;提供了 put,putAll, get 方法来增加和获取数据。

  4. ReducingState<T>

    实现的 ReduceFunction 中使用的 state, 在 reduce 方法之前,会先调用 ReducingState 的 add 方法,reduce 方法中的第一个参数就是状态值。

  5. AggregatingState<IN, OUT>

    聚合 state; 在 AggregateFunction 的 add 方法调用之前,会先调用 AggregatingState 的 add 方法,传入 acc。

按照算子是否有 key, Flink State 又被划分为 KeyedState 和 OperatorState。

类型 state
KeyedState ValueState<br/ >ListState<br />ReducingState<br />AggregationState<br />MapState<br />
OperatorState ListState

今天我们重点讲 KeyedState 里的最简单的 ValueState 的实现。KeyedState,顾名思义,要与 key 绑定,即只能用在 KeyedStream 流之后;每一个 key 对应一个 State 值;Key 除了有分区的作用,在状态管理当中,它还用于计算 keyGroupIndex:

// AbstractStreamOperator.java line:463
private <T> void setKeyContextElement(StreamRecord<T> record, KeySelector<T, ?> selector) throws Exception {
    if (selector != null) {
        Object key = selector.getKey(record.getValue());
        setCurrentKey(key);
    }
}

// StreamOperatorStateHandler.java line:281
public void setCurrentKey(Object key) {
    if (keyedStateBackend != null) {
        try {
            // need to work around type restrictions
            @SuppressWarnings("unchecked,rawtypes")
            AbstractKeyedStateBackend rawBackend = (AbstractKeyedStateBackend) keyedStateBackend;

            rawBackend.setCurrentKey(key);
        } catch (Exception e) {
            throw new RuntimeException("Exception occurred while setting the current key context.", e);
        }
    }
}

// 接着看 AbstractKeyedStateBackend.java line:172
public void setCurrentKey(K newKey) {
    notifyKeySelected(newKey);
    this.keyContext.setCurrentKey(newKey);      this.keyContext.setCurrentKeyGroupIndex(KeyGroupRangeAssignment.assignToKeyGroup(newKey,numberOfKeyGroups));
}

从上面可以看出, 先设置 key, 再设置 keyGroupIndex,具体 keyGroupIndex 的作用是什么,我们后面会讲。

状态描述

State 是暴露给用户的接口,那么就需要指定状态的一些属性,如 name, type, ttl 等。Flink 中用 StateDescriptor 来描述一个状态,在对应的 StateBackend (状态后端) 中,调用 create 方法得到对应的 State 对象。下面以一个简单的 demo 来演示。

private transient ValueState<Tuple2<Integer, Integer>> state;
public void open(Configuration parameters) throws Exception {
    ValueStateDescriptor<Tuple2<Integer, Integer>> descriptor =
                    new ValueStateDescriptor<>("average", TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {}));
    state = getRuntimeContext().getState(descriptor);
}

我们顺着方法调用链走下去,跳过中间一些无关的代码,直接看 TtlStateFactory 的 createStateAndWrapWithTtlIfEnabled 方法:

public static <K, N, SV, TTLSV, S extends State, IS extends S> IS createStateAndWrapWithTtlIfEnabled(
        TypeSerializer<N> namespaceSerializer,
        StateDescriptor<S, SV> stateDesc,
        KeyedStateBackend<K> stateBackend,
        TtlTimeProvider timeProvider) throws Exception {
        ...
        // 是否为状态设置了 ttl,如果设置 ttl,使用TtlStateFactory创建state,如果没有,直接调用 stateBackend创建state
        return  stateDesc.getTtlConfig().isEnabled() ?
            new TtlStateFactory<K, N, SV, TTLSV, S, IS>(
                namespaceSerializer, stateDesc, stateBackend, timeProvider)
                .createState() :
            stateBackend.createInternalState(namespaceSerializer, stateDesc);
    }

上面两个创建 state 的不同分支区别就是是否设置了 ttl,其它的基本一样。接着看 stateBackend.createInternalState 方法的实现

public <N, SV, SEV, S extends State, IS extends S> IS createInternalState(
        @Nonnull TypeSerializer<N> namespaceSerializer,
        @Nonnull StateDescriptor<S, SV> stateDesc,
        @Nonnull StateSnapshotTransformFactory<SEV> snapshotTransformFactory) throws Exception {
        StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass());
        if (stateFactory == null) {
            String message = String.format("State %s is not supported by %s",
                stateDesc.getClass(), this.getClass());
            throw new FlinkRuntimeException(message);
        }
        // 这里是状态实现原理的重点, 状态的获取/更新都是通过 statetable 操作的,我们重点看下里面的实现
        StateTable<K, N, SV> stateTable = tryRegisterStateTable(
            namespaceSerializer, stateDesc, getStateSnapshotTransformFactory(stateDesc, snapshotTransformFactory));
        // 根据状态描述的类型,调用对应的构造方法。
        return stateFactory.createState(stateDesc, stateTable, getKeySerializer());
    }

StateTable 是状态实现原理的重点, 状态的获取/更新都是通过 statetable 操作的,我们重点看下里面的实现。

private <N, V> StateTable<K, N, V> tryRegisterStateTable(
        TypeSerializer<N> namespaceSerializer,
        StateDescriptor<?, V> stateDesc,
        @Nonnull StateSnapshotTransformFactory<V> snapshotTransformFactory) throws StateMigrationException {

        @SuppressWarnings("unchecked")
        StateTable<K, N, V> stateTable = (StateTable<K, N, V>) registeredKVStates.get(stateDesc.getName());

        TypeSerializer<V> newStateSerializer = stateDesc.getSerializer();

        if (stateTable != null) {
            // ...
        } else {
            RegisteredKeyValueStateBackendMetaInfo<N, V> newMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>(
                stateDesc.getType(),
                stateDesc.getName(),
                namespaceSerializer,
                newStateSerializer,
                snapshotTransformFactory);
            // 得到 statetable对象。启用 statetable的构造方法。初始化size为128(默认最大并行度)的 StateMap数组
            stateTable = snapshotStrategy.newStateTable(keyContext, newMetaInfo, keySerializer);
            registeredKVStates.put(stateDesc.getName(), stateTable);
        }

        return stateTable;
    }

StateTable 里提供的 get, put 操作实现了对状态值的 获取和更新,StateTable是一个抽象类,如下:

StateTable.java

NestedMapsStateTable 使用两层嵌套的 HashMap 保存状态数据,支持同步快照;CopyOnWriteStateTable 使用 CopyOnWriteStateMap 来保存状态数据,支持异步快照。下面是 CopyOnWriteStateTable里的 put 和 get 方法的实现

private S get(K key, int keyGroupIndex, N namespace) {
        checkKeyNamespacePreconditions(key, namespace);
        // stateMap数组默认size为128,keyGroupIndex为key的hash与128取模得到
 // StateMap内部封装了MapEntry,类似于 HashMap,基于链表加数组的实现
        StateMap<K, N, S> stateMap = getMapForKeyGroup(keyGroupIndex);
        if (stateMap == null) {
            return null;
        }
        // 从map中得到state value
        return stateMap.get(key, namespace);
}

public void put(K key, int keyGroup, N namespace, S state) {
        checkKeyNamespacePreconditions(key, namespace);
        // stateMap数组默认size为128,keyGroupIndex为key的hash与128取模得到
        StateMap<K, N, S> stateMap = getMapForKeyGroup(keyGroup);
        stateMap.put(key, namespace, state);
}

上面代码比较简单明了,也就不用多做解释。

总结

  1. StateDescribe 持有状态的描述, StateBackend 通过它来创建 State 对象
  2. State 对象里封装了 StateTable,StateTable 负责对State 做 snapshot 到对应的 StateBackend。
  3. StateTable 里封装了 StateMap,为存储 state 的内存介质,负责状态的更新/新增/获取 (基于内存)。
  4. StateMap 在 StateTable 中为一个数组,默认 size 为最大并行度 128,所以会存在不同 key 对应一个StateMap对象。里面的实现类似于 HashMap,为链表加数组的数据结构。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,937评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,503评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,712评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,668评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,677评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,601评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,975评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,637评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,881评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,621评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,710评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,387评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,971评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,947评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,189评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,805评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,449评论 2 342

推荐阅读更多精彩内容