Kyuubi 解锁 Spark SQL on CDH 6

背景

CDH 最后一个免费版 6.3.2 发布一年有余,离线计算核心组件版本停在了 Hadoop 3.0.0,Hive 2.1.1,Spark 2.4.0。随着 Spark 3.0 的重磅发布,在性能方面又迎来了一次飞跃,本文将描述把 Spark 3 集成到 CDH 6.3.1(未开启 Kerberos) 的过程,并使用 Kyuubi 替换 HiveServer2,实现 OLAP、ETL 等场景下从 HiveQL 到 SparkSQL 的无缝迁移,享受 5x-10x 的性能红利。

CDH 缺陷修复

[ORC-125] 修复 Hive 不能读取高版本 ORC 写入的数据

当使用 Hive 读取由 Presto 或者 Spark 等写入的 ORC 文件时,会出现以下错误。

ORC split generation failed with exception: java.lang.ArrayIndexOutOfBoundsException: 6

该问题在 ORC 上游被修复 [ORC-125] Correct OrcFile.WriterVersion to correctly use FUTURE

ORC 最早是 Hive 的一个子项目,在 CDH 6 集成的 Hive 2.1 这个版本里,ORC 还没有分离出去,所以这个问题要在 Hive 源码里修复。

我做了一个打包好的修复版本,GitHub 传送门,下载更换 /opt/cloudera/parcels/CDH/lib/hive/lib 路径下的 hive-exec-2.1.1-cdh6.3.1.jar, hive-orc-2.1.1-cdh6.3.1.jar 即可。(至少需要更换 Hadoop Client、HiveServer2 节点,如果你不知道我在说什么,就把所有节点都换掉)

Spark 3.1

[SPARK-33212] Spark 使用 Hadoop Shaded Client

Hadoop 3.0 提供了 Shaded Client,用于下游项目规避依赖冲突 [HADOOP-11656] Classpath isolation for downstream clients

Spark 3.2 在 hadoop-3.2 profile 中切换到了 Hadoop Shaded Client
[SPARK-33212] Upgrade to Hadoop 3.2.2 and move to shaded clients for Hadoop 3.x profile

该更改不是必须的,但个人建议从 Spark 主线将该补丁移植到 branch-3.1 使用,以规避潜在的依赖冲突。

[CDH-71907] Spark HiveShim 适配 CDH Hive

Spark 通过反射和隔离的类加载器来实现对多版本 Hive Metastore 的支持,详情参考
Interacting with Different Versions of Hive Metastore - Spark Documentation

CDH 6 使用修改过的 Hive 2.1.1 版本,其方法签名与 Apache 版本有所不同,故 Spark HiveShim 反射调用会出现找不到方法签名,需要手动将 CDH-71907 补丁打到 branch-3.1。

Spark External Shuffle Service 协议兼容

Spark shuffle 时,mapper 会将数据写入到本地磁盘而非 HDFS,引入 ESS 后,会将文件信息注册到 ESS 中,将 mapper 与 reducer 解耦。CDH 中,默认会启用 Spark Yarn External Shuffle Service,作为 YARN AUX Service 在所有 Yarn Node 上启动。

Spark 3 修改了 shuffle 通信协议,在与 CDH 2.4 版本的 ESS 交互时,需要设置 spark.shuffle.useOldFetchProtocol=true,否则可能报如下错误。[SPARK-29435] Spark 3 doesn't work with older shuffle service

IllegalArgumentException: Unexpected message type: <number>.

Spark 版本迁移指南

如果你有现存的基于 CDH Spark 2.4 的 Spark SQL/Job,在将其迁移至 Spark 3 版本前,请参阅完整的官方迁移指南。

Spark 部署

官方文档 Running Spark on YARN - Documentation 中提到

To make Spark runtime jars accessible from YARN side, you can specify spark.yarn.archive or spark.yarn.jars. For details please refer to Spark Properties. If neither spark.yarn.archive nor spark.yarn.jars is specified, Spark will create a zip file with all jars under $SPARK_HOME/jars and upload it to the distributed cache.

因此,无需在 CDH 所有节点上部署 Spark 3,只需在 Hadoop Client 节点上部署 Spark 3 即可。

如果你对集群权限管理没有十分严格的要求,请使用 hive 用户以避免权限问题。

我基于 Spark 3.1.2 制作了一个适配 CDH 6 的版本, GitHub 传送门 ,下载解压至 /opt,并软链至 /opt/spark3

[hive@cdh-kyuubi]$ ls -l /opt | grep spark
lrwxrwxrwx  1 root         root           39 Aug 10 18:46 spark3 -> /opt/spark-3.1.2-cdh6-bin-3.2.2
drwxr-xr-x 13 hive         hive         4096 Aug 10 18:46 spark-3.1.2-cdh6-bin-3.2.2

