1、背景
降本增效大背景下,统计分析实时任务资源使用情况,核心关注任务配置的CPU、内存、 实际使用的CPU、内存,首先对Flink任务涉及的基础概念做个梳理理解
2、Flink基础概念
介绍的主要是与 Flink 资源管理相关的组件,我们知道一个 Flink Cluster 是由一个 Flink Master 和多个 Task Manager 组成的,Flink Master 和 Task Manager 是进程级组件,其他的组件都是进程内的组件。
2.1、jobManager
2.1.1、概述
JobManager :协调 Flink 应用程序的分布式执行。它决定何时调度下一个 task(或一组 task)、对完成的 task 或执行失败做出反应、协调 checkpoint、并且协调从失败中恢复等等。这个进程由三个不同的组件组成:
- ResourceManager: 负责 Flink 集群中的资源提供、回收、分配 。 它管理 taskslots,这是 Flink 集群中资源调度的单位。Flink 为不同的环境和资源提供者(例如YARN、Kubernetes 和 standalone 部署)实现了对应的 ResourceManager。
- Dispatcher:提供了一个 REST 接口,用来提交 Flink 应用程序执行,并为每个提交的作业启动一个新的 JobMaster。它还运行 Flink WebUI 用来提供作业执行信息。
- JobMaster:负责管理单个JobGraph的执行。Flink 集群中可以同时运行多个作业,每个作业都有自己的 唯一一个JobMaster。
2.1.2、设置内存
- Per job模式:-yjm参数设置内存如 -yjm 1024 表示jobManager内存1G
- session模式:-jm 参数设置内存,如 -jm 1024 表示jobManager内存1G
参数jobmanager.memory.process.size设置
2.1.3、设置CPU
- K8s:kubernetes.jobmanager.cpu 默认1C
- yarn: yarn.appmaster.vcores 默认1C
2.2、taskManager
2.2.1、概述
TaskManager:执行作业流的 task,并且缓存和交换数据流。在 TaskManager 中资源调度的最小单位是 task slot。TaskManager 中 task slot 的数量表示并发处理 task 的数量。
2.2.3、数量如何设置
- Per job模式:TaskManager的数量是在提交作业时根据并发度动态计算(-yn参数指定已失效),根据设定的operator的最大并发度计算,例如,如果作业中operator的最大并发度为10,则 Parallelism/numberOfTaskSlots为向YARN申请的TaskManager数,如:Parallelism为10,numberOfTaskSlots为1,则TaskManager为10
- session模式:“-n NUM”参数设置TaskManager个数
2.2.4、设置内存
- Per job模式:-ytm参数设置内存,如-ytm 1024 表示每个taskmanager内存1G
- session模式:-tm参数设置内存,如 -tm 1024 表示每个taskmanager内存1G
参数taskmanager.memory.process.size设置
2.2.5、设置CPU
- k8s:kubernetes.taskmanager.cpu
- yarn: the cpu is set to the number of slots per TaskManager
如果你想为每个 TaskManager 分配多个 vcores,而不是 slot number,你可以在flink/conf/flink-conf.yaml中额外提供yarn.containers.vcores设置 - 每个 YARN 的虚拟核心数
2.3、并发度(parallelism)
2.3.1、概述
特定算子的子任务(subtask)的个数称之为并行度(parallel),一般情况下,一个数据流的并行度可以认为是其所有算子中最大的并行度。Flink中每个算子都可以在代码中通过.setParallelism(n)来重新设置并行度。而并行执行的subtask要发布到不同的slot中去执行
2.3.2、设置parallelism
- 在flink的配置文件中flink-conf.yaml,默认的并行度为1;
- 在以shell的方式提交flink job的时候,可以使用-p指定程序的并行度,如:flink run -p 10
- 在flink job程序内设置并行度,如: env.setParallelism(10);
- 每个算子指定并行度;如:.map(new XxxMapFunction).setParallelism(5)
- 并行度设置优先级是:算子设置并行度 > env 设置并行度 > 配置文件默认并行度
- 如果并发度值超过slot数量,flink任务会等待申请任务资源超时,而抛出异常
2.4、slot
2.4.1、概述
TaskManager 执行具体的 Task,TaskManager 为了对资源进行隔离和增加允许的task数,引入了 slot 的概念,这个 slot 对资源的隔离仅仅是对内存进行隔离,策略是均分,比如 taskmanager 的管理内存是 3 GB,假如有两个 slot,那么每个 slot 就仅仅有 1.5 GB 内存可用
2.4.2、数量设置
配置文件taskmanager.numberOfTaskSlots 参数设置
- Per job模式:-ys参数设置slot的数量
- session模式:-s参数设置slot的数量
2.4.3、slot和taskManager之间的关系
每个 task slot 代表 TaskManager 中资源的固定子集。例如,具有 3 个 slot 的 TaskManager,会将其托管内存 1/3 用于每个 slot。分配资源意味着 subtask 不会与其他作业的 subtask 竞争托管内存,而是具有一定数量的保留托管内存。注意此处没有 CPU 隔离;当前 slot 仅分离 task 的托管内存。
通过调整 task slot 的数量,用户可以定义 subtask 如何互相隔离。每个 TaskManager 有一个 slot,这意味着每个 task 组都在单独的 JVM 中运行。具有多个 slot 意味着更多 subtask 共享同一 JVM。同一 JVM 中的 task 共享 TCP 连接(通过多路复用)和心跳信息。它们还可以共享数据集和数据结构,从而减少了每个 task 的开销
2.4.4、TaskManager,Slot,Task和并行度parallelism的关系
slot 是指 TaskManager 的最大并发能力
如上图,3 个 TaskManager,每个 TaskManager 3 个 slot,此时一共有 9 个 slot
如上图,所有的算子并行度为1,只需要 1 个 slot 就能解决问题,有 8 个处于空闲。
如上图的上半部分,并行度为2,使用了 2 个 slot。上图的下半部分,设置并行度为9,所有的 slot 都用到了。
2.5、 task 和 subtask
2.5.1、概述
task: 是没有产生 shuffle,One-to-one 模式下算子的集合,里面封装了数个 subTask,类似 spark 中的 TaskSet。
subTask:flink 最小的执行单元,task 每一个分区会形成一个 subTask ,类似 spark 中的 task。
2.5.2、详细说明
下面我们用flink官方案例说明一下,案例代码如下
案例执行DAG图
说明:图中假设是 source/map 的并行度都是 2,keyby/window/apply 的并行度也都是 2,sink 的是 1,那么有几个 task,几个subTask 呢?
答案:共 task 有 3 个,subTask 是五个,最终需要五个线程。
解释:由于 source 到 map 没有产生 shuffle ,并且并行度相同,属于 One-to-one 的模式,所有 source 和 map 划分成一个 task,后面的 map 到 keyBy ,和最后的 sink 都有 shuffle 产生,并行度发生改变,所有 keyBy,sink 都是一个单独的 task,所有共有 3 个task,其中 source,map 并行度是 2,所以有两个 subTask,以此类推共有 5 个 subtask。
2.5.3、UI 界面上查看任务的 task 和 subTask
如下图我们点击任务的详情页面,右上角的 4 就是 task 总数,DAG 中的每一个矩形代表一个独立的 task,点击每一个 task 详情,我们能看到 task 的 subtask 信息,包括 subtask 的接受数据量,状态等信息
2.6、Operator Chains
2.6.1、概述
为了更高效地分布式执行,Flink会尽可能地将operator的subtask链接(chain)在一起形成task。每个task在一个线程中执行。将operators链接成task是非常有效的优化:它能减少线程之间的切换,减少消息的序列化/反序列化,减少数据在缓冲区的交换,减少了延迟的同时提高整体的吞吐量。
2.6.2、详细说明
下面这幅图,展示了 Source 并行度为 1,FlatMap、KeyAggregation、Sink并行度均为 2,最终以 5 个并行的线程来执行的优化过程
上图中将 KeyAggregation 和 Sink 两个 operator 进行了合并,因为这两个合并后并不会改变整体的拓扑结构。
但是,并不是任意两个 operator 就能 chain 一起的。其条件还是很苛刻的:
- 上下游的并行度一致
- 下游节点的入度为1 (也就是说下游节点没有来自其他节点的输入)
- 上下游节点都在同一个 slot group 中(下面会解释 slot group)
- 下游节点的 chain 策略为 ALWAYS(可以与上下游链接,map、flatmap、filter等默认是ALWAYS)
- 上游节点的 chain 策略为 ALWAYS 或 HEAD(只能与下游链接,不能与上游链接,Source默认是HEAD)
- 两个节点间数据分区方式是 forward
- 用户没有禁用 chain
2.7、slotgroup
为了防止同一个 slot 包含太多的 task,或者我们希望把计算逻辑复杂的算子单独使用 slot ,提高计算速度,Flink 提供了资源组(group) 的概念。group 就是对 operator 进行分组,同一 group 的不同 operator task 可以共享同一个 slot。默认所有 operator 属于同一个组"default",也就是所有 operator task 可以共享一个 slot。我们可以通过 slotSharingGroup() 为不同的 operator 设置不同的group
dataStream.filter(...).slotSharingGroup("groupName");