Flink中的流应用就是在数据流上应用各种转化(如:filter,update state,difine window,aggregation)。数据流有各种数据源创建而来(如:消息队列,socket流,文件等)。结果输出到sink,如写入文件或者标准输出。Flink程序可以在多种上下文中运行,standalone,内置在其他应用中等。应用可以在本地JVM中执行,也可以在集群的许多机器中执行。
示例程序
下面的程序是一个完成的应用,它演示了如何在web soscke上使用window统计5秒内的字数。你可以复制代码然后在你本地运行。
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class WindowWordCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> dataStream = env
.socketTextStream("localhost", 9999)
.flatMap(new Splitter())
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1);
dataStream.print();
env.execute("Window WordCount");
}
public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
for (String word: sentence.split(" ")) {
out.collect(new Tuple2<String, Integer>(word, 1));
}
}
}
}
运行应用前,先使用 netcat 在命令行中开启输入流:
nc -lk 9999
输入一些单词就会返回新的结果。这些单词或作为字数统计应用的输入。如果你想看到统计值大于1,可以在5秒内一遍又一遍的输入相同的单词(如果你做不到,可以增加window的大小)
数据源 Data Source
Source指的是你的程序从哪里读取它的输入。你可以使用 StreamExecutionEnvironment.addSource(sourceFunction)在你的程序中添加数据源。Flink自带了一些实现好的数据源函数,淡然你可以实现 SourceFunction 来实现自定义的非并行的source或者实现 ParallelSourceFunction 接口或者继承 RichParallelSourceFunction 来实现并行的source。
StreamExecutionEnvironment有一些实现定义好的数据源方法:
基于文件的数据源:
- readTextFile(path) - 读取text文件。也就是说使用 TextInputFormat 一行一行的读取数据。
- readFile(fileInputFormat,path) - 使用给定的 input format读取文件
- readFile(fileInputFormat,path,watchType,interval,pathFilter,typeInfo) - 这个方法在flink内部,被上面的两个方法所调用。它使用给定的fileInputFomat读取path中的文件。根据 watchType 的值,数据源会定期(interval 毫秒)监控path中的新数据(FileProcessingMode.PROCESS_CONTINUOUSLY),或者仅对当前path下的文件进行一次处理,然后退出(FileProcessingMode.PROCESS_ONCE)。使用 pathFilter ,用户可以排除不想要处理的文件。
实现:
在内部,Flink将读取文件分为两个子任务,分别叫做 目录监控 与 数据读取。每个任务都是单独运行的。目录监控是一个单线程的任务,而数据读取任务可以是多线程的并发任务。数据读取任务的并发度取决于job的并发度。目录监控的功能在于定期监控目录,发现需要被处理的文件,将它们分片 split 然后指定分片给下游的reader。reader会进行实际读取数据的操作。每一个split仅会被一个reader读取,而一个reader可能会读取多个split(依次读取)。
重要说明:
1.如果 watchType 设置为 FileProcessingMode.PROCESS_CONTINUOUSLY,当文件被修改后,它的内容会被全部重新进行处理。这会破坏“精确一次”的语义,因为向文件中追加数据,会导致整个文件进行重新处理。
2.如果 watchType 设置为 FileProcessingMode.PROCESS_ONCE,source只会扫描path一次然后退出,而不会等到reader读取所有数据完毕后再退出。当然,reader会继续进行数据读取,直到所有文件内容都读取完毕。关闭source会导致之后不会再有checkpoint。这将导致故障恢复时,需要等待更长的时间,因为job会从上次checkpoint处会进行重新读取数据。
基于socket:
- socketTextStream - 从socket读取数据。数据可以被 分隔符delimiter 分隔开
基于集合:
- fromCollection(Collection) - 从java集合中创造数据流。所有集合中的数据必须是同样的类型
- fromCollection(Iterator,Class) - 从iterator中创造数据流。class参数指定了iterator返回的数据的类型
- fromElements(T ...) - 从给定的对象序列中创造数据流。所有对象必须是相同的类型
- fromParallelCollection(SplitableIterator , Class) - 从iterator中并行的创造数据流。class参数指定了iterator返回的数据的类型
- generateSequence(from,to) - 使用给定的interval并行的生成数字序列
自定义:
- addSource - 使用source function。如,从Kafka中读取数据,你可以使用 addSource(new FlinkKafkaConsumer08<>(...)).
DataStream Transformations
查阅 operator 文档
Data Sink
Data Sink读取数据流,并将它们写入到file,socket,其他系统或者打印它们。Flink自带了一些output format,它们被封装到一些操作符中:
- writeAsText() / TextOutputFormat - 将数据作为一整行string,写入文件。通过调用数据的toString方法
- writeAsCsv(...) / CsvOutputFormat - 将 tuple 以逗号分隔,写入文件。行与行以及field之间的分隔符可以自定义。每一个field的值,是通过调用toString方法获取的
- pring() / pringToErr() - 将数据的toString方法的值打印到口红纸条。可以选择前缀,在打印输出内容前先打印前缀。这能够区分不同的print的内容。如果并发度大于1,输出同样会打印一个task的标识符。
- writeUsingOutputFormat() / FileOutputFormat - 使用自定义文件输出的基类与方法。支持自定义的 对象-字节 的转化。
- writeToSocket - 根据 SerializationSchema 将数据写入socket
- addSink - 调用传入的自定义 sink function。Flink通过实现 sink function可以与其他系统连接起来(如kafka)
注意的是 write*() 方法主要用于调试的目的。它们没有参与flink的checkpoint过程,这就意味着使用这些函数是“at-least-once”至少一次语义。数据如何写入目标系统是由OutputFormat决定的,也就是说发送到OutputFormat的数据并不一定会立即写入目标系统(如批量写入情况)。因此,在遇到故障时,这些数据有可能会丢失。
为了稳定地,精确一致的将流数据写入问加你系统,建议使用 flink-connector-filesystem。当然,如果自定义了sink function,通过 addSink 添加该自定义的sink,也可以参与flink的checkpoint过程,保持 exactly-once 语义。
Iterator
迭代流程序实现了step function,并且内置在 IterativeStream中。由于DataStream程序可能不会停止,因此iteration中不会有最大数量限制。你需要定义流中的哪些数据需要继续迭代,哪些数据可以发送到下游的操作符,你可以使用split或者filter实现。下面我们使用 filter 来演示。首先,我们定义一个 IterativeStream :
IterativeStream<Integer> iteration = input.iterate();
然后,我们定义在循环中,需要对数据流做哪些操作(下面我们就简单的使用map作为演示)
DataStream<Integer> iterationBody = iteration.map(/* this is executed many times */);
为了定义迭代器何时关闭,可以调用 IterativeStream 的 closeWith(feedbackStream) 方法。传入 closeWith() 的数据流会再次进入迭代器,放到迭代器的head。一个常用的模式是,使用filter将流的一部分重新放入迭代器,而另一部分下发到下游操作符这些filter可以定义“终止”的逻辑,也就是一个数据可以不再进入迭代器,而是被转发到下游操作符。
iteration.closeWith(iterationBody.filter(/* one part of the stream */));
DataStream<Integer> output = iterationBody.filter(/* some other part of the stream */);
例如,下面的程序就是对数据进行减1操作,直到为0:
DataStream<Long> someIntegers = env.generateSequence(0, 1000);
IterativeStream<Long> iteration = someIntegers.iterate();
DataStream<Long> minusOne = iteration.map(new MapFunction<Long, Long>() {
@Override
public Long map(Long value) throws Exception {
return value - 1 ;
}
});
DataStream<Long> stillGreaterThanZero = minusOne.filter(new FilterFunction<Long>() {
@Override
public boolean filter(Long value) throws Exception {
return (value > 0);
}
});
iteration.closeWith(stillGreaterThanZero);
DataStream<Long> lessThanZero = minusOne.filter(new FilterFunction<Long>() {
@Override
public boolean filter(Long value) throws Exception {
return (value <= 0);
}
});
Execution Parameter 执行参数
StreamExecutionEnvironment 包括 ExecutionConfig ,它允许设置运行时所需的job配置。
请参阅 execution configuration 获取更多参数的解释。下面的参数仅属于 DataStream API:
- setAutoWatermarkInterval(long milliseconds) : 设置 watermark 发射的间隔。你可以公共 long getAutoWatermartkInterval() 获取当前的值。
故障容忍
控制延迟
默认情况下,数据在网络间传输时,并不是一个一个的传输(造成不必要的网络拥堵),而是缓存后一起传输。buffer的大小可以在Flink 的配置文件中配置。尽管这种方式可以优化吞吐率,但是当输入流的速度不够快时,会造成延迟问题。为了平衡吞吐率和延迟,你可以使用 env.setBufferTimeout(timeoutMillis) 来设置最大等待时间。超过这个时间后,即便buffer没有填满,也要发出去。默认值为100ms。
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setBufferTimeout(timeoutMillis);
env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
为了最大化吞吐率,设置 setBufferTimeout(-1) 会移除超时设置,仅当buffer填满后才发送。为了最小化延迟,设置超时的值接近0(如 5 或 10 毫秒)。应该避免设置值为0,因为这会导致服务性能下降。
调试 Debugging
在提交任务到分布式集群运行前,最好确认程序可以按预期运行。因此,实现一个数据分析应用,通常是一个增量的过程:检查结果,调试,优化。
Flink提供了本地IDE调试的功能,简化了数据分析应用的开发。包括加载测试数据,收集结果数据。这一部分会显示如何简化flink程序的开发,便于测试调试程序。
本地运行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
DataStream<String> lines = env.addSource(/* some source */);
// build your program
env.execute();
加载测试数据
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
// Create a DataStream from a list of elements
DataStream<Integer> myInts = env.fromElements(1, 2, 3, 4, 5);
// Create a DataStream from any Java collection
List<Tuple2<String, Integer>> data = ...
DataStream<Tuple2<String, Integer>> myTuples = env.fromCollection(data);
// Create a DataStream from an Iterator
Iterator<Long> longIt = ...
DataStream<Long> myLongs = env.fromCollection(longIt, Long.class);
注意:需要提供数据烈性,iterator要实现 Serializable。不能并发执行
迭代Sink
import org.apache.flink.streaming.experimental.DataStreamUtils
DataStream<Tuple2<String, Integer>> myResult = ...
Iterator<Tuple2<String, Integer>> myOutput = DataStreamUtils.collect(myResult)