配置 Hadoop、Hive

CDH 会将配置文件自动分发到所有节点 /etc 目录下,建立软链即可。

ln -s /etc/hadoop/conf/core-site.xml /opt/spark3/conf/
ln -s /etc/hadoop/conf/hdfs-site.xml /opt/spark3/conf/
ln -s /etc/hadoop/conf/yarn-site.xml /opt/spark3/conf/
ln -s /etc/hive/conf/hive-site.xml /opt/spark3/conf/

配置 Spark 环境变量 /opt/spark3/conf/spark-env.sh

#!/usr/bin/env bash

export HADOOP_CONF_DIR=/etc/hadoop/conf:/etc/hive/conf
export YARN_CONF_DIR=/etc/hadoop/conf.cloudera.yarn:/etc/hive/conf

配置 Spark 默认参数 /opt/spark3/conf/spark-defaults.conf

请参考 Configuration - Spark Documentation 根据集群环境实际情况进行微调

spark.authenticate=false
spark.io.encryption.enabled=false
spark.network.crypto.enabled=false

spark.eventLog.enabled=true
spark.eventLog.dir=hdfs://nameservice1/user/spark/applicationHistory
spark.driver.log.dfsDir=/user/spark/driverLogs
spark.driver.log.persistToDfs.enabled=true

spark.files.overwrite=true
spark.files.useFetchCache=false

spark.serializer=org.apache.spark.serializer.KryoSerializer

spark.shuffle.service.enabled=true
spark.shuffle.service.port=7337
spark.shuffle.useOldFetchProtocol=true

spark.ui.enabled=true
spark.ui.killEnabled=true
spark.yarn.historyServer.address=http://cdh-master2:18088
spark.yarn.historyServer.allowTracking=true

spark.master=yarn
spark.submit.deployMode=cluster

spark.driver.memory=2G
spark.executor.cores=6
spark.executor.memory=8G
spark.executor.memoryOverhead=2G
spark.memory.offHeap.enabled=true
spark.memory.offHeap.size=2G

spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.executorIdleTimeout=60
spark.dynamicAllocation.minExecutors=0
spark.dynamicAllocation.schedulerBacklogTimeout=1

spark.sql.cbo.enabled=true
spark.sql.cbo.starSchemaDetection=true
spark.sql.datetime.java8API.enabled=false
spark.sql.sources.partitionOverwriteMode=dynamic

