1.理论
一.Flink流处理介绍
Flink 是什么
- Apache Flink is a framework and distributed processing enginefor stateful computations over unbounded and bounded data streams.
- Apache Flink 是一个框架和分布式处理引擎,用于对无界和有界数据流进行状态计算。
- 官网 https://flink.apache.org/flink-architecture.html
为什么选择 Flink
- 流数据更真实地反映了我们的生活方式
- 真实的数据大部分是连续不断产生的,而不是攒了一批产生的
- 传统的数据架构是基于有限数据集的
- 即攒一批数据然后处理,失去了实时性
- 我们的目标
-
低延迟
- 毫秒级别
-
高吞吐
- 海量数据
- 结果的****准确性****和****良好的容错性
-
低延迟
哪些行业需要处理流数据(延迟性要求高的业务)
- 电商和市场营销
- 数据报表、广告投放、业务流程需要
- 物联网(IOT)
- 传感器实时数据采集和显示、实时报警,交通运输业
- 电信业
- 基站流量调配
- 银行和金融业
- 实时结算和通知推送,实时检测异常行为
传统数据处理架构
事务处理
- compute和storage分开的
- 缺点:吞吐量低,当数据量很大的时候后台传统数据库多表联查很慢,支持不了高并发
- 优点:实时性好
分析处理
- 将数据从业务数据库复制到数仓,再进行分析和查询
- 当需要分析计算的时候,将数据从关系型数据库中提取出来做ETL计算(清洗、整合),然后统一放到数仓中,最后通过数据计算引擎写sql做查询,可以存报表或者机器学习
- 一个标准的离线查询的流程
- 缺点:基本舍弃了实时性
- 优点:高并发,数据量上去了
有状态的流式处理
- 数据不放库中,直接放在本地内存中,将数据存成本地状态,每次数据流入,直接判断本地状态结合状态做计算
- 有状态:
- 将原来需要持久化的数据存放在本地内存中,而不存放在关系型数据库中
- checkpoint 检查点
- 针对状态做定期的备份
- 问题:
- 分布式架构下数据的顺序无法保证
- 基于此,产生了第二代的流处理器:lambda 架构
流处理的演变
lambda 架构
- 用两套系统,同时保证低延迟和结果准确
- batch layer:批处理层
- 攒一批数据然后将结果放在batch table中
- speed layer:流处理层
- 保证速度,将数据存储在speed table中
- 将上面两张表结合然后响应给用户
- 实际效果:
- 客户端快速看见一个近似的处理结果,但是可能不准,隔一段时间之后,得到一个正确的处理结果
- 问题:
- 实现一个需求要实现两套系统,维护成本高
第三代流处理器:Flink
- Flink满足:低延迟、高吞吐、时间正确、表现力好、压力下保持正确
- storm:是第一代处理器,低延迟好,毫秒级延迟
- spark streaming:只能满足高吞吐、压力下保持正确,但是其他满足不了
Flink 的主要特点
事件驱动(Event-driven)
- 流处理持久化数据存储在内存中,即本地状态,为了容错可以选择落盘持久化到磁盘上
- 流式计算基于事件驱动的
基于流的世界观
- 在 Flink 的世界观中,一切都是由流组成的
- 离线数据是有界的流;
- 实时数据是一个没有界限的流;
- 这就是所谓的有界流和无界流
分层API
- 越顶层越抽象,表达含义越简明,使用越方便
- 越底层越具体,表达能力越丰富,使用越灵活
- 如最底层能操作evets、state、time
- Flink是个批流统一的框架
- 离线用 dateset
- 实时用 datastream
Flink 的其它特点
- 支持事件时间(event-time)和处理时间(processing-time)
语义
- 精确一次(exactly-once)的状态一致性保证
- 状态一致性:结果正确,即使出现故障也要保证恢复后结果正确
- 低延迟,每秒处理数百万个事件,毫秒级延迟
- 与众多常用存储系统的连接
- 高可用,动态扩展,实现7*24小时全天候运行
Flink vs Spark Streaming
- 流(stream)和微批(micro-batching)
- spark streaming 延迟一般在几百毫秒,底层实际是批处理,所以称为微批处理
- 这是其架构设计导致的
- flink延迟真正保证了毫秒
- 其也能做批处理,即将流视作有界流
- spark streaming 延迟一般在几百毫秒,底层实际是批处理,所以称为微批处理
- 数据模型
- spark 采用 RDD 模型,spark streaming 的 DStream 实际上也就是一组 组小批数据 RDD 的集合
- flink 基本数据模型是数据流,以及事件(Event)序列
- 即spark是基于一批数据的数据集做计算,flink是基于事件序列
- 运行时架构
- spark 是批计算,将 DAG 划分为不同的 stage,一个完成后才可以计算下一个,即每个事件数据都有等待的过程
- flink 是标准的流执行模式,一个事件在一个节点处理完后可以直接发往下一个节点进行处理
二.Flink运行架构
Flink 运行时的组件
作业管理器(JobManager)
-
控制一个应用程序执行的****主进程,也就是说,每个应用程序都会被一个不同的JobManager 所控制执行。
- JobManager 会先接收到要执行的应用程序,这个应用程序会包括:
- 作业(JobGraph)
- JobManager 会把JobGraph转换成一个物理层面的数据流图,这个图被叫做“执行图”(ExecutionGraph),包含了所有可以并发执行的任务。
- 逻辑数据流图(logical dataflow graph)
- 打包了所有的类、库和其它资源的JAR包。
- 作业(JobGraph)
- JobManager 会先接收到要执行的应用程序,这个应用程序会包括:
- JobManager 工作机制:
- 会向资源管理器(ResourceManager)请求执行任务必要的资源,也就是任务管理器(TaskManager)上的插槽(slot)。一旦它获取到了足够的资源,就会将执行图分发到真正运行它们的TaskManager上。
- 而在运行过程中,JobManager会负责所有需要中央协调的操作,比如说检查点(checkpoints)的协调。
- 针对每一个job做进程管理
任务管理器(TaskManager)
- Flink中的工作进程。
- 通常在Flink中会有多个TaskManager运行,每一个TaskManager都包含了一定数量的插槽(slots)。
- 插槽的数量限制了TaskManager能够执行的任务数量。
- TaskManager工作流程:
- 启动之后,TaskManager会向资源管理器(ResourceManager)注册它的插槽;
- 收到资源管理器的指令后,TaskManager就会将一个或者多个插槽提供给JobManager调用。
- JobManager就可以向插槽分配任务(tasks)来执行了。
- 在执行过程中,一个TaskManager可以跟其它运行同一应用程序的TaskManager交换数据。
资源管理器(ResourceManager)
- 主要负责管理TaskManager的插槽(slot),TaskManger 插槽是Flink中定义的处理资源单元。
- Flink为不同的环境和资源管理工具提供了不同资源管理器,比如YARN、Mesos、K8s,以及standalone部署。
- 当JobManager申请插槽资源时,ResourceManager会将有空闲插槽的TaskManager分配给JobManager。
- 如果ResourceManager没有****足够的****插槽****来满足JobManager的请求,它还可以向资源提供平台发起会话,以提供启动TaskManager进程的容器。
分发器(Dispatcher)
- 可以跨作业运行,它为应用提交提供了REST接口。
- 当一个应用被提交执行时,分发器就会启动并将应用移交给一个JobManager。
- Dispatcher也会启动一个Web UI,用来方便地展示和监控作业执行的信息。
- Dispatcher在架构中可能并不是必需的,这取决于应用提交运行的方式。
任务提交流程
- standone模式,用多少资源启动几个taskmanager,现用现启taskmanager
任务提交流程(YARN)
- resourcemanager是yarn的资源管理器,接收job后启动applicatonMaster,其中有Flink的生态如jobManager和resourceManager (Flink的资源管理),但是此时Flink的resourceManager就不管资源分配了,而是放到Yarn中管理类(第四步)
- Flink启动的时候需要从环境中加载配置
- 上图是yarn的job模式,提交一个job,启动一个集群
任务调度原理
思考
如何实现并行计算?
并行任务需要占用多少个slot?
一个流处理程序,到底包含多少个任务?
并行度(Parallelism)
- 一个特定算子的** 子任务(subtask)的个数**被称之为其并行度(parallelism)。
- 一个特定算子并行执行子任务的个数
- 一般情况下,一个 stream 的并行度,可以认为就是其所有算子中最大的并行度。
TaskManager 和 Slots
图中并行度是5,一个算子类型一个slot,可以通过slotsharinggroup实现,不同组之间一定互斥,占用不同的slot
- Flink 中每一个 TaskManager 都是一个JVM进程,它可能会在独立的线程上执行一个或多个子任务
- 因为是独立的jvm进程,所以其可以有多个子任务,多子任务即多线程
- 为了控制一个 TaskManager 能接收多少个 task, TaskManager 通过 task slot 来进行控制(一个 TaskManager 至少有一个 slot)
-
slot就是执行一个独立线程所需要的计算资源的最小单元,就是一组计算资源
- 每个slot内存彼此隔离
- cpu资源不隔离,如果该cpu此时有两个slot工作,即两个线程执行任务,就需要cpu做时间片的轮转,供两个线程同时使用
- 虽然没有直接设置cpu的分配,但是一般按照当前cpu的核心数量来设置slot
- number of task slot 一个taskmanger中slot数量,即每个taskManager最大的并行能力,即最多多少个线程并行执行
- Parallelism default 默认并行度,job提交的并行度,指定job需要的subtask数量
job并行度调成6,13个subtask
slot 来进行控制(一个 TaskManager 至少有一个 slot)TaskManager 和 Slots
- 默认情况下,Flink 允许子任务共享 slot,即使它们是不同任务的子任务。 这样的结果是,一个 slot 可以保存作业的整个管道。
-
共享slot:
- 前后发生的不同的任务可以共享一个slot
- 共享 slot:即不同的subtask放在了同一个slot中,一个slot上运行多个线程
- 同一个任务的并行的子任务必须分开,必须执行在不同的slot上,而先后发生的不同subtask可以运行在一个slot上
- 默认slot共享,如果不设置slot的group,默认每个操作使用前一个操作的共享组,即所有计算都在一个group中,所以默认所有操作共享slot
- 前后发生的不同subtask放在同一个slot中的好处:
- 一个 slot 可以保存作业的整个管道(Pipline),即在一个slot中执行了一套完整的作业算子,避免了不同算子之间在不同的slot中的通信
- slot之间彼此隔离,即使一个slot挂了,不影响其他slot中的计算
- 如果按照算子类型分配slot,每个算子需要的计算时间不同,可能会出现部分slot满载,部分slot空闲,出现数据堆积
- 数据共享使得每个slot资源都能合理利用
- Task Slot 是静态的概念,是指 TaskManager 具有的并发执行能力
并行子任务的分配
- 两条流,A、C,到D4合并,然做操作E
- A4 表示并行度是4,其有四个subtask
- 当前jobgraph一共有16个任务(所有操作并行子任务之和)
- 虽然是两条流做计算,但是slot不是6(C2+A4)而是4,即按照A4的并行度来分配4个slot,C2可以使用其中两个slot,不同操作可以共享slot
- 即前文提到的“一般情况下,一个 stream 的并行度,可以认为就是其所有算子中最大的并行度”
并行度的例子
- flink-conf.yaml
- 指定了taskslots数量:3,默认为cpu核心数
- 三个taskmanager,每个taskmanger有3个slot
- 一个作业如果parallelism=3最优的分配方式为:3个taskmanager各占用一个slot
- 一共9个slot
- example1中
- paralelism=1,并行度为1,三个操作共享内存使用一个slot
- example2中
- 将并行度调成2,如上图有三种修改方法
- flink-conf
- flink client
- 提交flink的job时,指定-p参数
- environment
- 代码中写死
- 优先级:先看代码中,然后看client,最后看conf
- 此时占用两个slot,优先将并行的任务提交给另外一个taskmanager上的slot中,
- 将并行度调成2,如上图有三种修改方法
- example3中
- 设置parallelism=9
- 一个操作表示一个任务,三个任务使用一个slot,共使用9个slot
- example4中
- parallelism=9,将sink算子设置成1,此时有19个任务(18个source+reduce与1个sink)
程序与数据流(DataFlow)
- 所有的Flink程序都是由三部分组成的: Source 、Transformation 和 Sink。
- 流的转换都是基于Datastream
- Source任务从外部读取数据源
- Transformation任务 负责转换计算
- sink任务负责将当前计算结果输出到外部,可以有多个比如输出到多个应用中
- 以上所有任务的操作在flin中就是一个完整的数据流图dataflow graph
- Source 负责读取数据源,Transformation 利用各种算子进行处理加工,Sink负责输出
logical dataflow graph
- 在运行时,Flink上运行的程序会被映射成“逻辑数据流”(dataflows),它包含了这三部分
- 每一个dataflow以一个或多个sources开始****以一个或多个sinks结束。dataflow类似于任意的有向无环图(DAG)
- 在大部分情况下,程序中的转换运算(transformations)跟dataflow中的算子(operator)是一一对应的关系
执行图(ExecutionGraph)
- Flink 中的执行图可以分成四层:StreamGraph -> JobGraph ->ExecutionGraph -> 物理执行图
- StreamGraph:
- 是根据用户通过 Stream API 编写的代码生成的最初的图。用来表示程序的拓扑结构。
- 一个操作就是一个节点一个任务
- JobGraph:
- StreamGraph经过优化后生成了 JobGraph,提交给 JobManager的数据结构。
- 主要的优化为,将多个符合条件的节点 chain(连) 在一起作为一个节点
- 该动作在客户端上生成
- ExecutionGraph:
- JobManager 根据 JobGraph 生成ExecutionGraph。
- ExecutionGraph是JobGraph的并行化版本,是调度层最核心的数据结构。
- 物理执行图:
- JobManager 根据 ExecutionGraph 对 Job 进行调度后,在各个TaskManager 上部署 Task 后形成的“图”,并不是一个具体的数据结构。
- StreamGraph:
数据传输形式
- 一个程序中,不同的算子可能具有不同的并行度
- 算子之间传输数据的形式可以是 one-to-one (forwarding) 的模式也可以是redistributing 的模式,具体是哪一种形式,取决于算子的种类
- One-to-one:
- stream维护着分区以及元素的顺序(比如source和map之间)。
- 这意味着map 算子的子任务看到的元素的个数以及顺序跟 source 算子的子任务生产的元素的个数、顺序相同。
- 不发发生分区
- map、fliter、flatMap等算子都是one-to-one的对应关系。
- Redistributing:
- stream的分区会发生改变。
- 每一个算子的子任务依据所选择的transformation发送数据到不同的目标任务。
- 例如,keyBy 基于 hashCode 重分区、而 broadcast 和 rebalance 会随机重新分区,这些算子都会引起redistribute过程,而 redistribute 过程就类似于 Spark 中的 shuffle 过程。
- rebalance是轮询
- 例如,keyBy 基于 hashCode 重分区、而 broadcast 和 rebalance 会随机重新分区,这些算子都会引起redistribute过程,而 redistribute 过程就类似于 Spark 中的 shuffle 过程。
- One-to-one:
任务链(Operator Chains)
- Flink 采用了一种称为任务链的优化技术,可以在特定条件下减少本地通信的开销。
- 为了满足任务链的要求,必须将两个或多个算子设为相同的并行度,并通过本地转发(local forward)的方式进行连接
- 将多个任务合并成一个任务
- 相同并行度的 one-to-one 操作,Flink 这样相连的算子链接在一起形成一个 task,原来的算子成为里面的 subtask
-
并行度相同、并且是 one-to-one 操作,两个条件缺一不可
- 默认满足以上两个条件就会合并,如果不想让他们合并则:
- 在两个算子之间做一个重分区算子
- api disableChaining() 指定当前算子不参与operation chain中
- 默认满足以上两个条件就会合并,如果不想让他们合并则:
例子
- source并行度是1,flatmap并行度是2不能合并
- flatmap与key agg并行度相同,但是基于hashcode重分区,算子之间传输类型是redistributing,所以也不能合并
- key agg与sink并行度相同且是one-to-one可以合并
- 所以以上需要两个slot就可以运行了