Flink 之 State

1. Overview

State 是 Flink 中一个非常基本且重要的概念,本文将介绍什么是 State,如何使用 State,State 的存储和原理。以及 State 衍生的一些概念和应用。

2. Basic

2.1 什么是 State

一种为了满足算子计算时需要历史数据需求的,使用 checkpoint 机制进行容错,存储在 state backend 的数据结构。

首先 state 其实就是一种数据结构。然后上面的定义中隐含了三个基本知识点:

  1. 什么时候需要历史数据
  2. 为什么要容错,以及 checkpoint 如何进行容错的
  3. state backend 又是什么。

后两点会在后面小节具体展开,这里先列举一些常见的需要历史数据的场景:

  • 去重:在流处理系统中,上游的系统数据可能会有重复,落到下游时希望把重复的数据去掉,此时就需要记录历史的数据。
  • 窗口计算:在触发窗口计算函数前,需要将窗口中收集的数据保存起来,等到触发时进行计算。
  • 机器学习/深度学习:如训练的模型以及当前模型的参数也是一种状态,机器学习可能每次都用有一个数据集,需要在数据集上进行学习,对模型进行一个反馈。

2.2 有哪些常见的 State

最常见的是 Keyed State,应用于 keyedStream 上,必须在 KeyBy 操作之后使用。它的特点是同一个 sub task 上的同一个 key 共享一个 state。另外还有 operator state,顾名思义每一个 operator state 都只与一个 operator 的实例绑定。常见的 operator state 是 source state,例如记录当前 source 的 offset。它的特点是同一个 subtask 共享一个 state。另外还有一种特殊的 operator state 称为 broadcast state,它的特点是同一个算子的多个 sub task 共享一个 state

2.3 State 使用

这里以常用的 Keyed State 进行举例。前面说了 State 本质上就是一种用来存储数据的数据结构,那么作为 Keyed State,都支持哪些数据结构呢?以下列举了常见的几种数据结构:

  • ValueState 存储单个值,比如 Wordcount,用 Word 当 Key,State 就是它的 Count。这里面的单个值可能是数值或者字符串,作为单个值,访问接口可能有两种,get 和 set。在 State 上体现的是 update(T) / T value()。

  • MapState 的状态数据类型是 Map,在 State 上有 put、remove等。需要注意的是在 MapState 中的 key 和 Keyed state 中的 key 不是同一个。

  • ListState 状态数据类型是 List,访问接口如 add、update 等。

这是官网给出的 ValueState 的使用案例:

class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, Long)] {

  private var sum: ValueState[(Long, Long)] = _
  
  /** 
  也可以使用 lazy 的方式对 state 进行初始化
  lazy private val sum = getRuntimeContext.getState(
      new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)])
    ) 
  **/

  override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {

    // access the state value
    val tmpCurrentSum = sum.value

    // If it hasn't been used before, it will be null
    val currentSum = if (tmpCurrentSum != null) {
      tmpCurrentSum
    } else {
      (0L, 0L)
    }

    // update the count
    val newSum = (currentSum._1 + 1, currentSum._2 + input._2)

    // update the state
    sum.update(newSum)

    // if the count reaches 2, emit the average and clear the state
    if (newSum._1 >= 2) {
      out.collect((input._1, newSum._2 / newSum._1))
      sum.clear()
    }
  }

  override def open(parameters: Configuration): Unit = {
    sum = getRuntimeContext.getState(
      new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)])
    )
  }
}


object ExampleCountWindowAverage extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  env.fromCollection(List(
    (1L, 3L),
    (1L, 5L),
    (1L, 7L),
    (1L, 4L),
    (1L, 2L)
  )).keyBy(_._1)
    .flatMap(new CountWindowAverage())
    .print()
  // the printed output will be (1,4) and (1,5)

  env.execute("ExampleKeyedState")
}

以上代码的功能是对输入的流数据进行平均数计算,当输入的数据大于等于 2 个时,触发计算。这里有几点需要注意:

  • 因为 state 的初始化需要用到运行时上下文,所以定义的类需要继承 RichXXFunction
  • state 有两种初始化方式,一种是在成员变量初定义并在 open 函数中初始化。另一种是直接在成员变量处通过 lazy 的方式进行定义和初始化。
  • 这里的例子中使用的是 ValueState,他的 get 和 put 方法分别是 .value().update()
  • state 除了需要我们自己维护状态更新,状态的删除也需要在合适的时间点通过调用 clear
    方法实现。

使用 state 除了继承 RichXXFunction 外还可以直接使用系统提供的函数。如 keyBy 之后直接使用 flatMapWithState ,这里不进行具体举例。

