Streaming(DataStream API) -- Overview

Flink DataStream API Programming Guide

在Flink中的数据流程序是实现数据流转换的常规程序(例如,过滤,更新状态,定义窗口,聚合)。数据流最初是从各种来源创建的(例如,消息队列、套接字流、文件)。结果通过sink返回,例如,sink可以将数据写入文件或标准输出(例如命令行终端)。Flink程序可以在各种上下文中运行,可以独立运行,也可以嵌入到其他程序中。可以在本地JVM中执行,也可以在许多机器的集群上执行。

Example Program

下面的程序是一个完整的流窗口字数计数应用程序的工作示例,它在5秒的窗口中计算来自web套接字的字数。你可以复制和粘贴代码在本地运行它。

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object WindowWordCount {
  def main(args: Array[String]) {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("localhost", 9999)

    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1)

    counts.print()

    env.execute("Window Stream WordCount")
  }
}

要运行示例程序,首先从终端使用netcat启动输入流

nc -lk 9999

只要键入一些单词,按回车键就可以得到一个新单词。这些将是单词计数程序的输入。如果你想看到大于1的计数,在5秒内一次又一次地键入相同的单词(如果你不能键入那么快,将窗口大小从5秒增加)。

Data Sources

源是程序读取输入的地方。可以使用StreamExecutionEnvironment.addSource(sourceFunction)将一个源附加到程序中。Flink提供了许多预先实现的源函数,但是您总是可以编写自己的自定义源,方法是为非并行源实现SourceFunction,或者为并行源实现ParallelSourceFunction接口,或者扩展RichParallelSourceFunction。
可以从StreamExecutionEnvironment访问几个预定义的流源
File-based:

  • readTextFile(path) - 逐行读取文本文件,即遵守TextInputFormat规范的文件,并将其作为字符串返回
  • readFile(fileInputFormat, path) - 按照指定的文件输入格式读取(一次)文件
  • readFile(fileInputFormat, path, watchType, interval, pathFilter) - 这是前两个方法在内部调用的方法。它根据给定的fileInputFormat读取路径中的文件。根据所提供的watchType,此源可能会定期监视(每个间隔ms)新数据的路径(FileProcessingMode.PROCESS_CONTINUOUSLY),或处理一次当前路径中的数据并退出(FileProcessingMode.PROCESS_ONCE)。使用pathFilter,用户可以进一步排除正在处理的文件。
    实现:
    在底层,Flink将文件读取过程分解为两个子任务,即目录监控和数据读取。这些子任务中的每一个都由单独的实体实现。监视由单个非并行(并行性= 1)任务实现,而读取由多个并行运行的任务执行。后者的并行度等于作业并行度。单个监视任务的作用是扫描目录(根据watchType定期或仅一次),查找要处理的文件,将它们分成分片,并将这些分片分配给下游readers。readers是读取实际数据的。每个split只能由一个reader读取,而一个reader可以逐个读取多个split。
    注意事项:
    1. 如果watchType设置为FileProcessingMode.PROCESS_CONTINUOUSLY,当一个文件被修改时,它的内容将被完全重新处理。这可能会破坏“exactly-once” 语义,因为将数据附加到文件末尾将导致对其所有内容进行重新处理。
    2. 如果watchType设置为FileProcessingMode.PROCESS_ONCE,source扫描一次路径并退出,而不等待读取器完成文件内容的读取。当然,读者将继续阅读直到所有文件内容都被读取。关闭source将导致在此之后不再出现检查点。这可能导致在节点故障后恢复速度变慢,因为作业将从最后一个检查点恢复读取。

Socket-based:

  • socketTextStream - 从socket读取。元素可以用分隔符分隔。

Collection-based:

  • fromCollection(Seq) - 从Java.util.collection创建数据流。集合中的所有元素必须是相同类型的。
  • fromCollection(Iterator) - 从iterator创建数据流。该类指定iterator返回的元素的数据类型。
  • fromElements(elements: _*) - 根据给定的对象序列创建数据流。所有对象必须具有相同的类型。
  • fromParallelCollection(SplittableIterator) - 从迭代器并行地创建数据流。该类指定迭代器返回的元素的数据类型。
  • generateSequence(from, to) - 在给定的区间内并行地生成数字序列。

Custom:

  • addSource - 附加一个新的source函数。例如,要从Apache Kafka读取,可以使用addSource(new FlinkKafkaConsumer08<>(…))。有关更多细节,请参阅connectors

DataStream Transformations

有关可用流转换的概述,请参阅 operators

Data Sinks

Data sinks消耗数据流并将它们转发到文件、套接字、外部系统或打印它们。Flink提供了各种内置的输出格式,这些格式封装在对数据表的operations之后:

  • writeAsText() / TextOutputFormat - Elements以字符串的形式逐行写。字符串是通过调用每个元素的toString()方法获得的。
  • writeAsCsv(...) / CsvOutputFormat - 以逗号分隔值文件的形式写入元组。行和字段分隔符是可配置的。每个字段的值来自对象的toString()方法。
  • print() / printToErr() - 在标准输出/标准错误流上打印每个元素的toString()值。 可选地,可以提供前缀(msg),该前缀在输出之前。 这可以帮助区分不同的打印请求。 如果并行度大于1,则输出之前还将带有产生输出的任务的标识符。
  • writeUsingOutputFormat() / FileOutputFormat - 方法和基类,是用来自定义文件输出的。支持自定义对象到字节的转换。
  • writeToSocket - 根据SerializationSchema将元素写入套接字
  • addSink - 调用自定义接收函数。Flink与其他系统(如Apache Kafka)的连接器捆绑在一起,这些连接器实现为sink函数。

