Flink 程序是可以在分布式数据集上实现类似常规数据加工操作的框架 (例如:filtering, mapping, updating state, joining, grouping, defining windows, aggregating)。数据集合从 Source 创建(例如: 从文件中读取,从kafka topics中消费, 从本地内存中读取)。数据处理结果通过Sink,写出到分布式文件系统
,标准输出
等。Flink程序可以在多种上下文环境中运行,如standalone,或 嵌入其他程序中。 Flink程序可以在本地 JVM 中运行,也可以在包含多台机器的集群中运行。
通过不同类型的数据源,即:有限数据源,或无限数据源,你可以编写批处理程序或流处理程序,其中 DataSet API 用于批处理,DataStream API 用于流处理。本文将介绍两个 API 的共同的概念,但是有关使用每个 API 编写具体程序的信息,请参阅流处理指南或批处理指南。
注意: 当我们展示如何使用APIs的实际例子时,流处理中使用
StreamingExecutionEnvironment
和DataStream
API,批处理中使用ExecutionEnvironment
和DataSet
API。
DataSet and DataStream
Flink 程序通过特定的 DataSet 和 DataStream 类型表示数据。你可以认为他们是不可改变的、包含重复数据的集合。在DataSet
中数据是有限的。在DataStream
中数据是无限的。
这些集合在某些关键点上不同于普通Java集合。首先,这些集合是不可变的,也就是说一旦他们被创建,你不能添加、删除里面的元素。你也不能简单的检查窥视其中的元素。
Flink 程序中的集合最初是通过在Flink程序中添加的 Source 来创建的,然后通过使用例如 map,filter 等的API 算子 对其进行转换来从中获得新的集合。
Anatomy of a Flink Program
Flink 程序看起来很像普通的数据转换程序。每个 Flink 程序包含以下几个基本部分:
- 获取一个
execution environment
对象。 - 加载/创建 初始化数据。
- 对这个数据进行
transformation 加工
操作。 - 指定计算结果要输出的地方。
- 触发 Flink 程序执行。
现在我们给出每个步骤的概要描述,有关更多详细信息请参阅相应的部分。注意 与 DataSet API
相关的所有核心类都位于org.apache.flink.api.java
中,与DataStream API
相关的所有核心类都位于org.apache.flink.streaming.api
包中。
StreamExecutionEnvironment
是所有 Flink 应用程序的基础。你可以通过调用StreamExecutionEnvironment
类的其中一个静态方法来获取该对象
getExecutionEnvironment()
createLocalEnvironment()
createRemoteEnvironment(String host, int port, String... jarFiles)
通常,你仅仅需要调用getExecutionEnvironment()
方法来获取 StreamExecutionEnvironment,因为 Flink框架会根据上下文环境来获取正确的 StreamExecutionEnvironment。如果你在本地环境的IDE中执行你的应用或当作普通Java程序执行应用,Flink 框架会创建一个local environment本地的执行环境
。如果把你的程序打成 Jar 包,通过命令行
执行程序,Flink Cluster Manager
将会执行你的main
函数,getExecutionEnvironment()
方法将创建一个在集群环境中运行所需要的execution environment
。
通过特定的Source,Flink执行环境execution environment
实现了多种数据读取方式: 你可以从 CSV文件中行一行一行读取数据,或者从定义的 InputFormat 中读取。要将文本文件以行的形式读入,可以使用以下函数:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.readTextFile("file:///path/to/file");
上面的程序会返回一个 DataStream,之后你可以在这个 DataStream 上进行各种转转换transformation
生成新的 DataStream
。例如:
DataStream<String> input = ...;
DataStream<Integer> parsed = input.map(new MapFunction<String, Integer>() {
@Override
public Integer map(String value) {
return Integer.parseInt(value);
}
});
这将原始的 DataStream
的每个 String
转换为一个新的包含 Integer
数据DataStream
。
一旦你创建的 DataStream
中包含了需要的最终结果,你就可以通过创建一个 Sink 将结果输出到外部系统。如以下例子:
// 输出到文件
writeAsText(String path)
// 输出到标准输出
print()
一旦你设定(开发)完Flink程序,就需要调用StreamExecutionEnvironment
对象的execute()
函数来触发程序执行。程序具体是在你本地执行,还是会提交到集群中依赖于程序中创建的ExecutionEnvironment
的类型。
execute()
方法会等待Job执行完才返回JobExecutionResult 对象,其中包含程序执行耗费的时间和累加器的结果。
如果你不想等待Job执行完,你可以调用StreamExecutionEnvironment
的异步执行方法executeAysnc
。executeAysnc
会返回一个 JobClient,你可以通过 JobClient 与你刚提交的 Job 通信交互。下面的例子就通过调用executeAsync()
方法实现execute()
一样的功能
final JobClient jobClient = env.executeAsync();
final JobExecutionResult jobExecutionResult = jobClient.getJobExecutionResult(userClassloader).get();
有关DataStream Source和Sink 的更多,更深入的信息,以及DataStream
上如何进行更多的transformation
操作,请参考 Streaming Guide。
Lazy Evaluation
Flink 所有的程序都是延迟执行的。无论是在 本地环境
运行 Flink 程序,还是在 集群环境
运行 Flink 程序。当 Flink 程序的 main 方法被调用后,数据加载(Load)和转换(Transformation)操作不会立即执行。而是先创建具体操作(operation),并将操作(operation)添加到程序执行计划中。只有在 execution environment
对象上明确调用execute()
函数时,Flink 程序具体的操作 operation
才会执行。
lazy evaluation(延迟执行)
使你可以构建复杂的Flink程序,Flink将其作为一个整体计划的单元执行。
Specifying Keys
一些转换操作transformations (join, coGroup, keyBy, groupBy)
要求集合元素包含一个 Key 值。其他转换操作(Reduce, GroupReduce, Aggregate, Windows)
允许在使用数据之前对数据分组。
把DataSet
按key分组,然后进行窗口操作:
DataSet<...> input = // [...]
DataSet<...> reduced = input
.groupBy(/*define key here*/)
.reduceGroup(/*do something*/);
同样的,也可以对DataStream
按key 分组,然后进行窗口操作:
DataStream<...> input = // [...]
DataStream<...> windowed = input
.keyBy(/*define key here*/)
.window(/*window specification*/);
Flink 的数据模型不是基于 Key/Value 对的。因此,你不需要将数据集类型转换成Key/Value 的形式。Key 是虚拟的,具体哪个值为Key,是通过在实际的数据集上指定相应的函数实现的,通过这种方式来指导分组运行符(grouping operator)
。
注意: 在下面的讨论中,我们将使用 DataStream API 和 keyBy。对于 DataSet API 相应替换成 DataSet 和 groupBy 即可。
Define keys for Tuples
最简单的例子是根据Tuple 的一个或多个fields,对Tuple(元组)进行聚合操作。下面的例子是按 Tuple 的 第一 field 分组。
DataStream<Tuple3<Integer,String,Long>> input = // [...]
KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0)
下面我们根据元素的第1个,第2个 fields对元素分组。
DataStream<Tuple3<Integer,String,Long>> input = // [...]
KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0,1)
关于内嵌Tuple 有一个需要注意的地方,指定keyBy(0)
系统会使用整个 Tuple2<Integer, Float>
作为分组的 Key(包含一个 Integer 和 Float 值)。如果要“深入”到嵌套的Tuple2中,则必须使用接下来介绍的字段表达式(field expression)
指定键值。
DataStream<Tuple3<Tuple2<Integer, Float>,String,Long>> ds;
Define keys using Field Expressions
可以通过基于字符串(string-based)
的字段表达式(field expression)来引用内嵌的字段,通过这种方式为grouping, sorting, joing, or coGrouping
等运算符指定 key 值。
字段表达式(Field expressions)使得从Tuple 和 POJO 等复杂数据类型中选择 key 变得非常容易。
在下面的例子中,我们定义了一个包含2个字段的 POJO: WC(word,count)。使用 word
字段分组,仅需要把word
字段的名子传入keyBy()
函数即可。
// some ordinary POJO (Plain old Java Object)
public class WC {
public String word;
public int count;
}
DataStream<WC> words = // [...]
DataStream<WC> wordCounts = words.keyBy("word").window(/*window specification*/);
Field Expression Syntax:
- 通过 POJO 的字段名指定字段。例如:
user
指的是一个POJO的user
字段。 - 通过 Tuple 从0开始的索引,或Tuple 的字段名指定字段。例如:
f0
或5
指的是Tuple 的第1个和第6个字段。 - 你可以选择 POJO 或 Tuple 中内嵌的字段。例如:
user.zip
指的是一个 POJO 中user
字段的zip
字段。Flink 支持POJO 和 Tuple 类型的任意混合,比如:f1.user.zip
或者user.f3.1.zip
。 - 你也可以通过
*(星号)
来模糊匹配所有的类型。*(星号)
形式的模糊匹配也适用不是 POJO 和 Tuple 类型的。
public static class WC {
public ComplexNestedClass complex; //nested POJO
private int count;
// getter / setter for private field (count)
public int getCount() {
return count;
}
public void setCount(int c) {
this.count = c;
}
}
public static class ComplexNestedClass {
public Integer someNumber;
public float someFloat;
public Tuple3<Long, Long, String> word;
public IntWritable hadoopCitizen;
}
- 'count':是 WC 类的 count 字段。
- 'complex': 递归选择复合数据类型
ComplexNestedClass
POJO 的所有字段。 - 'complex.word.f2': 选择内嵌 Tuple3的最后一个字段。
- 'complex.hadoopCitizen': 选择 Hadoop IntWritable 类型。
Define keys using Key Selector Functions
另一种指定 key 的方式是通过key selector(key选择器)
。key 选择器输入一个元素,输出这个元素的 key。这个 key 可以是任意类型,也可以从计算中获得。
下面的例子简单展示了如何通过一个KeySelector 函数返回一个 对象的字段作为键的例子。
// some ordinary POJO
public class WC {public String word; public int count;}
DataStream<WC> words = // [...]
KeyedStream<WC> keyed = words
.keyBy(new KeySelector<WC, String>() {
public String getKey(WC wc) { return wc.word; }
});
Specifying Transformation Functions
大多数的转换操作(transformations)需要用户自定义函数。下面介绍几种不同的指定自定义函数的方式。
Implementing an interface 实现一个接口
class MyMapFunction implements MapFunction<String, Integer> {
public Integer map(String value) { return Integer.parseInt(value); }
};
data.map(new MyMapFunction());
Anonymous classes匿名类
data.map(new MapFunction<String, Integer> () {
public Integer map(String value) { return Integer.parseInt(value); }
});
Java 8 Lambdas
data.filter(s -> s.startsWith("http://"));
data.reduce((i1,i2) -> i1 + i2);
Rich functions
所有接收用户自定义函数的转换操作(transformations)
都可以用一个rich function
来代替:
class MyMapFunction implements MapFunction<String, Integer> {
public Integer map(String value) { return Integer.parseInt(value); }
};
// 也可以这样写
class MyMapFunction extends RichMapFunction<String, Integer> {
public Integer map(String value) { return Integer.parseInt(value); }
};
data.map(new MyMapFunction());
// 同样的rich function也可以用匿名函数来实现
data.map (new RichMapFunction<String, Integer>() {
public Integer map(String value) { return Integer.parseInt(value); }
});
除了提供用户自定义的功能(map, reduce)之外,RichFunction 还提供了4个函数: open, close, getRuntimeContext, setRuntimeContext
。
这对于参数化函数(passing parameters to function),创建和最终确定本地状态,访问广播变量(broadCast variables)以及访问Flink运行时的相关信息(Accumulators and Counters),迭代信息(iterations)都是很有用的。
Supported Data Types
Flink 对可以在DataSet
和DataStream
中使用的元素类型设置了做了一些限制。原因是有助于Flink分析元素类型,从而高效的执行策略。
Flink 支持以下7种不同的数据类型:
- Java Tuple 和 Scala Case classes.
- Java POJOs
- Primitive Type (原始数据类型)
- Regular Classes (普通类)
- Values
- Hadoop Writables
- Special Types.
Tuples and Case Classes
Tuple是一个包含固定字段、不同数据类型的复杂数据类型。Java API 提供了从 Tupl1到 Tupl25的实现。Tuple 的每个字段可以是任意的 Flink 类型,甚至是Tuple 类型,这样就形成了Tuple 嵌套的形式。 Tuple 中的字段可以直接使用字段名直接访问,如tuple.f4
,或使用getter 方法访问,如tuple.getField(int position)
。字段的索引值从0开始。注意: Java 中的 Tuple 的索引与 Scala 中的 Tuple 的索引不同,但是与 Java 中其他普通索引相一致。
DataStream<Tuple2<String, Integer>> wordCounts = env.fromElements(
new Tuple2<String, Integer>("hello", 1),
new Tuple2<String, Integer>("world", 2));
wordCounts.map(new MapFunction<Tuple2<String, Integer>, Integer>() {
@Override
public Integer map(Tuple2<String, Integer> value) throws Exception {
return value.f1;
}
});
wordCounts.keyBy(0); // also valid .keyBy("f0")
POJOs
如果 Java 类或 Scala 类满足以下要求,Flink 会将他们视为特殊的 POJO数据类型:
- 类必须声名为 public
- 必须有一个无参的构造函数。
- 所有字段要么是 public 的,要么提供了可供外界访问的
getter
和setter
方法。 - Flink 已经注册的序列化程序必须支持字段的类型。
POJOs 通常由一个PojoTypeInfo
来表示,用PojoSerializer
实现序列化。例外情况是当POJO是Avro类型(Avro特定记录)或通过“Avro反射类型”生成。这时用AvroTypeInfo
来表示,用AvroSerializer
来实现序例化。如果你有需要,也可以注册你自己实现的 serializer,请参考 Serialization 来获取更多的信息。
Flink 分析 POJO 的类型结构,即了解 POJO 的字段。使用 POJO 类型作为结果比普通类型更容易使用。此外,Flink 处理 POJO 类型的效率也高于处于普通类型。
public class WordWithCount {
public String word;
public int count;
public WordWithCount() {}
public WordWithCount(String word, int count) {
this.word = word;
this.count = count;
}
}
DataStream<WordWithCount> wordCounts = env.fromElements(
new WordWithCount("hello", 1),
new WordWithCount("world", 2));
wordCounts.keyBy("word"); // key by field expression "word"
Primitive Types
Flink 支持所有 Java 和 Scala 的原始数据类型: Integer, String, 和 Double 等。
General Class Types
Flink支持大多数 Java 和 Scala类(API 和自定义)。但 Flink限制使用包含无法序列化的字段的类,例如文件指针,I/O 流,或者其他 native resources
。遵循 Java Bean 规范的类都可以很好的工作。
所有不符合 POJO 类型的类都被 Flink 当做general class(能用类)
来处理。Flink 以黑盒的方式来处理这些数据类型,并且不能访问他们的内容 (例如: 为了高效排序)。General types (通用类型)
类型需要使用序列化框架 Kryo来实现对象的序列化器/反序例化。
Values
值类型手动描述其序例化和反序例化。它们不使用通用的序例化框架,而是实现org.apache.flinktypes.Value
接口,提供read 和 write
方法实现序列化。当通用的序例化框架效率很低时,使用Value
类型是非常有用的。例如,使用数组存储一个稀疏向量,我们知道数组的大部分元素值都是0,通过使用特定的编码只保存非0元素,而通用序列化框架只会简单把数组所有元素都序列化。
org.apache.flinktypes.CopyableValue
接口以同样的方式支持手动内部克隆逻辑。
Flink内部预定义了与基本数据类型相对应的 Values 类型 (Flink comes with pre-defined Value types that correspond to basic data types.)。这些值类型代表的是基本数据类型的可变变体: 这些 Values 类型是可以被修改的,允许用户程序重复使用,减少垃圾回收的压力。
Hadoop Writables
用户也可以使用实现了Hadoop org.apache.hadoop.Writable
接口的类型。write()
方法定义序列化逻辑,readFields()
方法定义反序列化逻辑。
Type Erasure(类型擦除) & Type Inference
注意: 本段只与Java有关
Java 编译器在编译后会丢弃Java泛型信息。这就是著名的Java类型擦除。也就是说在运行时,对象实例已经不知道他的类型信息了。类如: DataStream<String> 的 实例 和 DataStream<Long> 的 实例,对JVM 来说是一样的。
当Flink准备执行执行FlinkJob时,Flink需要知道这些对象的具体类型。Flink会尝试重建以各种形式丢弃的类型信息,并把这些类型显示的存储在数据集和算子Operator
中。你可以通过DataStream.getType()
接口获取类型。该方法返回一个TypeInformation
类型的实例,这是Flink内部表述类型的方式。
类型接口也有其局限性,在某些情况下需要程序员的"配合"。例如,通过"集合"创建数据集的方法 ExecutionEnvironment.fromCollection()
,你可以传递一个类型描述参数。但是像MapFunction<I, O>
能用函数就需要额外的类型信息。
输入数据实现ResultTypeQueryable接口,通过函数的形式明确告诉API返回的具体类型。调用函数的输入类型通常能通过前一个函数的输出结果推测出来。
Accumulators & Counters
累加器结构比较简单,由累加操作和最终结果构成,当Flink-Job执行完成可获得累加器的执行结果。
最简单的累加器是计数器: 程序员可以通过Accumulator.add(V value)
方法对累加器累加。在Flink Job即将执行完时,Flink会合并所有分片的累加结果,把最终的累加结果发送给Client。计算器在debugging 或者 你想快速了解某些数据信息的时候非常有用。
目前Flink内置了一些计数器。每个计算器都实现了 Accumulator 接口。
- IntCounter, LongCounter and DoubleCounter: 参看下面的示例:
- A histogram implementation for a discrete number of bins. 它内部是一个'整型'到'整型'的映射。你可以用它来统计数值的分布情况,例如: 在WordCount程序中统计每行包含的单词数。
How to use accumulators:
第1步: 你必须在想使用 累加器的函数中定义一个累加器。
private IntCounter numLines = new IntCounter();
第2步: 你必须注册创建的累加器,通常会在RichFunction的open()
函数中注册创建的累加器,也可以为累加器增加一个名称。
getRuntimeContext().addAccumulator("num-lines", this.numLines);
这时,你可以在算子函数中使用注册的累加器,包括 open()
,close()
函数。
this.numLines.add(1);
累加的最终结果会保存在JobExecutionResult
对象中,JobExecutionResult
对象是从execution environment
的execute()
方法中返回的。
每个作业的所有累加器共享一个命名空间。这样就可以在你的作业的不同算子中使用相同的累加器。
关于累加器
和 迭代器
的注意事项: 目前累加器的结果只有在整个作业完成后才能获得。我们计划实现下次迭代中使用之前的迭代结果。你可以使用Aggregators计算每次迭代统计,并基于统计信息决定什么时候终止迭代。
Custom accumulators:
要实现自己的累加器,只需要简单的实现 Accumulator 接口。可以选择实现Accumulator or SimpleAccumulator。