2.4 State backend

前面介绍了 State 的类型和常见的数据结构,那么这些 state 存储的介质有哪些呢? Flink 提供了三种存储 State 的介质:

2.4.1 MemoryStateBackend:

  • 构造方法: MemoryStateBackend( int maxStateSize, boolean asynchronousSnapshots )
  • 存储方式:
    • State: TaskManager 内存
    • Checkpoint: Jobmanager 内存
  • 使用场景:本地测试用,不推荐生产场景使用

2.4.2 FsStatebackend:

  • 构造方法: FaStateBackend( URI checkpointDataUri, boolean asynchronousSnapshots )

  • 存储方式:

    • State:Taskmanager 内存
    • Checkpoint: 外部文件系统( 本地或 HDFS )
  • 使用场景:常规使用 State 的作业,可以在生产中使用

2.4.3 RocksDBStateBackend

  • 构造方法:RocksDBStateBackend( URI checkpointDataUri, boolean enableIncrementalCheckpointing )
  • 存储方式:
    • State: TaskManager 上的 KV 数据库(实际使用内存 + 磁盘)

    • Checkpoint: 外部文件系统(本地或 HDFS )

  • 使用场景:超大状态作业,对性能要求不高的生产场景

2.5 Checkpoint

前面对 State 的使用中没有考虑容错的问题,当集群出现故障时进行恢复时,State 的值肯定不会从头开始计算,这就需要进行容错。State 使用 Checkpoint 机制进行容错。简单来说就是定时制作分布式快照,当出现故障需要进行恢复时,将所有 Task 恢复到最近一次成功的 Checkpoint 状态中,然后从那个点开始继续处理。Checkpoint 通过 Barries 对齐机制保证了恰好一次的一致性语义,关于 Barries 的原理后面将进行详细说明。

3. Deep

3.1 Checkpoint Barries

checkpoint 是 jobmanager 从 source 触发到下游所有节点完成的一次全局操作。checkpoint barriers 和 watermark 类似,都是一种特殊的事件。对某一 subtask 而言,checkpoint 表示所有 subtask 恰好处理完(不能多处理,也不能少处理。为了状态恢复时保持一致性)某个相同数据。watermark 表示这之前的数据已经接收完毕。
watermark在多 subtask 上游向下游传递时,是广播 + 取上游最小 watermark 作为当前 task 的watermark,不取最小 watermark 会丢数据。
checkpoint barriers 在多 subtask 上游向下游传递时,是广播 + checkpoint barriers 对齐(alignment)。所谓对齐就是下游 subtask 会等待他上游所有分区的 subtask 的 checkpoint barriers 都到达才进行 checkpoint。上游已经到达 checkpoint barriers 的 substask 后续数据会缓存,没有到达 checkpoint barriers 的 subtask 数据会继续处理直到 checkpoint barriers 到达。
如图是一条流的两个并行任务,逻辑是分别对奇偶数进行求和计算。

initial .png

首先由 JobManager 触发 checkpoint,向 Source 发送带有 checkpoint Id 的指令。
JobManager 触发 checkpoint

随后 Source 触发 checkpoint 将 State 保存在 State Backend 后发送 ack 给 JobManager。并且向下游发送 checkpoint barriers。此时 Source 后面算子的计算不会受到任何打断。
Source checkpoint

以 even 流的 Sum 算子为例,从图中可以看到先接收到 Source1 的 barrier 后接收到 Source2 的 barrier( 分别对应蓝色和黄色的三角 )。所以在接收到 Source1 的 barrier 后,对后面值为 4 的蓝流数据进行了缓存没有进行下一步计算,因为这个数据属于下一个 checkpoint。而在接收到 Source2 的 barrier 之前,对值为 4 的黄流数据照常进行计算直到接收到 Source2 的 barrier 为止。这就是所谓的 barriers alignment。
barriers alignment

当下游收到上游所有的 barriers 时,进行 checkpoint,并将 barrier 向下游转发。
sum checkpoint

sink 任务向 JobManager 确认状态保存到完毕后,整个 checkpoint 过程结束
sink ack

3.3 Checkpoint VS Savepoint

Savepoints are manually triggered checkpoints, which take a snapshot of the program and write it out to a state backend. They rely on the regular checkpointing mechanism for this.
Savepoints are similar to checkpoints except that they are triggered by the user and don’t automatically expire when newer checkpoints are completed.

Savepoint 底层依赖于 checkpoint,但是是由用户手动触发的。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,456评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,370评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,337评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,583评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,596评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,572评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,936评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,595评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,850评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,601评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,685评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,371评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,951评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,934评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,167评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,636评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,411评论 2 342