注意,DataStream上的write*()方法主要用于调试目的。它们不参与Flink的检查点,这意味着这些函数通常具有至少一次的语义。数据刷新到目标系统取决于OutputFormat的实现。这意味着并不是发送给OutputFormat的所有元素都立即显示在目标系统中。此外,在失败的情况下,这些记录可能会丢失。
为了可靠地、精确一次的将流交付到文件系统,请使用flink-connector-filesystem。另外,通过. addsink(…)方法的自定义实现可以参与到Flink的检查点中,实现精确的一次语义。

Iterations

迭代流程序实现了一个阶梯函数,并将其嵌入到IterativeStream中。由于DataStream程序可能永远不会完成,因此没有最大迭代次数。相反,您需要指定使用split转换或filter将流的哪一部分反馈回迭代,以及哪一部分转发到下游。这里,我们展示了一个示例迭代,其中主体(重复的计算部分)是一个简单的map转换,反馈的元素由使用filter转发到下游的元素来区分。

val iteratedStream = someDataStream.iterate(
  iteration => {
    val iterationBody = iteration.map(/* this is executed many times */)
    (iterationBody.filter(/* one part of the stream */), iterationBody.filter(/* some other part of the stream */))
})

例如,这里有一个程序,它从一系列整数中连续地减去1,直到它们达到0

val someIntegers: DataStream[Long] = env.generateSequence(0, 1000)

val iteratedStream = someIntegers.iterate(
  iteration => {
    val minusOne = iteration.map( v => v - 1)
    val stillGreaterThanZero = minusOne.filter (_ > 0)
    val lessThanZero = minusOne.filter(_ <= 0)
    (stillGreaterThanZero, lessThanZero)
  }
)

Execution Parameters

StreamExecutionEnvironment包含ExecutionConfig,它允许为运行时设置特定于作业的配置值。
有关大多数参数的解释,请参阅 execution configuration
。这些参数特别适用于DataStream API

  • setAutoWatermarkInterval(long milliseconds):设置自动水印发射的间隔。你可以得到当前的值long getAutoWatermarkInterval()

Fault Tolerance

State & Checkpointing
描述如何启用和配置Flink的检查点机制。

Controlling Latency

默认情况下,元素不会在网络上逐个传输(这会导致不必要的网络流量),而是被缓冲。缓冲区的大小(实际上是在机器之间传输的)可以在Flink配置文件中设置。虽然这种方法很适合优化吞吐量,但当传入流不够快时,它可能会导致延迟问题。要控制吞吐量和延迟,可以在执行环境(或单个操作符)上使用env.setBufferTimeout(timeoutMillis)设置缓冲区满溢的最大等待时间。超过时间之后,即使缓冲区还没有满,也会自动发送缓冲区。此超时的默认值是100毫秒。
使用:

val env: LocalStreamEnvironment = StreamExecutionEnvironment.createLocalEnvironment
env.setBufferTimeout(timeoutMillis)

env.generateSequence(1,10).map(myMap).setBufferTimeout(timeoutMillis)

为了最大化吞吐量,设置setBufferTimeout(-1),它将删除超时,并且缓冲区只有在满了时才会被刷新。要最小化延迟,请将超时设置为接近0的值(例如5或10毫秒)。应该避免缓冲区超时为0,因为它会导致严重的性能下降。

Debugging

在分布式集群中运行流媒体程序之前,最好确保实现的算法按预期工作。因此,实现数据分析程序通常是检查结果、调试和改进的递增过程。通过支持IDE中的本地调试、测试数据的注入和结果数据的收集,Flink提供的特性极大地简化了数据分析程序的开发过程。本节给出一些提示,说明如何简化Flink程序的开发。

Local Execution Environment

LocalStreamEnvironment在创建Flink系统的同一个JVM进程中启动Flink系统。如果从IDE启动LocalEnvironment,则可以在代码中设置断点,并轻松地调试程序。
创建并按如下方式使用LocalEnvironment:


val lines = env.addSource(/* some source */)
// build your program

env.execute()

Collection Data Sources

Flink提供了由Java集合支持的特殊数据源,以简化测试。 一旦测试了程序,就可以轻松地将源和接收器替换为可读取/写入外部系统的源和接收器。
集合数据源的使用方法如下


// Create a DataStream from a list of elements
val myInts = env.fromElements(1, 2, 3, 4, 5)

// Create a DataStream from any Collection
val data: Seq[(String, Int)] = ...
val myTuples = env.fromCollection(data)

// Create a DataStream from an Iterator
val longIt: Iterator[Long] = ...
val myLongs = env.fromCollection(longIt)

注意:当前,集合数据源要求数据类型和迭代器实现Serializable。而且,收集数据源不能并行执行(并行度= 1)。

Iterator Data Sink

Flink还提供了一个接收器,用于收集用于测试和调试的DataStream结果。它可以这样使用

import org.apache.flink.streaming.experimental.DataStreamUtils
import scala.collection.JavaConverters.asScalaIteratorConverter

val myResult: DataStream[(String, Int)] = ...
val myOutput: Iterator[(String, Int)] = DataStreamUtils.collect(myResult.javaStream).asScala

注意:从Flink 1.5.0中删除了flink-streaming-contrib模块。它的类已经转移到flink-streaming-java和flink-streaming-scala中。

Where to go next?

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
禁止转载,如需转载请通过简信或评论联系作者。
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,802评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,109评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,683评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,458评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,452评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,505评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,901评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,550评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,763评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,556评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,629评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,330评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,898评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,897评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,140评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,807评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,339评论 2 342