Apache Flink是一个可以在有限流数据流和无限流基础上进行有状态计算的大数据处理框架。Flink从下到上提供了不同层级的API抽象,并为常见的用例提供了专用的开发库。
构建流处理应用程序 (Building Blocks for Streaming Applications)
流处理框架如何控制 Stream, State,Time等因素,决定了框架能构建和执行什么类型的应用(The types of applications that can be built with and executed by a stream processing framework are defined by how well the framework controls streams, state, and time)。下面我们将逐一描述流处理应用的基础组件,并阐述Flink处理他们的方法。
Streams (流)
很明显,(数据) 流是流处理应用一个基本概念。然而一个流可以有不同的特性,这些特性将影响流能够及应该如何处理数据。Flink是一个具有具有多种处理功能的强大数据处理框架,它可以处理多种类型的流。
- 有限流 和 无限流: 流可以是无限的 或 有限的。即,有固定数据大小或固定数据条数的数据集。Flink 具有处理无限流的强大特性,同时也有专用的算子 (Operators) 高效处理有限流(数据集)。
- 实时流 和 有顺序流: 所有的数据都是以流的形式产生的。通常情况下处理数据有2种方式。当流数据产生时就立即处理,或者 将产生的流数据存储起来,即先储存到文件系统或存储对象中,之后再处理。Flink可以处理存储数据流或实时数据流。
Status (状态)
每一个非普通(non-trivial)流处理程序都是有状态的,也就是说只有那些只针对单个事件进行单独处理的应用流式应用才不需要关心应用状态。换句话说,任何一个具体基本业务处理逻辑的应用程序都需要记录 事件状态 或中间结果,以便在今后业务需要时访问这些状态。
在Flink应用中应用状态是一等公民。可以从Flink在上下文环境中提供的对状态处理的所有特性就能证明这一点。
- 多种原始基础状态: Flink为不同基础数据结构提供了不同的状态类型: 如 原子值,List,Map等数据结构。开发人员可以根据不同函数对不同状态的访问处理形式来选择最高效,最适合的状态类型。
- 插件式状态后端:Flink应用的状态被插件式的状态后端管理并定期执行Checkpoint操作。Flink具有不同类型的状态后端,这些状态后端把应用的状态保存在内存(Memory) 或 RockDB(一种高效的嵌入式磁盘存储)中。Flink也支持插件式的自定义的状态后端。
- 精确一次的状态一致性:Flink的Checkpoint机制和故障恢复算法保证了应用失败后状态恢复的数据一致性。因此,Flink故障处理是透明的,不会影响应用程序的正确性。
- 非常大的状态: 由于Flink的异步和增量的Checkpoint算法,使Flink能够维持TB级的应用状态。
- 应用扩展: Flink通过将状态分配给不同规模Worker来支持有状态应用的水平扩展。
Time
时间是流处理应用的另一个重要组成部分。多数的事件流都有内部的时间语义,因为流的每个事件都是在特定的时间点生成的。此外,许多通用的流计算都是基于时间的,例如窗口聚合,sessionization(会话计算), 模式匹配,以及基于时间的Join。流式计算的另一个重要方面是应用如何测量时间,即区分 事件时间 和 处理时间。
Flink提供了一组丰富的与时间有关的特性。
- 事件时间模式:使用事件时间定义的流处理应用基于事件内的timestamp(时间戳) 计算程序结果。因此不管是处理流式数据还是已经保存记录下来的数据,基于事件时间的数据处理保证了结果的正确性和一致性。
- 水印支持: 在"事件时间"应用中Flink引入水印(Watermark)利用水印来衡量事件进展。水印是一种权衡结果延迟和完事性的一种灵活机制。
- 迟到数据处理: 在以"事件时间"为基准流处理中使用水印时,可能发生所有相关事件到达之前计算已经完成的情况。这种事件叫迟到数据。Flink提供了多种选项来处理迟到数据,例如:通过 侧输出 重新路由这些数据并更新之前的处理结果。
- 处理时间模式:除了事件模式外,Flink也支持处理时间语义,它是根据具体处理机器的本地时间来触发计算的。处理时间模式适用于一些对数据延迟有严格要求但允许损失一定数据精度的应用。
APIs 分层 (Layered APIs)
Flink提供了3层APIs. 每层API 在表达性,简洁性及不同使用场景之间提供了不同的权衡。
我们简要介绍一下每层API,讨论他们的应用,并展示一段样例代码。
The ProcessFunctions
ProcessFunctions 是Flink提供的最具有表达力的函数接口。Flink的ProcessFunctions函数接口用来处理1或2个流中独立的事件,或处理聚合窗口中的事件。ProcessFuntions函数提供了对时间和状态的细粒度控制。一个ProcessFunctions可以随意修改它自身的状态,注册时间定时器,时间定时器在将来的某个时刻触发函数回调。因此,许多基于事件状态驱动的应用,ProcessFunctions根据要求实现 基于单个事件的、复杂的业务处理。
下面展示了KeyedStream流中使用KeyedProcessFunction函数匹配符合START和END事件的例子。当START事件到达函数时,函数将事件的timestamp(时间戳)记录在state(状态)中,并启动一个4小时后触发的定时器。如果一个END事件在定时器触发之前到达,函数计算START事件和END事件之间的时间差值,清除状态,并返回时间差值。否则触发定时器并清除状态值。
/**
* Matches keyed START and END events and computes the difference between
* both elements' timestamps. The first String field is the key attribute,
* the second String attribute marks START and END events.
*/
public static class StartEndDuration
extends KeyedProcessFunction<String, Tuple2<String, String>, Tuple2<String, Long>> {
private ValueState<Long> startTime;
@Override
public void open(Configuration conf) {
// obtain state handle
startTime = getRuntimeContext()
.getState(new ValueStateDescriptor<Long>("startTime", Long.class));
}
/** Called for each processed event. */
@Override
public void processElement(
Tuple2<String, String> in,
Context ctx,
Collector<Tuple2<String, Long>> out) throws Exception {
switch (in.f1) {
case "START":
// set the start time if we receive a start event.
startTime.update(ctx.timestamp());
// register a timer in four hours from the start event.
ctx.timerService()
.registerEventTimeTimer(ctx.timestamp() + 4 * 60 * 60 * 1000);
break;
case "END":
// emit the duration between start and end event
Long sTime = startTime.value();
if (sTime != null) {
out.collect(Tuple2.of(in.f0, ctx.timestamp() - sTime));
// clear the state
startTime.clear();
}
default:
// do nothing
}
}
/** Called when a timer fires. */
@Override
public void onTimer(
long timestamp,
OnTimerContext ctx,
Collector<Tuple2<String, Long>> out) {
// Timeout interval exceeded. Cleaning up the state.
startTime.clear();
}
}
示例说明了KeyedProcessFunction的表达力,但高亮部分也说明他是一个冗长的接口。
The DataStream API
DataStream API 为许多长见的流处理操作提供语义支持,例如 窗口操作,事件转换,通过查询外部存储来丰富事件等。DataStream API支持Java和Scala,同时DataStream是基于函数的,例如 map(), reduce(),aggregate()。可以实现接口来定义函数,也可以是Java或Scala支持的lambda函数。
下面例子展示了如何定义一个clickstream,并统计每个会话的点击次数。
// a stream of website clicks
DataStream<Click> clicks = ...
DataStream<Tuple2<String, Long>> result = clicks
// project clicks to userId and add a 1 for counting
.map(
// define function by implementing the MapFunction interface.
new MapFunction<Click, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(Click click) {
return Tuple2.of(click.userId, 1L);
}
})
// key by userId (field 0)
.keyBy(0)
// define session window with 30 minute gap
.window(EventTimeSessionWindows.withGap(Time.minutes(30L)))
// count clicks per session. Define function as lambda function.
.reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));
SQL & Table API
Flink提供了两种关系型API,Table API 和 SQL。两个API为批处理和流处理提供了统一的API,也就是说,查询在无限的数据流,或有限的记录流上以相同的语义执行,并产生相同的结果。Table API 和 SQL 利用 Apache Calcite来解析,验证,执行查询优化。Table API 和 SQL 可以与 DataStream 和 DataSet API无缝集成,并支持用户自下定义 标量函数,聚合函数,和 Table函数。
Flink关系型API的目的是要简化数据分析,数据管道,ETL应用的定义。
下面的示例展示了使用SQL查询一个基于Session窗口的点击流,并统计每个Session的点击次数。本例和DataStream API示例中的例子相同。
SELECT userId, COUNT(*)
FROM clicks
GROUP BY SESSION(clicktime, INTERVAL '30' MINUTE), userId
Libraries
Flink为常用的数据处理用例提供了若干类库。 这些类库通常嵌入在一个API中,而不是完全独立的。这使得类库API可以从Flink提供其他特征API中获益,并与其他类库集成。
复杂事件处理(Complex Event Processing (CEP)): 模式检测是事件流处理中一个常见用例。Flink的CEP库提供了一个API来指定事件模式(考虑正则表达式或状态机制)。Flink CEP 库与 Flink DataStream API集成,这样可以在DataStream上执行模式计算。CEP 库的应用包括网络入侵检测,业务处理监控 和 欺诈检测。
DataSet API: 略。
Gelly: 略。