spark.sql.hive.convertMetastoreParquet=false
spark.sql.hive.convertMetastoreParquet.mergeSchema=false
spark.sql.hive.metastore.version=2.1.1
spark.sql.hive.metastore.jars=/opt/cloudera/parcels/CDH/lib/hive/lib/*

spark.sql.orc.mergeSchema=true
spark.sql.parquet.mergeSchema=true
spark.sql.parquet.writeLegacyFormat=true

spark.sql.adaptive.enabled=true
spark.sql.adaptive.forceApply=false
spark.sql.adaptive.logLevel=info
spark.sql.adaptive.advisoryPartitionSizeInBytes=256m
spark.sql.adaptive.coalescePartitions.enabled=true
spark.sql.adaptive.coalescePartitions.minPartitionNum=1
spark.sql.adaptive.coalescePartitions.initialPartitionNum=1024
spark.sql.adaptive.fetchShuffleBlocksInBatch=true
spark.sql.adaptive.localShuffleReader.enabled=true
spark.sql.adaptive.skewJoin.enabled=true
spark.sql.adaptive.skewJoin.skewedPartitionFactor=5
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=128m
spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin=0.2
spark.sql.autoBroadcastJoinThreshold=-1

验证 spark-shell 工作正常

[hive@cdh-kyuubi]$ /opt/spark3/bin/spark-shell
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/08/30 19:53:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/08/30 19:53:46 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
Spark context Web UI available at http://cdh-master1:4040
Spark context available as 'sc' (master = yarn, app id = application_1615462037335_40099).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.1.2-cdh6
      /_/

Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_221)
Type in expressions to have them evaluated.
Type :help for more information.

scala> spark.sql("show databases").show
+----------------+
|       namespace|
+----------------+
|            test|
+----------------+
scala>

至此,Spark 3 on CDH 6 已经部署完成,可以像 CDH 自带的 Spark 2.4 一样,正常的使用 spark-submit、spark-shell,但依旧不支持 spark-sql,spark-thriftserver。

相比略显鸡肋的 spark-sql,spark-thriftserver,Kyuubi 是更好的选择,因此我故意移除了这两个功能。

注意:我提供的构建版,没有启用 spark-thriftserver 模块,为正常使用 Kyuubi,请下载 hive-service-rpc-3.1.2.jar 添加到 /opt/spark3/jars 路径。

Kyuubi —— 解锁 Spark SQL 更多场景

简言之,Apache Kyuubi (Incubating) 之于 Spark,类似 HiveServer2 之于 Hive。

Kyuubi 通过将 Spark 暴露一个与 HiveServer2 完全兼容的 Thrift API,可以兼容现有的 Hive 生态,如 beeline,hive-jdbc-driver,HUE,Superset 等。

可以通过以下文档来了解 Kyuubi 的架构,Kyuubi 与 HiveServer2 和 Spark Thrift Server 的异同。

Kyuubi 部署

Kyuubi 无需任何修改即可适配 CDH 6,下面给出关键步骤,详情可以参考 Deploy Kyuubi engines on Yarn — Kyuubi documentation

同样的,如果你对集群权限管理没有十分严格的要求,请使用 hive 用户以避免权限问题。

解压部署

下载 kyuubi-1.3.0-incubating-bin.tgz 解压至 /opt,并创建软链到 /opt/kyuubi

[hive@cdh-external opt]$ ls -l /opt | grep kyuubi
lrwxrwxrwx  1 root         root           32 Aug 18 17:36 kyuubi -> /opt/kyuubi-1.3.0-incubating-bin
drwxrwxr-x 13 hive         hive         4096 Aug 18 17:52 kyuubi-1.3.0-incubating-bin

修改配置

按需修改 /opt/kyuubi/conf/kyuubi-env.sh

#!/usr/bin/env bash

export JAVA_HOME=/usr/java/default
export SPARK_HOME=/opt/spark3
export SPARK_CONF_DIR=${SPARK_HOME}/conf
export HADOOP_CONF_DIR=/etc/hadoop/conf:/etc/hive/conf

export KYUUBI_PID_DIR=/data/log/service/kyuubi/pid
export KYUUBI_LOG_DIR=/data/log/service/kyuubi/logs
export KYUUBI_WORK_DIR_ROOT=/data/log/service/kyuubi/work
export KYUUBI_MAX_LOG_FILES=10

参考 Kyuubi Configurations System — Kyuubi documentation 按需修改 /opt/kyuubi/conf/kyuubi-defaults.conf

kyuubi.authentication=NONE

kyuubi.engine.share.level=USER

kyuubi.frontend.bind.host=0.0.0.0
kyuubi.frontend.bind.port=10009

kyuubi.ha.zookeeper.quorum=cdh-master1:2181,cdh-master2:2181,cdh-master3:2181
kyuubi.ha.zookeeper.namespace=kyuubi

kyuubi.session.engine.idle.timeout=PT10H

spark.master=yarn
spark.submit.deployMode=cluster

spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.minExecutors=0
spark.dynamicAllocation.maxExecutors=20
spark.dynamicAllocation.executorIdleTimeout=60

注意:kyuubi-defaults.conf 中的 spark 配置优先级高于 spark-defaults.conf

性能调优

How To Use Spark Dynamic Resource Allocation (DRA) in Kyuubi — Kyuubi documentation
How To Use Spark Adaptive Query Execution (AQE) in Kyuubi — Kyuubi documentation

启动

使用 /opt/kyuubi/bin/kyuubi 在前台或后台启动 Kyuubi Server。

[hive@cdh-kyuubi]$ /opt/kyuubi/bin/kyuubi --help
Usage: bin/kyuubi command
  commands:
    start        - Run a Kyuubi server as a daemon
    run          - Run a Kyuubi server in the foreground
    stop         - Stop the Kyuubi daemon
    status       - Show status of the Kyuubi daemon
    -h | --help  - Show this help message

Beeline 连接

使用默认参数连接 Kyuubi

beeline -u jdbc:hive2://cdh-kyuubi:10009 -n bigdata

使用自定义参数连接 Kyuubi

beeline -u "jdbc:hive2://cdh-master2:10009/;?spark.driver.memory=8G#spark.app.name=batch_001;kyuubi.engine.share.level=CONNECTION" -n batch

详细配置参考 Access Kyuubi with Hive JDBC and ODBC Drivers — Kyuubi documentation

HUE 连接

简单说,在 Cloudera Manager 中修改 HUE 配置项 Hue Service Advanced Configuration Snippet 如下,即可在 HUE 中开启 Spark SQL 引擎。

[desktop]
 app_blacklist=zookeeper,hbase,impala,search,sqoop,security
 use_new_editor=true
[[interpreters]]
[[[sparksql]]]
  name=Spark SQL
  interface=hiveserver2
[[[hive]]]
  name=Hive
  interface=hiveserver2
# other interpreters
  ...
[spark]
sql_server_host=kyuubi
sql_server_port=10009

详细配置参考 Getting Started with Kyuubi and Cloudera Hue — Kyuubi documentation

Kyuubi engine 共享级别与应用场景

Kyuubi Spark engine 即一个 Spark driver,通过控制 engine 的共享策略,可以在隔离性和资源利用率上取得平衡。Kyuubi 提供 3 种 engine 共享级别,分别为 CONNECTION,USER(默认),SERVER。

下面的讨论均假设使用 YARN Cluster 模式启动 Kyuubi Spark engine。

我们首先补充一些时间开销和 Spark 操作执行的信息。

Kyuubi Spark engine 在冷启动时,会由 Kyuubi Server 通过 spark-submit 命令向 YARN 提交一个 Spark App,即 engine,该过程从提交到 Spark driver 启动约需要 5-6s,然后 engine 将自己注册到 Zookeeper,Kyuubi Server 监听 Zookeeper 发现 engine 并建立连接,该过程约 1-2s,如此算来,在 YARN 资源空闲时,整个 engine 冷启动时间约 6-8s;Client 连接到存在的 engine,通常耗时 1s 内。

如果启动了 Spark 的 executor 动态伸缩特性,真正执行 SQL 任务时,如果资源有富余,会动态创建 executor,每个 executor 创建耗时约为 2-3s。

元数据、DDL 等操作,如获取 database 列表,CREATE TABLE 语句等,会在 driver 上执行;计算任务如 JOINCOUNT() 等,会由 driver 生成执行计划,在 executor 上执行。

在我们的生产环境中,大概有如下三种使用场景:

  1. 使用 HUE 进行 ad-hoc 查询
    该场景中,会有多个用户进行查询,一般会运行相对较大的查询任务,用户对连接创建时间以及元数据加载时间较为敏感,但对查询结果响应时间有一定的容忍性。
    在这种场景中,使用默认的 USER 共享级别,每个用户只使用一个 Spark engine,配合使用 Spark 的动态伸缩特性,动态的创建和销毁 executor,在保证用户之间隔离的基础上,降低启动时间和资源占用。建议的关键配置如下:
kyuubi.engine.share.level=USER
kyuubi.session.engine.idle.timeout=PT1H

spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.minExecutors=0
spark.dynamicAllocation.maxExecutors=30
spark.dynamicAllocation.executorIdleTimeout=120
  1. 使用 Beeline 运行批任务 SQL
    该场景会使用统一的 batch 账号提交 SQL 任务,对响应时间敏感度低,但对稳定性要求非常要,并且应尽可能的最大化利用集群资源。
    推荐在该场景下将共享级别调整为 CONNECTION,这样每个 SQL 执行将会使用独立的 Spark driver,并且 SQL 执行完毕后 Spark driver 立即退出,保障离线任务互不干扰,并且资源及时释放。建议的关键配置如下:
kyuubi.engine.share.level=CONNECTION

spark.dynamicAllocation.enabled=true
# 根据任务具体资源消耗估算,从 workflow 整体上提升集群资源利用率
spark.dynamicAllocation.minExecutors=5
spark.dynamicAllocation.maxExecutors=30
  1. 使用 Superset 进行多数据源联邦查询
    该场景中,只有一个 service 账号,会定时同时刷新大量的图表,对连接创建时间、查询结果响应时间、并发度都有较高的要求。
    该场景中,driver 和 executor 的启动时间都是不容忽视的,因此在 ad-hoc 查询配置的基础上,应延长 driver、executor 的闲置等待时间,并且设定最小的 executor 保活数量,保证时刻有 executor 常驻,能快速响应较小的查询。建议的关键配置如下:
kyuubi.engine.share.level=USER
kyuubi.session.engine.idle.timeout=PT10H

spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.minExecutors=6
spark.dynamicAllocation.maxExecutors=10
spark.dynamicAllocation.executorIdleTimeout=120

我们可以启动三个 Kyuubi Server(单机或集群)来分别应对如上的三种场景,但也可以仅启动一个 Kyuubi Server(单机或集群)来满足不同的场景。

一种建议的实践方式是 Kyuubi Server 配置使用默认的 USER 共享级别,这样客户端连接时,会使用默认配置;当 ETL 跑批场景时,可以通过 beeline 参数将本次连接共享级别调整为 CONNECTION;类似的,在 Superset 连接中配置独立参数。

结语

现在去跑一些 SQL,体验 Spark SQL 带来的性能飞跃吧!

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,126评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,254评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,445评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,185评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,178评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,970评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,276评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,927评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,400评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,883评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,997评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,646评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,213评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,204评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,423评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,423评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,722评论 2 345

推荐阅读更多精彩内容