Flink学习总结
-
flink是什么:
- 为分布式、高性能、随时可用以及准确的流处理应用程序打造的开源流处理框架
-
流处理&& 批处理
批处理:spark streaming为批处理代表,数据有界,持久,大量,一般用于离线
流处理:flink为代表,数据无界,实时,无需针对整个数据集,针对每一项数据进行处理,一般用于实时统计
-
flink 分层API
High -level Analytics API:SQL/Table API(dynamic tables):
Stream & Batch Data Porcessing:DataStream API (streams, windows)
statefu Event-Driven Application:ProcessFunction(events, state, time)
-
关系:
最底层级别的仅仅提供抽象的有状态流,通过函数ProcessFunction 被嵌入到DataStream API 中
大多数应用,不需要上述的底层抽象,而是针对核心API(Core APIS)进行变成,比如datastream API /DataSet API
最高抽象层为SQL,表达能力上与Table API类似,但是Flink的SQL并不完善,推荐使用DataStreamAPI
- Flink部署方式:
-
Standalone模式
- 一般本地独立部署,无其他外部资源管理器(Hadoop,k8s这些)
-
Yarn模式
- Session-Cluster运行模式
先启动集群,在提交作业,接着向yarn申请一块空间,资源永远保持不变,资源满了,下一个任务无法提交,直到有作业执行完成,释放资源
所有作业共享Dispatcher和ReourceManager
适合规模小执行时间短的作业
- Per-Job-Cluster运行模式
一个Job会对应一个集群,每提交一个作业,根据自身情况,都会向yarn申请资源,直到作业执行完成,一个作业是否失败,不影响下一个作业正常运行
独享Dispatcher和ReourceManager
适合大规模,长时间运行的作业
- Kubernetes部署
- Session-Cluster运行模式
- Flink运行时的组件:
-
作业管理器(JobManager)
控制应用程序执行的主程序,
JobManager接受程序因该包括:作业图(JobGraph),逻辑数据流图(Logical dataflow graph)和所有类,库和其他资源的jar包
JobManager会将JobGraph转换为物理层面的数据流图---执行图(ExecutionGraph)
ExecutionGraph包含所有可以并发执行的任务,
JobManager向系统申请执行资源(TaskManager上的slot),资源足够,执行图分发到真正运行他们的TaskManager上去运行,
运行过程中,JobManager负责所有需要中央协调的操作(state,checkpoint)
-
资源管理器(ResourceManager)
主要负责管理任务管理器(TaskManager)的插槽(slot)
slot是TaskManager处理任务的资源单位
如果在k8s上或者Hadoop上,ResourceManager负责向系统申请运行资源,和资源平台对话。
还负责终止空闲TaskManager,释放计算机资源
任务管理器(TaskManager)
flink会有很多TaskManager运行,每一个TaskManager包含一个或者多个slots
slots数量会限制TaskManager执行的任务数量
可以和同一个应用程序中的其他TaskManager交换数据
分发器(Dispatcher)
当一个应用被提交执行的时候,分发器就会启动,并将应用移交给一个JobManager。
为应用提供一个REST接口,也会启动一个WebUI,展示任务执行情况
分发器在架构中可能并不是需要的,取决于提交运行方式
- 任务提交流程:
- 这是一个整体的提交流程,如果部署方式不一样,比如yarn,k8s等,提交过程会有变化
- 任务调度原理
FLink集群启动后,会首先启动一个JobManagerhe 一个或者多个TaskManager
Client提交任务给JobManager,JobManager再调度任务到各个TaskManager去执行
TaskManager将心跳和统计信息回报给JobManager(TaskManager之间是按照流的形式进行数据传输的)
上述三则均为独立的JVM进程
Client:为提交 Job 的客户端,可以是运行在任何机器上(与 JobManager 环境 连通即可)。提交Job 后,Client 可以结束进程(Streaming 的任务),也可以不 结束并等待结果返回。
JobManager:主要负责调度 Job 并协调 Task 做 checkpoint,职责上很像 Storm 的 Nimbus。从 Client 处接收到 Job 和 JAR 包等资源后,会生成优化后的 执行计划,并以 Task 的单元调度到各个 TaskManager 去执行。
TaskManager:在启动的时候就设置好了槽位数(Slot),每个 slot 能启动一个 Task,Task 为线程。从 JobManager 处接收需要部署的 Task,部署启动后,与自 己的上游建立 Netty 连接,接收数据并处理。
- Task Manager与slots关系
slot表示一个Task Manager具有的资源有多少固定大小的子集。Task Manager会将资源平均划分N份
slot数量的动态,允许用户定义task 之间的隔离形式。一个slot代表task运行在一个独立的JVM中,划分多个,意味着task共享一个JVM,共享TCP连接
Task Slot 是静态的概念,是指 TaskManager 具有的并发执行能力
并行度 parallelism 是动态概念, 即 TaskManager 运行程序时实际使用的并发能力
-
程序和数据流(DataFlow)
所有flink程序都是由三部分组成:Environments,Source,Transformation和Sink
Environments:flink执行的环境
Source:负责读取数据源,Kafka,mysql,ck等
Transformation:利用算子进行处理加工
Sink:负责输出,可以输出到Kafka,mysql,CK等都可以
-
执行图(ExecutionGraph)
StreamGraph:用户通过代码,生成最初的数据流图
JobGraph:StreamGraph经过优化后生成了Job Graph,提交给了JobManager的数据结构,
ExecutionGraph:JobManager根据JobGraph生成的,是JobGraph并行化的版本,是调度层最核心的数据结构
物理执行图:jobManager根据ExecutionGraph对Job进行调度后,在各个TaskManager上部署Task后形成的图,这里并不是一个具体的数据结构了
-
并行度(Parallelism)
在执行过程中,一个流(stream)包含一个或多个分区(stream partition),而 每一个算子(operator)可以包含一个或多个子任务(operator subtask),
这些子任 务在不同的线程、不同的物理机或不同的容器中彼此互不依赖地执行。
一个特定算子的子任务(subtask)的个数被称之为其并行度(parallelism)。
一般情况下,一个流程序的并行度,可以认为就是其所有算子中最大的并行度。一 个程序中,不同的算子可能具有不同的并行度。
-
任务链(OperatorChains)
Flink将并行度为One-to-One的算子操作,连接在一起,形成一个task
可以减少线程切换,缓存数据交换,增加吞吐量
One-to-one:stream(比如在 source 和 map operator 之间)维护着分区以及元素的 顺序。那意味着 map 算子的子任务看到的元素的个数以及顺序跟 source 算子的子 任务生产的元素的个数、顺序相同,map、fliter、flatMap 等算子都是 one-to-one 的 对应关系。
-
Flink流处理API
-
Environment
getExecutionEnvironment:创建一个执行程序环境上下文,会根据运行方式决定返回一个什么样的运行环境,比较常用
createLocalEnvironment:创造一个本地执行环境
createLocalEnvironment:返回远程集群执行环境,比如想在本地模拟远程服务器运行环境。
-
Source
env.fromCollection():从一个集合中获取数据(Set, Array等),一般不常用。
env.readTextFile():从一个文件中获取数据
env.addSource():从自定义的数据源获取(比如Kafka,mysql等)
自定义Source:继承SourceFunction即可,实现run方法,不会的化,可以去看源码中的样例。
-
Transform
map:1对1 ,针对一个输入数据做处理后,输出一个结果
flatMap:1对n 或则n对1。将一个输入数据解析成多个结果,或者将多个输入,变成1个结果。一般都是一变多
filter:用作数据过滤,不符合的将不被输出
-
keyby:按照指定的key,对数据流进行一个分区,每个分区内,都包含相同的key元素,在内部按照hash的形式实现。很类似group by。会将DataStream变为KeyedStream
以下算子针对KeyedStream的每一个分区做聚合
sum()
min()
max()
minBy()
maxBy()
reduce:将一个分组的数据流聚合操作,合并当前元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,不仅仅是最后一次聚合的结果
split:根据某些特征,将dataStream拆分成两个或则多个DataStream。DataStream -> splitStream
select:从一个splitStream中获取一个或则多个DataStream。splitStream -> DataStream
Connect:
comap
Union
-
-
内部支持的数据结构
-
基础数据类型:
Tuple
POJOs
Arrays,List,Maps,Enums
-
函数:flink暴露了所有的udf函数接口,用户可以继承或则实现某些接口实现对应功能
MapFunction
FilterFunction
ProcessFunction
LambdaFunction:支持匿名函数,lambda表达式
-
richFunction:富函数,和上面常规函数不一样,富函数可以获取运行环境的上下文,并拥有一些生命周期,可以实现更复杂的功能
RichMapFunction
RichFlatMapFunction
RichFilterFunction
-
生命周期:
open():rich function初始化方法,当一个算子,被调用前,open()会被调用
close():生命周期中最后一个调用方法,做一些清理工作
getRuntimeContext():提供RuntimeContext的一些信息,例如函数执行并行度,任务名称,state状态等
-
Sink
官方默认提供了一些常见sink,比如Kafka,redis,ES等
可以自定义
stream.addSink(new MySink(xxxxx))
Flink中的window:
window是切割无限数据为有限块进行处理的手段
-
分为两类
CountWindow:按照指定数据条数,生成一个window,和时间无关。.countWindow(xxx), .countWindow(xxx,xxx)
-
TimeWindow:按照时间生成window
Tumbling Window(滚动窗口):时间对其,窗口长度固定,没有重叠。.timeWindow(Time.seconds(xxx))
Sliding Window(滑动窗口):时间对其,窗口长度固定,可以有重叠。.timeWindow(Time.seconds(xxxx), Time.seconds(xxxx))
Session Window(会话窗口):类似于web中的session,时间没有对齐
-
Window Function
Window function 定义了对窗口中收集的数据做的计算操作
增量聚合函数:每条数据进来就计算,,保持简单状态(ReduceFunction,AggregateFunction)
全窗口函数:会把所有数据都收集起来,等到计算的时候便利所有数据(ProcessWindowFunction)
时间语义与WarnerMark
-
时间语义
Event Time:事件创建的时间,由事件中的时间戳描述。但部分业务都会使用这个时间语义
Ingestion Time:数据进入flink的时间
Processing Time:每一个执行基于时间操作的算子的本地系统时间
通过env.setStreamTimeCharacteristic(TimeCharacteristic.EvenTime)
-
WarnerMark
由于flink为分布操作,所接受到的事件,其实并不是严格意义上的Event Time顺序排列发生的
针对乱序,所开的窗口window中的计算,会出现问题,部分数据会在窗口之外的时间发送到flink里面来。
watermark是一种衡量event time进展机制,作为一种延迟出发机制。允许窗口在额外时间之后关闭,收集window窗口时间内的数据
比如:watermark=10,window(5,5),所以window搜集0-15分钟内所有到达的eventTime为0-5的数据。
watermark是基于数据携带的时间戳计算的,并不是系统本身的时间戳或者计时
引入:dataStream.assignTimestampAndWaterMarks()
侧输出流(SideOutput):可以将主流分为一个或者多个side outputs。比如将温度低于30的数据输出到另外一个side output
dateStream.getSideOutput(new OutPutTag())
flink中的state状态编程和容错机制:
无状态流处理:独立观察每一件事件,并根据最后一个事件输出结果,比如检测问题超过90的异常情况告警
有状态流处理:根据多个事件输出结果,比如窗口内事件发生次数。
-
keyState:只能用于keyed Stream(key by 算子计算处理之后)
-
Value State 保存单个值
get操作:valueState.value
set操作:value State.update(T value)
-
ListState保存一个列表
ListState.add(T value)
ListState.addAll(List<T> values)
ListState.get() 返回Iterable
ListState.update(List<T> values)
-
MapState<K, V> 保存kv对
MapState.get(key)
MapState.put(key)
MapState.contains(key)
MapState.remove(key)
ReducingState<T>
AggregatingState<I,O>
-
flank中的checkpoint: