一.Spark UI 选项卡的组成
1.Jobs
1.1 首页
补充:
- Scheduling Mode:
- application 中 task 任务的调度策略,由参数 spark.scheduler.mode 来设置,可选的参数有 FAIR 和 FIFO。这与 Yarn 的资源调度策略的层级不同,Yarn 的资源调度是针对集群中不同 application 间的,而 spark scheduler mode 则是针对 application 内部 task set(stage)级别的资源分配
1.2 Event TImeline
- Event Timeline:
- 在 application 应用运行期间,Job 和 Exector 的增加和删除事件进行图形化的展现。这个就是用来表示调度 Job 何时启动何时结束,以及 Excutor 何时加入何时移除。
-
一般用来查看资源是否充足,通过看add executor时间是否很长
例子1:
例子2
1.3 Job Detail
- Stauts:展示 Job 的当前状态信息。
- Active Stages:正在运行的 stages 信息,点击某个 stage 可进入查看具体的 stage 信息。
- Pending Stages:排队的 stages 信息,根据解析的 DAG 图 stage 可并发提交运行,而有依赖的 stage 未运行完时则处于等待队列中。
示例:
- input:
- 上图中发现input=35.6G,表示该stage有读表或文件的操作。
- 上面这个是input stage,有input 、
- shuffle write
- 为下游stage输出
2. stages
- Completed Stages:
- 已完成 Stage 的基本信息。
- Skipped Stages:
- 1.因为 shuffle 的落地文件如果还有对应 RDD 使用,它就不会被垃圾回收掉,stage 发现数据已经在磁盘或内存中了,那么就不会重新计算了。
- 如果发现下游fetch数据失败了,此时需要通过血缘关系重新计算,那么上游的stage如果还在内存中,那么就不用跑了
- 2.在将 Stage 分解成 TaskSet 的时候,如果一个 RDD 已经Cache 到了 BlockManager(BlockManager 是 spark 管理自己的存储的组件,包括 memory 和 disk,比如RDD-Cache、 Shuffle-output、broadcast 都由它管理),则这个 RDD 对应的所有祖宗 Stage 都不会分解成TaskSet 进行执行,所以这些祖宗 Stage 和它们对应的 Task 就会在 Spark UI 上显示为 skipped。
- 因为将stage cache到内存,那么通过血缘关系,就不需要从源头重新读数据,那么这个rdd对应的祖宗stage都可以跳过。
- 3.大多数 skipped stage都是input stage或有一些shuffle read 的stage,因为都cache过了,那么久可以跳过。
- 1.因为 shuffle 的落地文件如果还有对应 RDD 使用,它就不会被垃圾回收掉,stage 发现数据已经在磁盘或内存中了,那么就不会重新计算了。
运行中的界面
2.1stage detail
总览
- Total time across all tasks:当前 stage 中所有 task 花费的时间和。
- Locality Level Summary:不同本地化级别下的任务数,本地化级别是指数据与计算间的关系。
- PROCESS_LOCAL 进程本地化:task 与计算的数据在同一个 Executor 中。
- NODE_LOCAL 节点本地化:
- 情况一:task 要计算的数据是在同一个 Worker 的不同 Executor 进程中;
- 情况二:task 要计算的数据是在同一个 Worker 的磁盘上,或在 HDFS 上,恰好有 block 在同一个节点上。
- RACK_LOCAL 机架本地化,数据在同一机架的不同节点上:
- 情况一:task 计算的数据在 Worker2 的 Executor 中;
- 情况二:task 计算的数据在 Worker2 的磁盘上。
- ANY 跨机架,数据在非同一机架的网络上,速度最慢。
- 补充:
- 一般生产环境中网络环境比较好,本地化这个功能是关闭的,不是很有用
- 本地化的实现是通过等待,一个task过来后,先等10秒(配置的)看是否能获取一个资源恰好和该task是同一个机器,如果有就提交作业,没有的话就接着等等
- 因这个等待会比较耗时,所以一般都是让其走网络,不强调本地化。
- 一般生产环境中网络环境比较好,本地化这个功能是关闭的,不是很有用
- Input Size/Records:输入的数据字节数大小/记录条数。
- Output Size / Records:输出的数据字节数大小/记录条数。
- Shuffle Write:
- 为下一个依赖的 stage 提供输入数据,shuffle 过程中通过网络传输的数据字节数/记录条数。
- 应该尽量减少 shuffle 的数据量及其操作次数,这是 spark 任务优化的一条基本原则。
- Shuffle read:
- 总的 shuffle 字节数,包括本地节点和远程节点的数据。
- 一般第一个stage就不会有shffle read,肯定是写,下游stage一般是读
DAG
- DAG Visualization:
- 当前 stage 中包含的详细的 tranformation 操作流程图。
- 显示的比较初级,一般也不怎么看
- Show Additional Metrics:显示额外的一些指标。鼠标移上去会有相应的解释。
- Scheduler Delay:调度延迟时间,包含把任务从调度器输送给 excutor,并且把任务的结果从 excutor 返回给调度器。如果调度时间比较久,则考虑降低任务的数量,并且降低任务结果大小
- Task Deserialization Time:反序列化 excutor 的任务,也包含读取广播任务的时间
- Result Serialization Time:在 executor 上序列化task结果所花费的时间。
- Getting Result Time:从 executor 中获取结果的时间。
- Event Timeline: 清楚地展示在每个 Executor 上各个 task 的各个阶段的时间统计信息,可以清楚地看到 task 任务时间是否有明显倾斜,以及倾斜的时间主要是属于哪个阶段,从而有针对性的进行优化。
- 以上几个点都不怎么看
Summary Metrics(重点)
- Summary Metrics for XXX Completed Tasks:已完成的 Task 的指标摘要。
- Duration:task 持续时间
- GC Time:task GC 消耗时间
- Output Write Time:输出写时间
- Output Size/ Records:输出数量大小,条数。
- Shuffle HDFS Read Time:shuffle 中间结果从 HDFS 读取时间
- Shuffle Read Size/Records:shuffle 读入数据大小/条数
- Shuffle Spill Time:shuffle 中间结果溢写时间。
- 折现指标只会在有值时展示
- 主要通看 input 、shuffle write的几个分位线的数据是否平均,是否有数据倾斜的问题
Aggregated Metrics by Executor
- Aggregated Metrics by Executor:executor维度做聚合(看的不是很多)
- 汇总指标。将 task 运行的指标信息按 Executor 做聚合后的统计信息,并可查看某个 Excutor 上任务运行的日志信息。这里可以看到 executor 完成 task 的情况,读入的数据量,shufflewrite、shuffle read 数据量,根据这些指标判断节点是否健康。
Tasks
- Tasks:
- 当前 stage 中所有任务运行的明细信息。右边部分都是需要关注的点。
- 因为有排序效果,一般可以通过列排序找找一些关键指标:
- input size 查看哪些task输入比较多,该task也极有可能是比较慢的task
- erros 排序,查找错误都在哪些机器上分布,如果都在一台机器,则会很快定位出现问题的机器
- 通过 index、task id在对应 log中检索对应的task日志
speculative:推测执行
因为有推测执行,所以如果发现task killed是正常现象
3.利用 Web UI 定位问题
查看stages
在 stage 页面可以看到 stage 44、58 两个执行时间很长,并且 shuffle 数据量也很大。
- stage id 44 shuffle read 6.8TB,但是写出18TB,有一个明显的数据膨胀(数据翻了一番多)
- stage id 58 shffle read 18T,最终output表/文件中的数据是5.4TB
- 但是也只能发现两个stage shuffle数据量大,执行时间长,具体的信息就看不见了,具体需要看详情
查看stage id 58的详情
Summary Metrics for n Completed Tasks(通过分位线判断是否发生数据倾斜)
可以看出 shuffle spill 耗时最多,其他指标都比较均匀,没有出现数据倾斜。
- shuffle spill 中位线 40s,但是最大是1.2h,这说明有一个或者几个task在shuffle spill 上花了1.2小时
- spill 多跟内存紧张和磁盘有关,如果在不同节点上有几个 task spill 特别慢,且 GC 时间长,则可能内存不足。
- 如果 spill 慢发生在同一个节点上,则有可能和节点运行状态相关,如磁盘问题。
- 同时得出一个比较关键的结论,该stage中的task没有发生数据倾斜
- 没有 shuffle write、input
- shuffle read、output 中位线观察都是很均匀
- 通过读或者写的几个中位线观察是否有数据倾斜
Tasks
按照时间倒叙,Detail stage 页面可以找到 task 执行时间很长,shuffle spill 数据很大,这里产生了大量的 IO。
stage 58 总结
- 此时可以得出一个结论:
- stage id 58 没有发生数据倾斜,但是shuffle read 18TB,output 5TB,数据量实在太大,在10000个task中跑批是比较慢的,总共花了3.4小时,甚至有一些task要跑 1.8h、1.6h、1.4h等,都是小时级别的。
- 解决方法:
- 在资源允许的情况下,可以进一步扩大并行度,增加 Executor 来进行优化。后续并行度改为10000->15000,观察时间有明显提升。
查看 stage 44 的详情
Summary Metrics for n Completed Tasks(通过分位线判断是否发生数据倾斜)
Stage44 看出 shuffle 数据并不均匀,发生来数据倾斜,观察依据:
- shuffle read 中位线只有600MB,90线、95线、99线相差不多,但是max线居然有8.2GB,比中位线大了10倍多
- shuffle write 中位线只有1G多,但是max居然有15G多,比中位线大了将近15倍,比99线大了3倍多
- duration 大部分执行时间都是分钟级别,但是max是小时级别
接下来要通过sql ui观察那部分执行逻辑发生倾斜
sql ui中观察
通过sql 标签 定位
stage no 44 的执行部分:
我们可以根据 SQL 执行图上找到 stage44,并查到发生 shuffle 操作的 join key(item_id),顺着其上游可找到其中一个表 device_item。
找sql代码
接着可以从任务 SQL 中找到问题出现的部分。定位到了问题所在,就可以通过一些针对数据倾斜的优化参数或者处理方法进行一一实验,并通过对比得出最优解。
- 在新版本中可以通过AE做优化
- 也可以让客户做优化,如拆sql,加盐,id拆分等
- 做分桶避免shuffle
- 也可以找业务确认,是否接收慢
4.Environment
介绍:
- Environment 选项卡提供有关 Spark 应用程序中使用的各种属性和环境变量的信息。
- 用户可以通过这个选项卡得到非常有用的各种 Spark 属性信息,而不用去翻找属性配置文件。通过平台工具提交任务时,其默认参数配置可以在这里查找。
- Environment 除了展示 Spark 参数意外,有一个重要的功能是明确参数是由谁设置的。
- 例如用户设置了参数spark.sql.shuffle.partitions=1000, 平台默认参数是 spark.sql.shuffle.partitions=2000,智能优化器根据作业运行历史为本次执行设置了 spark.sql.shuffle.partitions=3000,那么最后生效的参数可以从 Environment 获取,例如是 3000,同时还可以找出到底是谁设置了最后生效的值。
- 如:spark.final.spark.sql.shuffle.partitions=HBO,标注是HBO设置的
- 例如用户设置了参数spark.sql.shuffle.partitions=1000, 平台默认参数是 spark.sql.shuffle.partitions=2000,智能优化器根据作业运行历史为本次执行设置了 spark.sql.shuffle.partitions=3000,那么最后生效的参数可以从 Environment 获取,例如是 3000,同时还可以找出到底是谁设置了最后生效的值。
5.Executor
- Executors 选项卡:
- 提供了关于内存、CPU 核和其他被 Executors 使用的资源的信息。
- 这些信息在 Executor 级别和汇总级别都可以获取到。
- 一方面通过它可以看出来每个 Excutor 是否发生了数据倾斜。
- 另一方面可以具体分析目前的应用是否产生了大量的 shuffle,是否可以通过增加并行度来减少 shuffle 的数据量。
- 提供了关于内存、CPU 核和其他被 Executors 使用的资源的信息。
- Summary:
- 该 application 运行过程中使用 Executor 的统计信息。
- Executors:
- 每个 Excutor的详细信息(包含 driver),可以点击查看某个 Executor 中任务运行的详细日志。
二.如何通过 Spark UI 定位问题
1.如何找 Web UI地址
2.怎么看 Spark Driver/Executor 日志
因为executor里面会跑很多task,当task有问题的时候,需要通过看executor log定位task日志
3.如何定位分配资源不足
如果 UI 页面上申请到的 executors 数量远小于配置的数量(静态资源:spark.executor.instances,动态资源:spark.dynamicAllocation.maxExecutors),则表示队列资源不充足。
- 如配置的数量是300,但是active(6),远远小于配置数量,则表示队列资源不足。
4.如何定位Executor太少
- 如果 Active Tasks 等于或接近 Cores,则表示申请的资源被占满。
- 当任务的 task 较多时,可以适当增加 executors 数提高任务的并行度。
5.如何定位Input读慢
tasks标签观察
5.1 task很少的场景
- 如果一个一个stage中有大量的spill 操作,就说明input stage 逻辑是比较复杂的
- input stage 必然有 input size列
- 现象:Input Stage 包含较复杂的计算,task 数不多,单个 task 很慢。
- 如果input stage执行的很慢,可以适当增加input stage个数
- 如果 task 数不太多的情况下,可以下调 spark.sql.files.maxPartitionBytes 以扩大并发为代价,缩短整体运行时间。
5.2 task 很多的场景
- Input Stage 单个 task 处理数据量很小,每个 task 很快,但 task 数特别多,时间在调度上浪费很多。
- 此时如果调大 spark.sql.files.maxPartitionBytes 虽然可以减少task数,但参数设置会影响全局,如果要读多个表,可能对其他 input stage 造成影响。
- 对于这种情况,可以设置 spark.datasource.splits.max。
- 在本例中,通过设置 setspark.datasource.splits.max=40000,将 task 数从 17W + 减少到 4W+,单个 task 数据量增加,但总体调度时间节约,stage 时间明显缩短。
- 即调高单个task能处理的数据量
5.6 SparkSQL Input Split 划分原理
Partition / Split 大小取决于四个条件:
- 1. spark.sql.files.maxPartitionBytes(默认值128MB)
- 每个rdd partition能读多少数据,一般都是通过该值直接影响并发度,具体如何生效可以看maxSplitBytes部分
- 2. spark.sql.files.openCostInBytes(默认值4MB)
- io读文件是有cost的,不建议小于4M开文件
- 3. spark.default.parallelism (默认并行度,取决于调度平台(local standalone yarn mesos)。
- 对于Yarn,该值等于当前可用 core 总数。在启动 DynamicAllocation 的情况下,该值取决于启动 Job 时的实时可用 core 的数量)
- 一般不调,交给调度平台。
- 4. 待读取数据总量 totalBytes
Split 的最大值取决于如下公式:
- val bytesPerCore = totalBytes / defaultParallelism
- 总值/默认分区,这个一般没法控制
- var maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
- 如像增大并发度,即增加task,那就调小maxSplitBytes
- 如果想减少并发度,那就增大maxSplitBytes
Spark 将输入文件按每 maxSplitBytes 切分成一个个 partition
补充:
1.spark sql如何为每个job设置executor的cpu、内存?
spark 中是没法为所有job定制化设置这些资源,一般需要在每个app粒度启动的时候设置:
executor:
- spark.executor.memory 相当于 JVM XMX
- spark.executor.cores=4 相当于 thread =4
driver:
- spark.driver.memory
- spark.driver.cores=4
需要每个application中去调这些参数,最小单元是app这个维度,无法为每个job定制资源。
- 都有默认值,一般放在spark安装包的conf/spark-defaults.conf中设置
- 当然也可以在提交sql的时候指定
一般在yarn里面,一个个spark作业就是application,一个application可以启很多个job
三.深入理解SQL页面
1.介绍
- Spark UI 的 SQL 页面是非常重要的一个页面,通过该页面我们可以看到一个查询计划的执行过程。
- 查询计划是了解查询执行细节的入口点。 它包含许多有用的信息,并提供有关如何执行查询的见解。
- 这非常重要,尤其是在复杂的查询中或执行时间过长且成本高昂的情况下。根据查询计划中的信息,我们可能会找出效率不高的地方,并决定重写部分查询以获得更好的性能。
- 根据前面课程的内容,我们知道 SQL 查询计划组织成树结构,每个节点代表一个运算符,提供有关执行的基本细节。
2.一个例子:
sql
val questionsDF = spark.read.json("/test/sparksql/simple/data/questions-json")
val usersDF = spark.read.parquet("/test/sparksql/simple/data/users")
val res = questionsDF.filter(col("score") > 0).groupBy("user_id").agg(count("*").alias("cnt")).join(usersDF, "user_id")
spark-shell
[root@hadoop102 spark-3.2.1]# spark-shell
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2022-05-29 22:38:24,941 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://hadoop102:4040
Spark context available as 'sc' (master = local[*], app id = local-1653835106685).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.2.1
/_/
Using Scala version 2.12.15 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_144)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
建表questionsDF
scala> val questionsDF = spark.read.json("/test/sparksql/simple/data/questions-json")
questionsDF: org.apache.spark.sql.DataFrame = [accepted_answer_id: bigint, answers: bigint ... 9 more fields]
通过ui观看,4个任务
查看stages
建表usersDF
val usersDF = spark.read.parquet("/test/sparksql/simple/data/users")
查看列信息
执行sql
val res = questionsDF.filter(col("score") > 0).groupBy("user_id").agg(count("*").alias("cnt")).join(usersDF, "user_id")
查看plan
scala> res.queryExecution.logical
res4: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
'Join UsingJoin(Inner,List(user_id)) ##inner join ,join id user_id,根节点,向下看
:- Aggregate [user_id#16L], [user_id#16L, count(1) AS cnt#58L] ##join前做聚合,聚合后有count
: +- Filter (score#13L > cast(0 as bigint)) ##聚合前 做过滤
: +- Relation [accepted_answer_id#7L,answers#8L,body#9,comments#10L,creation_date#11,question_id#12L,score#13L,tags#14,title#15,user_id#16L,views#17L] json ## 读questionsDF表
+- Relation [user_id#29L,display_name#30,about#31,location#32,downvotes#33L,upvotes#34L,reputation#35L,views#36L] parquet ##读usersDF 表
这只是计划,还没有执行,需要触发action,如show
触发action,查看sql
show默认展示20行,结果如下
此时真正执行sql了
通过spark ui查看sql
detail
Physical Plan
每个节点的操作,根据计划树的(n)查找对应的算子操作
SQL 查询计划图像化
图像化的具体细节可以通过上面的detail查看
详解
两个scan
查找stage
exchange
3.知识补充
WholeStageCodegen
- 在物理计划的图形表示中,可以看到运算符被分组为蓝色的大矩形。 这些大矩形对应于代码生成阶段。
- 这是一个优化功能,发生在物理规划阶段。
- 其想法是采用支持代码生成的运算符并将其折叠在一起,以通过消除虚函数调用来加速执行。
- 并非所有运算符都支持代码生成,因此某些运算符(例如 Exchange)不是大矩形的一部分。
- 在我们的示例中,有三个 codegen stage 对应于三个大矩形,在格式化的计划输出中,可以在 operator 的括号中看到 codegenstage 的 id。
- 同样从树中,可以判断算子是否支持 codegen,因为如果支持 codegen,算子前会带 * 号。
Scan Parquet
- Scan parquet 运算符表示从 parquet 文件格式读取数据。
- 我们可以从图中直接看到选择哪些列。
- 尽管我们没有在查询中选择特定字段,但优化器中有一个ColumnPruning 规则还是会被应用,它确保仅从源中选择那些实际需要的列。
- 另外需要关心的是两种过滤器:
- PartitionFilters
- PartitionFilters 是应用于对表的分区列进行过滤。允许跳过我们不需要的分区。所以很多时候我们需要把鼠标移上去查看下。
- PushedFilters
- PushedFilters 是可以直接下推到 parquet 文件的字段过滤器,如果 parquet 文件按这些过滤列排序,它们会很有用。另外,parquet foot 包含row group的元数据(如统计信息,每个 row group 的最小值和最大值)。基于这些元信息,Spark 可以决定是否读取该 row group。
- PartitionFilters
Filter
- Filter 算子理解起来非常直观,它只是表示过滤条件。
- 可能不太明显的是运算符是如何创建的,因为它通常不直接对应于查询中使用的过滤条件。
- 原因是所有过滤器首先由 Catalyst 优化器处理,该优化器可能会修改和重新定位它们。
- 在将逻辑过滤器转换为物理运算符之前,有几个规则应用于逻辑过滤器:
- PushDownPredicates:
- 此规则将通过其他几个运算符将过滤器推到更靠近源的位置,但不是全部。
- 例如,下推不会穿过nondeterministic表达式/算子。
- 如果我们使用诸如 first、last、collect_set、collect_list、rand之类的函数,过滤器将不会被推送通过它们,因为这些函数在 Spark 中是nondeterministic(不确定的,每次执行结果不一样)的。
- 此规则将通过其他几个运算符将过滤器推到更靠近源的位置,但不是全部。
- CombineFilters:
- 将两个相邻的运算符组合为一个(它将来自两个过滤器的条件收集到一个复杂条件中)。
- InferFiltersFromConstraints:
- 这个规则实际上创建了一个新的过滤器运算符,例如从一个连接条件(从一个简单的内部连接它将创建一个过滤条件连接键不为空)。
- PruneFilters:
- 删除多余的过滤器(例如,如果过滤器总是评估为 True)。
- PushDownPredicates:
Project
- 此运算符表示将投影(选择)哪些列。 每次我们在 DataFrame 上调用 select、withColumn 或 drop 转换时,Spark 都会将 Project 运算符添加到逻辑计划中,然后将其转换为物理计划中的对应项。
- 同样,在转换之前对其应用了一些优化规则:
- ColumnPruning:修剪不需要的列以减少将要扫描的数据量。
- CollapseProject:它将相邻的 Project 运算符合并为一个。
- PushProjectionThroughUnion:下推 Project 到 Union 的两侧
(记住,下推的下是指往叶子结点推)。
上面这个物理计划图形的数据是从上往下流入,即最上面是叶子结点,最下面是根节点。
detail中与之展现相反,根在上面
Exchange
- Exchange 运算符代表 shuffle,即物理数据移动。
- 这个操作被认为是相当昂贵的,因为它通过网络移动数据。 查询计划中的信息还包含有关如何重新分区数据的详细信息。 在我们的示例中,它是 hashpartitioning(user_id, 200),如下所示:
- 这意味着数据将根据 user_id 列重新分区为 200 个分区,所有具有相同 user_id 值的行都属于同一个分区,并将位于同一个 executor 上。
- 为确保准确创建 200 个分区,Spark 计算 user_id 的哈希值并模 200。这样做的结果有可能发生的某些分区可能为空。
- 其他 partitioning 方式还有:
- RoundRobinPartitioning:数据将随机分布到 n 个大小大致相等的分区中,其中 n 由用户在 repartition(n) 函数中指定。
- SinglePartition:所有数据都被移动到单个分区到单个执行程序。 例如,当调用窗口成为整个 DataFrame 的窗口函数时(当您没有在窗口定义中为 partitionBy() 函数提供参数时),就会发生这种情况。
- RangePartitioning:在调用 orderBy 或排序转换后对数据进行排序时使用此方式。
ShuffleQueryStage/AQEShuffleRead
- 在3.0以上版本,在开启了 AQE 优化后,Exchange 算子之后还会跟一个ShuffleQueryStage,但是在查询图上是不显示的。
detail中有,但是上面的查询图中不显示
- 出现 ShuffleQueryStage,Spark 会截断查询计划,可以将ShuffleQueryStage 理解为叶子结点。
- 虽然通过上图发现 算子(1)是叶子结点,1-5会先执行,AE会截断查询计划,将1-5重新计划变成了(6),所以算子(6)就代表了算子(1)-(5),所以可以将之理解为叶子结点。
- 以上规则仅对于AE有效,即ShuffleQueryStage在AE中就是叶子结点。
- 在 ShuffleQueryStage 之后,可能出现 AQEShuffleRead(3.0叫CostumShuffleRead)。
- 会根据上一个 Stage 的 shuffle write 数据进行计划调整,如本例中的将200个 partition 进行合并(coalesced),合并成1个分区。
- 另外,Exchange 上的明细信息也能帮我们判断有没有数据倾斜:
- shuffle bytes written total (min, med, max (stageId: taskId))416.9 KiB (103.9 KiB, 104.1 KiB, 104.9 KiB (stage 8.0: task 13))
- 如果max 明显大于 med那么就有可能数据倾斜
HashAggregate
- HashAggregate 表示使用 Hash 方式进行的数据聚合。
- 通常两个 HashAggregate 成对出现。
- 有两个 HashAggregate 的原因是:
- 第一个进行部分聚合,它聚合每个 partition,或者叫 map 端聚合,用于减少数据的 shuffle。
- 即先本地聚合,map端聚合,减少数据广播过程中的传输。
- 第一个进行部分聚合,它聚合每个 partition,或者叫 map 端聚合,用于减少数据的 shuffle。
- 在示例中,Functions 字段中显示 partial_count(1)。部分聚合的结果的最终在第二个 HashAggregate 上合并。
- 该运算符还具有 results 字段显示聚合后可用的列。
BroadcastHashJoin & BroadcastExchange
- BroadcastHashJoin(BHJ)是一个表示特定连接算法的算子。 除了这个之外,Spark 中还有其他可用的连接算法,
- 例如 SortMergeJoin 或 ShuffleHashJoin。 BHJ 总是与 BroadcastExchange 成对出现,后者是一个表示广播shuffle 的算子——数据将被收集到 Driver,然后发送到每个执行程序,它可以用于连接。
- 为什么会有broadcastexchange?
- 多了这么个算子是为了复用。
- 相当于多个作业
三个作业,broadcastexchange必然是一个作业,看作业不是很重要
ColumnarToRow
- 这是 Spark 3.0 中引入的一个新算子,它用作列执行和行执行之间的转换。在 Spark 中,Scan Parquet 是可以进行向量化读的。
- 但是 Spark 内部其他算子并没有实现向量化,所以需要将列执行再转换成行执行模式。
4.问题排查
关键字:number of output rows。
- 解释:数据膨胀时通过此关键字确定膨胀的位置。
- 影响:作业运行慢/卡住。
- 排查:在问题stage的所有算子里找到 exchange 部分,看到他们的 number of output rows,然后上下对比 rows 条数看看哪个环节有膨胀。
- 很多算子中都有,一般在join的过程中容易出现数据膨胀,所以重点看join前后数据变化
- 建议:用户自行排查处理。
如果上图中BroadcastHashJoin 最终是200多明显大于HashAggregate,那就是发生膨胀,一般聚合是将减少数据,但是join有可能增加数据,所以也是比较容易发生数据膨胀的算子。
关键字:limit。
- 解释:导致 task 数量少。
- 影响:作业运行慢/卡住。
- 排查:limit 会导致这个算子只能有一个 task。
- 建议:要求用户减少 limit 的数量,这样尽量避免单 task 处理太多数据。
关键字:AQEShuffleReader/CustomShuffleReader。
- 解释:发生数据倾斜。
- 影响:作业运行慢/卡住。
- 排查:
- 通过 CustomShuffleReader 里的 med 和 max 的差距可以判断出是否有数据倾斜。
- 确定数据倾斜的字段的SQL:Select count(distinct *) from xxx groupby '关联字段‘。
- 看哪个建数据多
- 建议:告诉用户问题所在,并要求用户自行调整。未开启 AQE 的开启 AQE。
关键字:aggregate。
- 解释:导致 task 数量少。
- 影响:作业运行慢/卡住。
- 排查:
- aggregate 有可能造成 task 少的原因是可能做 groupby 的字段本身只有6个值,所以shuffle 完只能有6个 task。
- 这个时候可能每个task中数据都非常多,但是没法提高并行度,因为分组只有6个。
- 建议:
- 确定 groupby 的字段数量:Select count(distinct 关联字段) from xxx。
- 通过 SQL 找到字段值少的 groupby 字段,让用户自行优化 SQL。
关键字:BroadcastNestedLoopJoin 或 Cartesian。
- 解释:产生笛卡尔积。
- 影响:作业运行慢 / driver oom。
- 排查:分别从左右两边往上找,找到做 Join 的表是哪两个,之后可推测出 “左表.关联字段” 和“右表.关联字段”。
- 通常会造成笛卡尔积的情况:
- 1. 没指定关联条件;
- 没写join条件
- 2. 关联字段用 or 关联;
- 3. 非等值关联 a.id > b.id。
- 1. 没指定关联条件;
- 建议:提示用户自行优化。
关键字:skew。
- 解释:发生数据倾斜并被 Spark 识别。
- 影响:可能导致作业慢。
- 排查:skew 存在,说明有倾斜,并被 Spark 识别了,且已经做过一定处理,但如果依旧存在倾斜问题,可以从 skew 的上下游找一下 Exchange 或 CustomShuffleReader 算子,然后对比 med 和 max 判断是否依旧存在数据倾斜。
- 建议:• 确定数据倾斜的字段:Select count(distinct *) from xxx groupby '关联字段‘;
- 告诉用户问题所在,并要求用户自行调整。
关键字:sort。
- 解释:导致 task 数量少。
- 影响:当 sort 数据记录很多时,作业运行慢/卡住。
- 建议:通过 SQL 找到 orderby 命令的位置,让用户自行优化 SQL。
关键字:number of files read。
- 解释:算子读的文件数。
- 影响:读了超级多的文件,每个文件一个 task,导致任务时间长。
- 排查:如果红框中的指标超级多,说明读了太多文件,建立了太多 task。
- 建议:让作业 owner 减少读取分区量,分成多个 SQL 跑。
关键字:BroadcastHashJoin。
- 解释:走广播模式。
- 影响:driver OOM。
- 排查:分别从左右两边往上找,找到做 Join 的表是哪两个,之后可推测出 "左表.关联字段" 和 "右表.关联字段"
- 建议:强行关闭广播模式。
- set spark.sql.adaptiveBroadcastJoinThreshold=-1;
- set spark.sql.autoBroadcastJoinThreshold=-1;