Flink 有状态的流的工作(Working with state)

有状态的函数和操作在处理各个元素或者事件时存储数据,使得state称为任何类型的复杂操作的关键构建部件,例如:
  当一个应用程序搜索某些特定的事件模式时,状态会保存截止到目前为止遇到过的事件的顺序;
  当每分钟聚合事件时,状态会保存挂起的聚合
  当通过数据点来训练机器学习模型时,状态会保存当前版本的模型参数

为了使state容错,Flink需要识别state并checkpoint它, 在许多情况下,Flink还管理着应用程序的状态,这意味着Flink处理内存管理(如果需要,可能会将内存中的数据溢出到磁盘)来保存非常大的state。

这篇文档介绍了在开发应用程序时如何使用Flink的state 抽象概念。

键控State 和算子 State(Keyed State and Operator State)

在Flink中有两个基本的state:Keyed state和 Operator state

Keyed State

Keyed State总是与key相关,并且只能应用于KeyedStream的函数和操作中。

你可以认为Keyed State是一个已经分区或者划分的,每个state分区对应一个key的Operator State, 每个keyed-state逻辑上与一个<并行操作实例, 键>(<parallel-operator-instance, key>)绑定在一起,由于每个key属于唯一一个键控算子(keyed operator)的并行实例,我们可以简单地看作是<operator, key>

Keyed State可以进一步的组成Key Group, Key Group是Flink重新分配Keyed State的最小单元,这里有跟定义的最大并行数一样多的Key Group,在运行时keyed operator的并行实例与key一起为一个或者多个Key Group工作。

Operator State

使用Operator State(或者非键控的state)的话,每个算子状态绑定到一个并行算子实例中。Kafka Connector就是在Flink中使用Operator State的一个很好的例子,每个Kafka consumer的并行实例保存着一个topic分区和偏移量的map作为它的Operator State
当并行数发生变化时,Operator State接口支持在并行操作实例中进行重新分配,这里有多种方法来进行重分配。

原生的和托管的State(Raw and Managed State)

Keyed State和 Operator State存在两种形式:托管的和原生的。

托管的State(Managed State)由Flink运行时控制的数据结构表示, 例如内部哈希表或者RocksDB,例子是"ValueSate", "ListState"等。Flink运行时会对State编码并将它们写入checkpoint中。

原生State(Raw State)是算子保存它们自己的数据结构的state,当checkpoint时,它们仅仅写一串byte数组到checkpoint中。Flink并不知道State的数据结构,仅能看到原生的byte数组。

所有的数据流函数都可以使用托管state,但是原生state接口只能在实现operator时才能使用。使用托管State(而不是原生state)被推荐使用是因为使用托管state,当并行度发生变化时,Flink可以自动地重新分配state,同时还能更好地管理内存。

使用托管的键控state(Using Managed Keyed State)

托管的键控state接口可以访问所有当前输入元素的key范围内的不同类型的state,这也就意味着这种类型的state只能被通过stream.keyBy(...)创建的KeyedStream使用。
现在我们首先来看一下可用的不同类型的state,然后在看它们是如何在程序中使用的,可用State的原形如下:
ValueState<T>:这里保存了一个可以更新和检索的值(由上述输入元素的key所限定,所以一个操作的每个key可能有一个值)。这个值可以使用update(T)来更新,使用T value()来获取。

ListState<T>:这个保存了一个元素列表,你可以追加元素以及获取一个囊括当前所保存的所有元素的Iterable,元素可以通过调用add(T)来添加,而Iterable可以调用Iterable<T> get()来获取。

ReducingState<T>:这个保存了表示添加到state的所值的聚合的当个值,这个接口与ListState类似,只是调用add(T)添加的元素使用指定的ReduceFunction聚合成一个聚合值。

FoldingState<T, ACC>:这将保存表示添加到状态的所有值的聚合的单个值,与ReducingState相反,聚合的数据类型可能跟添加到State的元素的数据类型不同,这个接口与ListState类似,只是调用add(T)添加的元素使用指定的FoldFunction折叠成一个聚合值。

MapState<T>:这个保存了一个映射列表,你可以添加key-value对到state中并且检索一个包含所有当前保存的映射的Iterable。映射可以使用put(UK, UV)或者putAll(Map<UK, UV>)来添加。与key相关的value,可以使用get(UK)来获取,映射的迭代、keys及values可以分别调用entries(), keys()values()来获取。

所有类型的state都有一个clear()方法来清除当前活动的key(及输入元素的key)的State。

注意:FoldingState会在下一个Flink版本中启用,并在将来的版本中彻底删除,将提供更加一般的替代方案。

值得注意的是这些State对象仅用于与State进行接口,State并不只保存在内存中,也可能会在磁盘中或者其他地方,第二个需要注意的是从State中获取的值依赖于输入元素的key,因此如果涉及的key不同,那么在一次调用用户函数中获得的值可能与另一次调用的值不同。

