In summary, stream processing is real-time: as data arrives it is received into a buffer, and the processing node immediately pulls it from the buffer to process it. In batch processing, arriving data is serialized into a buffer and is not processed immediately. Flink supports both modes through a single mechanism, the buffer timeout: a timeout of 0 means pure stream processing.
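As a minimal sketch of this knob (assuming the usual StreamExecutionEnvironment entry point):
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Flush network buffers every 100 ms: a middle ground between latency and throughput.
env.setBufferTimeout(100);
// A timeout of 0 flushes after every record, i.e. pure streaming with minimal latency.
// env.setBufferTimeout(0);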
Basic steps of Flink programming
1 Execution environment
First, obtain an existing execution environment or create a new one. There are three ways to get an execution environment (see the sketch after this list):
(1) via getExecutionEnvironment(): this derives the environment from the context; in local mode it creates a local execution environment, and in cluster mode a distributed one;
(2) via createLocalEnvironment(), which creates a local execution environment;
(3) via createRemoteEnvironment(String host, int port, String... jarFiles), which creates a remote execution environment.
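A short sketch of the three calls side by side; the host, port, and jar path below are placeholders:
// (1) Context-dependent: local in the IDE, distributed on a cluster.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// (2) Explicitly local, useful for testing.
StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();
// (3) Explicitly remote; the listed jar files are shipped to the cluster.
StreamExecutionEnvironment remoteEnv =
        StreamExecutionEnvironment.createRemoteEnvironment("jobmanager-host", 6123, "/path/to/job.jar");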
2 Data sources
Flink ships with many predefined data sources and also supports custom sources.
2.1 Socket-based
The DataStream API can read data from a socket via the following three methods (a usage sketch follows the list):
socketTextStream(hostName, port)
socketTextStream(hostName, port, delimiter)
socketTextStream(hostName, port, delimiter, maxRetry)
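For example (hostname and port are placeholders):
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Read newline-delimited text from a socket.
DataStream<String> lines = env.socketTextStream("localhost", 9999);
// Variant with an explicit delimiter and a bounded number of reconnect attempts.
DataStream<String> retried = env.socketTextStream("localhost", 9999, "\n", 3);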
2.2 File-based
You can use readTextFile(String path) to consume a file as the source of a stream; the default format is TextInputFormat. You can also specify another format via readFile(FileInputFormat inputFormat, String path).
Flink also supports reading files as continuously monitored streams (note that readFileStream is deprecated in recent versions in favor of readFile):
readFileStream(String filePath, long intervalMillis, FileMonitoringFunction.WatchType watchType)
readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)
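A sketch of both variants (the path is a placeholder; PROCESS_CONTINUOUSLY re-scans the path at the given interval):
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// One-shot read of a text file.
DataStream<String> text = env.readTextFile("/path/to/input");
// Continuously monitor the path, re-scanning it every 1000 ms for new data.
TextInputFormat format = new TextInputFormat(new Path("/path/to/input"));
DataStream<String> monitored = env.readFile(format, "/path/to/input",
        FileProcessingMode.PROCESS_CONTINUOUSLY, 1000L);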
3 Transformation
A transformation converts data from one form into another; its input may be one or more streams, and its output may be zero, one, or several streams. The individual transformations are introduced below.
3.1 Map
Takes one element and produces one element. Java API:
inputStream.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer value) throws Exception {
        return 5 * value;
    }
});
3.2 FlatMap
Takes one element and produces zero, one, or more elements. Java API:
inputStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        for (String word : value.split(" ")) {
            out.collect(word);
        }
    }
});
3.3 Filter
Used for conditional filtering; a record is emitted only when the filter function returns true:
inputStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value != 1;
    }
});
3.4 keyBy
Logically partitions the stream by key, internally using hash partitioning, and returns a KeyedStream:
inputStream.keyBy("someKey");
3.5 Reduce
On a KeyedStream, combines the result of the previous reduce with the current element; for example, a sum reduce:
keyedInputStream.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2) throws Exception {
        return value1 + value2;
    }
});
3.6 Fold
Combines the records of a KeyedStream with an initial value (note that fold is deprecated in later Flink versions), for example:
keyedInputStream.fold("Start", new FoldFunction<Integer, String>() {
    @Override
    public String fold(String current, Integer value) {
        return current + "=" + value;
    }
});
Given the stream (1,2,3,4,5), the result will be: Start=1=2=3=4=5
3.7 Aggregation
Applies built-in aggregations such as min and max on a KeyedStream. Note the difference between min and minBy: min returns the minimum value of the given field (the other fields of the emitted record may come from different input records), whereas minBy returns the entire record that contains the minimum; max and maxBy behave analogously. A combined sketch follows the list:
keyedInputStream.sum(0)
keyedInputStream.sum("key")
keyedInputStream.min(0)
keyedInputStream.min("key")
keyedInputStream.max(0)
keyedInputStream.max("key")
keyedInputStream.minBy(0)
keyedInputStream.minBy("key")
keyedInputStream.maxBy(0)
keyedInputStream.maxBy("key")
4. Further reading
https://www.jianshu.com/p/f9d447a3c48f
http://vinoyang.com/2016/06/22/flink-data-stream-partitioner/
Flink DataStream API Programming Guide: https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/datastream_api.html
5. Event time
Reference: https://blog.csdn.net/lmalds/article/details/52704170
Event time is the time that each individual event occurred on its producing device. This time is typically embedded within the records before they enter Flink, and that event timestamp can be extracted from each record. In event time, the progress of time depends on the data, not on any wall clocks. Event time programs must specify how to generate Event Time Watermarks, which is the mechanism that signals progress in event time. This watermarking mechanism is described in a later section, below.
In a perfect world, event time processing would yield completely consistent and deterministic results, regardless of when events arrive, or their ordering. However, unless the events are known to arrive in-order (by timestamp), event time processing incurs some latency while waiting for out-of-order events. As it is only possible to wait for a finite period of time, this places a limit on how deterministic event time applications can be.
Assuming all of the data has arrived, event time operations will behave as expected, and produce correct and consistent results even when working with out-of-order or late events, or when reprocessing historic data. For example, an hourly event time window will contain all records that carry an event timestamp that falls into that hour, regardless of the order in which they arrive, or when they are processed. (See the section on late events for more information.)
Note that sometimes when event time programs are processing live data in real-time, they will use some processing time operations in order to guarantee that they are progressing in a timely fashion.
Event time is the time at which the data was produced.
A watermark is a mechanism for measuring the progress of event time; it can be regarded as a hidden attribute of the data itself. Event-time data usually carries its own timestamp, for example 1472693399700 (2016-09-01 09:29:59.700), and the watermark for such a record might be:
watermark(1472693399700) = 1472693396700 (2016-09-01 09:29:56.700)
What does this watermark mean? It asserts that all records with a timestamp smaller than 1472693396700 (2016-09-01 09:29:56.700) have already arrived.
Watermarks are used to handle out-of-order events, and handling them correctly usually combines the watermark mechanism with windows.
From the moment an event is produced until it flows through the source and reaches an operator, a certain amount of time passes. Although in most cases records reach an operator in the order in which the events were produced, out-of-order (late) elements can arise from network delays, backpressure, and similar causes.
We cannot wait indefinitely for late elements, however; there must be a mechanism guaranteeing that after a specific amount of time a window is triggered and computed. That mechanism is the watermark.
Usually a watermark should be generated as soon as data is received from the source; however, it is also possible to apply a simple map or filter after the source and generate watermarks only then.
There are two main ways to generate watermarks:
(1) With Periodic Watermarks
(2) With Punctuated Watermarks
The first allows you to define a maximum allowed out-of-orderness and is the more commonly used approach.
Programming example
1)To work with event time, streaming programs need to set the time characteristic accordingly.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
2) A periodic watermark assigner with a bounded out-of-orderness (MyEvent is a placeholder event type carrying its creation time):
public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {

    private final long maxOutOfOrderness = 3500; // 3.5 seconds

    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        // Use the event's embedded creation time as its timestamp and track the maximum seen so far.
        long timestamp = element.getCreationTime();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // Return the watermark as the current highest timestamp minus the out-of-orderness bound.
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}
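The assigner is attached to a stream with assignTimestampsAndWatermarks (a DataStream<MyEvent> named stream is assumed); the interval at which getCurrentWatermark() is polled can be tuned via the ExecutionConfig:
env.getConfig().setAutoWatermarkInterval(1000); // poll for a new watermark every second
DataStream<MyEvent> withTimestampsAndWatermarks =
        stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator());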
6. Core concepts of Flink parallelism
Reference: http://vinoyang.com/2016/05/02/flink-concepts/
Basic program flow
A Flink program executes in a parallel and distributed fashion: a stream is split into stream partitions, and an operator into operator subtasks; these subtasks execute independently of each other in different threads, possibly on different physical machines or in different containers.
The number of subtasks of a particular operator is called its parallelism. The parallelism of a stream always equals that of its producing operator. Different operators within one program may have different parallelisms.
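A short sketch of how parallelism is set at the environment and operator level (the host, port, and numbers are illustrative):
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4); // default parallelism for all operators of this job
env.socketTextStream("localhost", 9999)            // source (placeholder host/port)
   .map(s -> s.toUpperCase()).setParallelism(2)    // this map runs with 2 subtasks
   .print();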