Flink DataStream API Programming Guide
在Flink中的数据流程序是实现数据流转换的常规程序(例如,过滤,更新状态,定义窗口,聚合)。数据流最初是从各种来源创建的(例如,消息队列、套接字流、文件)。结果通过sink返回,例如,sink可以将数据写入文件或标准输出(例如命令行终端)。Flink程序可以在各种上下文中运行,可以独立运行,也可以嵌入到其他程序中。可以在本地JVM中执行,也可以在许多机器的集群上执行。
Example Program
下面的程序是一个完整的流窗口字数计数应用程序的工作示例,它在5秒的窗口中计算来自web套接字的字数。你可以复制和粘贴代码在本地运行它。
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
object WindowWordCount {
def main(args: Array[String]) {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.socketTextStream("localhost", 9999)
val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
.map { (_, 1) }
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1)
counts.print()
env.execute("Window Stream WordCount")
}
}
要运行示例程序,首先从终端使用netcat启动输入流
nc -lk 9999
只要键入一些单词,按回车键就可以得到一个新单词。这些将是单词计数程序的输入。如果你想看到大于1的计数,在5秒内一次又一次地键入相同的单词(如果你不能键入那么快,将窗口大小从5秒增加)。
Data Sources
源是程序读取输入的地方。可以使用StreamExecutionEnvironment.addSource(sourceFunction)将一个源附加到程序中。Flink提供了许多预先实现的源函数,但是您总是可以编写自己的自定义源,方法是为非并行源实现SourceFunction,或者为并行源实现ParallelSourceFunction接口,或者扩展RichParallelSourceFunction。
可以从StreamExecutionEnvironment访问几个预定义的流源
File-based:
- readTextFile(path) - 逐行读取文本文件,即遵守TextInputFormat规范的文件,并将其作为字符串返回
- readFile(fileInputFormat, path) - 按照指定的文件输入格式读取(一次)文件
- readFile(fileInputFormat, path, watchType, interval, pathFilter) - 这是前两个方法在内部调用的方法。它根据给定的fileInputFormat读取路径中的文件。根据所提供的watchType,此源可能会定期监视(每个间隔ms)新数据的路径(FileProcessingMode.PROCESS_CONTINUOUSLY),或处理一次当前路径中的数据并退出(FileProcessingMode.PROCESS_ONCE)。使用pathFilter,用户可以进一步排除正在处理的文件。
实现:
在底层,Flink将文件读取过程分解为两个子任务,即目录监控和数据读取。这些子任务中的每一个都由单独的实体实现。监视由单个非并行(并行性= 1)任务实现,而读取由多个并行运行的任务执行。后者的并行度等于作业并行度。单个监视任务的作用是扫描目录(根据watchType定期或仅一次),查找要处理的文件,将它们分成分片,并将这些分片分配给下游readers。readers是读取实际数据的。每个split只能由一个reader读取,而一个reader可以逐个读取多个split。
注意事项:- 如果watchType设置为FileProcessingMode.PROCESS_CONTINUOUSLY,当一个文件被修改时,它的内容将被完全重新处理。这可能会破坏“exactly-once” 语义,因为将数据附加到文件末尾将导致对其所有内容进行重新处理。
- 如果watchType设置为FileProcessingMode.PROCESS_ONCE,source扫描一次路径并退出,而不等待读取器完成文件内容的读取。当然,读者将继续阅读直到所有文件内容都被读取。关闭source将导致在此之后不再出现检查点。这可能导致在节点故障后恢复速度变慢,因为作业将从最后一个检查点恢复读取。
Socket-based:
- socketTextStream - 从socket读取。元素可以用分隔符分隔。
Collection-based:
- fromCollection(Seq) - 从Java.util.collection创建数据流。集合中的所有元素必须是相同类型的。
- fromCollection(Iterator) - 从iterator创建数据流。该类指定iterator返回的元素的数据类型。
- fromElements(elements: _*) - 根据给定的对象序列创建数据流。所有对象必须具有相同的类型。
- fromParallelCollection(SplittableIterator) - 从迭代器并行地创建数据流。该类指定迭代器返回的元素的数据类型。
- generateSequence(from, to) - 在给定的区间内并行地生成数字序列。
Custom:
- addSource - 附加一个新的source函数。例如,要从Apache Kafka读取,可以使用addSource(new FlinkKafkaConsumer08<>(…))。有关更多细节,请参阅connectors。
DataStream Transformations
有关可用流转换的概述,请参阅 operators
Data Sinks
Data sinks消耗数据流并将它们转发到文件、套接字、外部系统或打印它们。Flink提供了各种内置的输出格式,这些格式封装在对数据表的operations之后:
- writeAsText() / TextOutputFormat - Elements以字符串的形式逐行写。字符串是通过调用每个元素的toString()方法获得的。
- writeAsCsv(...) / CsvOutputFormat - 以逗号分隔值文件的形式写入元组。行和字段分隔符是可配置的。每个字段的值来自对象的toString()方法。
- print() / printToErr() - 在标准输出/标准错误流上打印每个元素的toString()值。 可选地,可以提供前缀(msg),该前缀在输出之前。 这可以帮助区分不同的打印请求。 如果并行度大于1,则输出之前还将带有产生输出的任务的标识符。
- writeUsingOutputFormat() / FileOutputFormat - 方法和基类,是用来自定义文件输出的。支持自定义对象到字节的转换。
- writeToSocket - 根据SerializationSchema将元素写入套接字
- addSink - 调用自定义接收函数。Flink与其他系统(如Apache Kafka)的连接器捆绑在一起,这些连接器实现为sink函数。
注意,DataStream上的write*()方法主要用于调试目的。它们不参与Flink的检查点,这意味着这些函数通常具有至少一次的语义。数据刷新到目标系统取决于OutputFormat的实现。这意味着并不是发送给OutputFormat的所有元素都立即显示在目标系统中。此外,在失败的情况下,这些记录可能会丢失。
为了可靠地、精确一次的将流交付到文件系统,请使用flink-connector-filesystem。另外,通过. addsink(…)方法的自定义实现可以参与到Flink的检查点中,实现精确的一次语义。
Iterations
迭代流程序实现了一个阶梯函数,并将其嵌入到IterativeStream中。由于DataStream程序可能永远不会完成,因此没有最大迭代次数。相反,您需要指定使用split转换或filter将流的哪一部分反馈回迭代,以及哪一部分转发到下游。这里,我们展示了一个示例迭代,其中主体(重复的计算部分)是一个简单的map转换,反馈的元素由使用filter转发到下游的元素来区分。
val iteratedStream = someDataStream.iterate(
iteration => {
val iterationBody = iteration.map(/* this is executed many times */)
(iterationBody.filter(/* one part of the stream */), iterationBody.filter(/* some other part of the stream */))
})
例如,这里有一个程序,它从一系列整数中连续地减去1,直到它们达到0
val someIntegers: DataStream[Long] = env.generateSequence(0, 1000)
val iteratedStream = someIntegers.iterate(
iteration => {
val minusOne = iteration.map( v => v - 1)
val stillGreaterThanZero = minusOne.filter (_ > 0)
val lessThanZero = minusOne.filter(_ <= 0)
(stillGreaterThanZero, lessThanZero)
}
)
Execution Parameters
StreamExecutionEnvironment包含ExecutionConfig,它允许为运行时设置特定于作业的配置值。
有关大多数参数的解释,请参阅 execution configuration
。这些参数特别适用于DataStream API
- setAutoWatermarkInterval(long milliseconds):设置自动水印发射的间隔。你可以得到当前的值long getAutoWatermarkInterval()
Fault Tolerance
State & Checkpointing
描述如何启用和配置Flink的检查点机制。
Controlling Latency
默认情况下,元素不会在网络上逐个传输(这会导致不必要的网络流量),而是被缓冲。缓冲区的大小(实际上是在机器之间传输的)可以在Flink配置文件中设置。虽然这种方法很适合优化吞吐量,但当传入流不够快时,它可能会导致延迟问题。要控制吞吐量和延迟,可以在执行环境(或单个操作符)上使用env.setBufferTimeout(timeoutMillis)设置缓冲区满溢的最大等待时间。超过时间之后,即使缓冲区还没有满,也会自动发送缓冲区。此超时的默认值是100毫秒。
使用:
val env: LocalStreamEnvironment = StreamExecutionEnvironment.createLocalEnvironment
env.setBufferTimeout(timeoutMillis)
env.generateSequence(1,10).map(myMap).setBufferTimeout(timeoutMillis)
为了最大化吞吐量,设置setBufferTimeout(-1),它将删除超时,并且缓冲区只有在满了时才会被刷新。要最小化延迟,请将超时设置为接近0的值(例如5或10毫秒)。应该避免缓冲区超时为0,因为它会导致严重的性能下降。
Debugging
在分布式集群中运行流媒体程序之前,最好确保实现的算法按预期工作。因此,实现数据分析程序通常是检查结果、调试和改进的递增过程。通过支持IDE中的本地调试、测试数据的注入和结果数据的收集,Flink提供的特性极大地简化了数据分析程序的开发过程。本节给出一些提示,说明如何简化Flink程序的开发。
Local Execution Environment
LocalStreamEnvironment在创建Flink系统的同一个JVM进程中启动Flink系统。如果从IDE启动LocalEnvironment,则可以在代码中设置断点,并轻松地调试程序。
创建并按如下方式使用LocalEnvironment:
val lines = env.addSource(/* some source */)
// build your program
env.execute()
Collection Data Sources
Flink提供了由Java集合支持的特殊数据源,以简化测试。 一旦测试了程序,就可以轻松地将源和接收器替换为可读取/写入外部系统的源和接收器。
集合数据源的使用方法如下
// Create a DataStream from a list of elements
val myInts = env.fromElements(1, 2, 3, 4, 5)
// Create a DataStream from any Collection
val data: Seq[(String, Int)] = ...
val myTuples = env.fromCollection(data)
// Create a DataStream from an Iterator
val longIt: Iterator[Long] = ...
val myLongs = env.fromCollection(longIt)
注意:当前,集合数据源要求数据类型和迭代器实现Serializable。而且,收集数据源不能并行执行(并行度= 1)。
Iterator Data Sink
Flink还提供了一个接收器,用于收集用于测试和调试的DataStream结果。它可以这样使用
import org.apache.flink.streaming.experimental.DataStreamUtils
import scala.collection.JavaConverters.asScalaIteratorConverter
val myResult: DataStream[(String, Int)] = ...
val myOutput: Iterator[(String, Int)] = DataStreamUtils.collect(myResult.javaStream).asScala
注意:从Flink 1.5.0中删除了flink-streaming-contrib模块。它的类已经转移到flink-streaming-java和flink-streaming-scala中。
Where to go next?
- Operators: 可用流操作符的规范。
- Event Time: 介绍Flink的时间概念
- State & Fault Tolerance: 说明如何开发有状态应用程序
- Connectors: 可用的输入和输出连接器的说明。