为了获得一个State句柄,你需要创建一个StateDescriptor,这个StateDescriptor保存了state的名称(接下来我们会讲到,你可以创建若干个state,但是它们必须有唯一的值以便你能够引用它们),State保存的值的类型以及用户自定义函数如:一个ReduceFunction。根据你想要检索的state的类型,你可以创建一个ValueStateDescriptor, 一个ListStateDescriptor, 一个ReducingStateDescriptor, 一个FoldingStateDescriptor或者一个MapStateDescriptor

State可以通过RuntimeContext来访问,所以只能在富函数中使用。RichFunction中的RuntimeContext有以下这些方法来访问state:
  ValueState<T> getState(ValueStateDescriptor<T>)
  ReducingState<T> getReducingState(ReduceingStateDescriptor<T>)
  ListState<T> getListState(ListStateDescriptor<T>)
  FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)
  MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)

这个FlatMapFunction例子展示了所有部件如何组合在一起:

public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    /**
     * The ValueState handle. The first field is the count, the second field a running sum.
     */
    private transient ValueState<Tuple2<Long, Long>> sum;

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {

        // access the state value
        Tuple2<Long, Long> currentSum = sum.value();

        // update the count
        currentSum.f0 += 1;

        // add the second field of the input value
        currentSum.f1 += input.f1;

        // update the state
        sum.update(currentSum);

        // if the count reaches 2, emit the average and clear the state
        if (currentSum.f0 >= 2) {
            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
            sum.clear();
        }
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "average", // the state name
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
                        Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
        sum = getRuntimeContext().getState(descriptor);
    }
}

// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
        .keyBy(0)
        .flatMap(new CountWindowAverage())
        .print();

// the printed output will be (1,4) and (1,5)
class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, Long)] {

  private var sum: ValueState[(Long, Long)] = _

  override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {

    // access the state value
    val tmpCurrentSum = sum.value

    // If it hasn't been used before, it will be null
    val currentSum = if (tmpCurrentSum != null) {
      tmpCurrentSum
    } else {
      (0L, 0L)
    }

    // update the count
    val newSum = (currentSum._1 + 1, currentSum._2 + input._2)

    // update the state
    sum.update(newSum)

    // if the count reaches 2, emit the average and clear the state
    if (newSum._1 >= 2) {
      out.collect((input._1, newSum._2 / newSum._1))
      sum.clear()
    }
  }

  override def open(parameters: Configuration): Unit = {
    sum = getRuntimeContext.getState(
      new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)])
    )
  }
}


object ExampleCountWindowAverage extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  env.fromCollection(List(
    (1L, 3L),
    (1L, 5L),
    (1L, 7L),
    (1L, 4L),
    (1L, 2L)
  )).keyBy(_._1)
    .flatMap(new CountWindowAverage())
    .print()
  // the printed output will be (1,4) and (1,5)

  env.execute("ExampleManagedState")
}

这个例子实现了一个简单的计数器,我们使用元组的第一个字段来进行分组(这个例子中,所有的key都是1),这个函数将计数和运行时总和保存在一个ValueState中,一旦计数大于2,就会发出平均值并清理state,因此我们又从0开始。请注意,如果我们在第一个字段中具有不同值的元组,则这将为每个不同的输入键保持不同的state值。

Scala DataStream API 中的 State

除了上述接口之外,Scala API还具有快捷方式在KeyedStream上通过有状态的map()flatMap()函数获取当个ValueState, 用户定义的Function以一个Option形式来获取ValueState的当前值,并且必须返回一个更新的值来更新State。

val stream: DataStream[(String, Int)] = ...

val counts: DataStream[(String, Int)] = stream
  .keyBy(_._1)
  .mapWithState((in: (String, Int), count: Option[Int]) =>
    count match {
      case Some(c) => ( (in._1, c), Some(c + in._2) )
      case None => ( (in._1, 0), Some(in._2) )
    })

使用托管算子State

为了使用托管的算子State,有状态的函数可以实现更加通用的CheckpointedFunction接口或者ListCheckpoint<T extends Serializable>接口

CheckpointedFunction

CheckpointedFunction接口可以通过不同的重分区模式来访问非键控的state,它需要实现两个方法:

void snapshotState(FunctionSnapshotContext context) throws Exception;
void initializeState(FunctionInitializationContext context) throws Exception;

无论何时执行checkpoint,snapshotState()都会被调用,相应地,每次初始化用户定义的函数时,都会调用对应的initializeState(),当函数首次初始化时,或者当该函数实际上是从较早的检查点进行恢复时调用的。鉴于此,initializeState()不仅是不同类型的状态被初始化的地方,而且还是state恢复逻辑的地方。

目前列表式托管算子状态是支持的,State要求是一个可序列化的彼此独立的列表,因此可以在重新调整后重新分配,换句话说,这些对象是可重新分配的非键控state的最小粒度。根据状态的访问方法,定义了一下重分配方案:
Even-split redistribution:每个算子返回一个State元素列表,
Union redistribution:每个算子返回一个State元素列表,

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,590评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 86,808评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,151评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,779评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,773评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,656评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,022评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,678评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,038评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,659评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,756评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,411评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,005评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,973评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,053评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,495评论 2 343

推荐阅读更多精彩内容