How to Use Incremental Result Collection with Kyuubi

1. Introduction

Kyuubi is an enhanced edition of Apache Spark's original Thrift JDBC/ODBC Server. It is mainly designed for running SQL directly against a cluster in which all components, including HDFS, YARN, the Hive Metastore, and Kyuubi itself, are secured.
Recently, Kyuubi added support for incrementally receiving partial result data from the executor side. The main purpose of this feature is to reduce the risk of OutOfMemoryError in the Kyuubi server itself. Because it supports multiple SparkContexts, Kyuubi is in some ways more exposed to this risk than the Spark Thrift Server, so this feature is important.

2. Configurations

Name                                         Default   Description
spark.kyuubi.operation.incremental.collect   false     Whether to use incremental result collection from the Spark executor side to the Kyuubi server side.

As shown in the table above, there is only one configuration for this feature, and it is disabled by default.

3. How to Configure

3.1 Server level scope

spark.kyuubi.operation.incremental.collect is a Kyuubi-type configuration, but it can simply be treated as a Spark one (see the Kyuubi documentation for the differences), which means that:

  1. It can be set via --conf spark.kyuubi.operation.incremental.collect=true with the $KYUUBI_HOME/bin/start-kyuubi.sh script:
$KYUUBI_HOME/bin/start-kyuubi.sh \
    --master yarn \
    --deploy-mode client \
    --driver-memory 10g \
    --conf spark.kyuubi.operation.incremental.collect=true
  2. It can also be put in the properties file (spark-defaults.conf) used by the Spark that launches Kyuubi, which can typically be found in the $SPARK_HOME/conf directory; a minimal example follows.
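
For the second option, the corresponding entry in spark-defaults.conf is a single line, for example:

spark.kyuubi.operation.incremental.collect true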

Configuring Kyuubi in either of the ways above spreads the setting server-wide, affecting all KyuubiSessions, a.k.a. HiveConnections.

3.2 Session level scope

Kyuubi also treats it as a session-level configuration, so we can change it within one session without affecting others, which makes Kyuubi more flexible. We walk through this below.

Let's say we already have a running Kyuubi server. We use the beeline CLI to connect and test.

~/data/apache-spark/spark-2.1.2-bin-hadoop2.7$ bin/beeline -u "jdbc:hive2://kyuubi.server.163.org:10009/;principal=hive/kyuubi.server.163.org@SERVER.163.ORG;hive.server2.proxy.user=hzyaoqin#spark.yarn.queue=default;spark.sql.haha=hehe;spark.scheduler.mode=FAIR;spark.kyuubi.operation.incremental.collect=true"
Connecting to jdbc:hive2://kyuubi.server.163.org:10009/;principal=hive/kyuubi.server.163.org@SERVER.163.ORG;hive.server2.proxy.user=hzyaoqin#spark.yarn.queue=default;spark.sql.haha=hehe;spark.scheduler.mode=FAIR;spark.kyuubi.operation.incremental.collect=true
18/05/25 14:49:08 INFO Utils: Supplied authorities: kyuubi.server.163.org:10009
18/05/25 14:49:08 INFO Utils: Resolved authority: kyuubi.server.163.org:10009
18/05/25 14:49:08 INFO HiveConnection: Will try to open client transport with JDBC Uri: jdbc:hive2://kyuubi.server.163.org:10009/;principal=hive/kyuubi.server.163.org@SERVER.163.ORG;hive.server2.proxy.user=hzyaoqin#spark.yarn.queue=default;spark.sql.haha=hehe;spark.scheduler.mode=FAIR;spark.kyuubi.operation.incremental.collect=true
Connected to: Spark SQL (version 2.1.2)
Driver: Hive JDBC (version 1.2.1.spark2)
Transaction isolation: TRANSACTION_REPEATABLE_READ
Beeline version 1.2.1.spark2 by Apache Hive
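
Stripped of the unrelated settings carried in this particular URL, the essential addition is the single pair placed in the variable section after the # separator; a distilled form, with host and principal as placeholders, might look like:

jdbc:hive2://<kyuubi-host>:10009/;principal=<server-principal>#spark.kyuubi.operation.incremental.collect=true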

Like other Spark/Kyuubi configurations, we can simply put it in the connection string this way. Now let's test with a show tables statement.

0: jdbc:hive2://kyuubi.server.163.org:> show tables;
18/05/25 14:49:54 INFO KyuubiOperation: Running query 'show tables' with 3a1f0ca4-8495-4e23-aa1e-bffb202b49a5
18/05/25 14:49:54 INFO SparkSqlParser: Parsing command: show tables
18/05/25 14:49:54 INFO KyuubiOperation: Executing query in incremental collection mode
18/05/25 14:49:54 INFO DAGScheduler: Asked to cancel job group 3a1f0ca4-8495-4e23-aa1e-bffb202b49a5
+-----------+-----------------+--------------+--+
| database  |    tableName    | isTemporary  |
+-----------+-----------------+--------------+--+
| default   | src             | false        |
| default   | src2            | false        |
| default   | src3            | false        |
| default   | src_parquet_30  | false        |
+-----------+-----------------+--------------+--+
4 rows selected (0.835 seconds)

"KyuubiOperation: Executing query in incremental collection mode" tells us that we collect results incrementally...

Then, let's test a select * from src2 statement.

0: jdbc:hive2://kyuubi.server.163.org:> select * from src2;
18/05/25 14:50:00 INFO KyuubiOperation: Running query 'select * from src2' with ca9f76f5-72f7-4c22-86cd-f31c892070c8
18/05/25 14:50:00 INFO SparkSqlParser: Parsing command: select * from src2
18/05/25 14:50:01 INFO CatalystSqlParser: Parsing command: int
18/05/25 14:50:01 INFO CatalystSqlParser: Parsing command: string
18/05/25 14:50:01 INFO deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
18/05/25 14:50:01 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 339.6 KB, free 15.8 GB)
18/05/25 14:50:01 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 31.2 KB, free 15.8 GB)
18/05/25 14:50:01 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.201.168.184:25356 (size: 31.2 KB, free: 15.8 GB)
18/05/25 14:50:01 INFO SparkContext: Created broadcast 0 from toString at KyuubiOperation.scala:319
18/05/25 14:50:01 INFO KyuubiOperation: Executing query in incremental collection mode
18/05/25 14:50:02 INFO GPLNativeCodeLoader: Loaded native gpl library
18/05/25 14:50:02 WARN LzoCompressor: java.lang.UnsatisfiedLinkError: Cannot load liblzo2.so.2 (liblzo2.so.2: cannot open shared object file: No such file or directory)!
18/05/25 14:50:02 ERROR LzoCodec: Failed to load/initialize native-lzo library
18/05/25 14:50:02 INFO FileInputFormat: Total input paths to process : 1
18/05/25 14:50:02 INFO DAGScheduler: Asked to cancel job group ca9f76f5-72f7-4c22-86cd-f31c892070c8
+------+----------+--+
| key  |  value   |
+------+----------+--+
| 238  | val_238  |
| 86   | val_86   |
497 lines are omitted here......
| 97   | val_97   |
+------+----------+--+
500 rows selected (4.938 seconds)
18/05/25 14:50:02 INFO DAGScheduler: Got job 0 (toSeq at KyuubiOperation.scala:233) with 1 output partitions
18/05/25 14:50:02 INFO DAGScheduler: Final stage: ResultStage 0 (toSeq at KyuubiOperation.scala:233)
18/05/25 14:50:02 INFO DAGScheduler: Parents of final stage: List()
18/05/25 14:50:02 INFO DAGScheduler: Missing parents: List()
18/05/25 14:50:02 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[6] at toLocalIterator at KyuubiOperation.scala:324), which has no missing parents
18/05/25 14:50:02 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 8.1 KB, free 15.8 GB)
18/05/25 14:50:02 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 4.4 KB, free 15.8 GB)
18/05/25 14:50:02 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 10.201.168.184:25356 (size: 4.4 KB, free: 15.8 GB)
18/05/25 14:50:02 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:996
18/05/25 14:50:02 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[6] at toLocalIterator at KyuubiOperation.scala:324)
18/05/25 14:50:02 INFO YarnScheduler: Adding task set 0.0 with 1 tasks
18/05/25 14:50:02 INFO FairSchedulableBuilder: Added task set TaskSet_0.0 tasks to pool default
18/05/25 14:50:02 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, hadoop3020.jd.163.org, executor 1, partition 0, NODE_LOCAL, 6005 bytes)
18/05/25 14:50:02 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on hadoop3020.jd.163.org:57229 (size: 4.4 KB, free: 10.5 GB)
18/05/25 14:50:03 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on hadoop3020.jd.163.org:57229 (size: 31.2 KB, free: 10.5 GB)
18/05/25 14:50:05 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 3396 ms on hadoop3020.jd.163.org (executor 1) (1/1)
18/05/25 14:50:05 INFO YarnScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool default
18/05/25 14:50:05 INFO DAGScheduler: ResultStage 0 (toSeq at KyuubiOperation.scala:233) finished in 3.401 s
18/05/25 14:50:05 INFO DAGScheduler: Got job 1 (indices at ColumnBasedSet.scala:60) with 1 output partitions
18/05/25 14:50:05 INFO DAGScheduler: Final stage: ResultStage 1 (indices at ColumnBasedSet.scala:60)
18/05/25 14:50:05 INFO DAGScheduler: Parents of final stage: List()
18/05/25 14:50:05 INFO DAGScheduler: Missing parents: List()
18/05/25 14:50:05 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[6] at toLocalIterator at KyuubiOperation.scala:324), which has no missing parents
18/05/25 14:50:05 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 8.1 KB, free 15.8 GB)
18/05/25 14:50:05 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 4.4 KB, free 15.8 GB)
18/05/25 14:50:05 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 10.201.168.184:25356 (size: 4.4 KB, free: 15.8 GB)
18/05/25 14:50:05 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:996
18/05/25 14:50:05 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[6] at toLocalIterator at KyuubiOperation.scala:324)
18/05/25 14:50:05 INFO YarnScheduler: Adding task set 1.0 with 1 tasks
18/05/25 14:50:05 INFO FairSchedulableBuilder: Added task set TaskSet_1.0 tasks to pool default
18/05/25 14:50:05 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, hadoop3020.jd.163.org, executor 1, partition 1, NODE_LOCAL, 6005 bytes)
18/05/25 14:50:05 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on hadoop3020.jd.163.org:57229 (size: 4.4 KB, free: 10.5 GB)
18/05/25 14:50:05 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 98 ms on hadoop3020.jd.163.org (executor 1) (1/1)
18/05/25 14:50:05 INFO YarnScheduler: Removed TaskSet 1.0, whose tasks have all completed, from pool default
18/05/25 14:50:05 INFO DAGScheduler: ResultStage 1 (indices at ColumnBasedSet.scala:60) finished in 0.099 s

"KyuubiOperation: Executing query in incremental collection mode" shows..

3.3 Via the set command

As a runtime configuration, it can also be set via Spark SQL's SetCommand, as follows:

set spark.kyuubi.operation.incremental.collect=false;
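
The current value can likewise be checked by issuing set without an assignment, which returns the key/value pair; this is standard Spark SQL SetCommand behavior:

set spark.kyuubi.operation.incremental.collect;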

Let's test this mode in the same beeline client.

0: jdbc:hive2://hzadg-jenkins.server.163.org:> set spark.kyuubi.operation.incremental.collect=false;
18/05/25 15:58:50 INFO KyuubiOperation: Running query 'set spark.kyuubi.operation.incremental.collect=false' with b63966a7-731c-48d6-9b99-f1a738232bb5
18/05/25 15:58:50 INFO SparkSqlParser: Parsing command: set spark.kyuubi.operation.incremental.collect=false
18/05/25 15:58:50 INFO KyuubiOperation: Executing query in incremental collection mode
18/05/25 15:58:50 INFO DAGScheduler: Asked to cancel job group b63966a7-731c-48d6-9b99-f1a738232bb5
+---------------------------------------------+--------+--+
|                     key                     | value  |
+---------------------------------------------+--------+--+
| spark.kyuubi.operation.incremental.collect  | false  |
+---------------------------------------------+--------+--+

As the logs above show, the set statement itself still ran the incremental way: the new value takes effect only for subsequent operations. Then we execute select * from src2 again.

0: jdbc:hive2://kyuubi.server.163.org:> select * from src2;
18/05/25 15:58:54 INFO KyuubiOperation: Running query 'select * from src2' with 01e00534-c927-4a45-be3b-9c0491897322
18/05/25 15:58:54 INFO SparkSqlParser: Parsing command: select * from src2
18/05/25 15:58:54 INFO CatalystSqlParser: Parsing command: int
18/05/25 15:58:54 INFO CatalystSqlParser: Parsing command: string
18/05/25 15:58:54 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 339.6 KB, free 15.8 GB)
18/05/25 15:58:54 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 31.2 KB, free 15.8 GB)
18/05/25 15:58:54 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on 10.201.168.184:25356 (size: 31.2 KB, free: 15.8 GB)
18/05/25 15:58:54 INFO SparkContext: Created broadcast 3 from toString at KyuubiOperation.scala:319
18/05/25 15:58:54 INFO FileInputFormat: Total input paths to process : 1
18/05/25 15:58:54 INFO SparkContext: Starting job: collect at KyuubiOperation.scala:326
18/05/25 15:58:54 INFO DAGScheduler: Got job 2 (collect at KyuubiOperation.scala:326) with 2 output partitions
18/05/25 15:58:54 INFO DAGScheduler: Final stage: ResultStage 2 (collect at KyuubiOperation.scala:326)
18/05/25 15:58:54 INFO DAGScheduler: Parents of final stage: List()
18/05/25 15:58:54 INFO DAGScheduler: Missing parents: List()
18/05/25 15:58:54 INFO DAGScheduler: Submitting ResultStage 2 (MapPartitionsRDD[12] at collect at KyuubiOperation.scala:326), which has no missing parents
18/05/25 15:58:54 INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 8.0 KB, free 15.8 GB)
18/05/25 15:58:54 INFO MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 4.4 KB, free 15.8 GB)
18/05/25 15:58:54 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on 10.201.168.184:25356 (size: 4.4 KB, free: 15.8 GB)
18/05/25 15:58:54 INFO SparkContext: Created broadcast 4 from broadcast at DAGScheduler.scala:996
18/05/25 15:58:54 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 2 (MapPartitionsRDD[12] at collect at KyuubiOperation.scala:326)
18/05/25 15:58:54 INFO YarnScheduler: Adding task set 2.0 with 2 tasks
18/05/25 15:58:54 INFO FairSchedulableBuilder: Added task set TaskSet_2.0 tasks to pool default
18/05/25 15:58:54 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 2, hadoop3020.jd.163.org, executor 1, partition 0, NODE_LOCAL, 6261 bytes)
18/05/25 15:58:54 INFO TaskSetManager: Starting task 1.0 in stage 2.0 (TID 3, hadoop3020.jd.163.org, executor 1, partition 1, NODE_LOCAL, 6261 bytes)
18/05/25 15:58:54 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on hadoop3020.jd.163.org:57229 (size: 4.4 KB, free: 10.5 GB)
18/05/25 15:58:54 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on hadoop3020.jd.163.org:57229 (size: 31.2 KB, free: 10.5 GB)
18/05/25 15:58:54 INFO TaskSetManager: Finished task 1.0 in stage 2.0 (TID 3) in 219 ms on hadoop3020.jd.163.org (executor 1) (1/2)
18/05/25 15:58:54 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 2) in 223 ms on hadoop3020.jd.163.org (executor 1) (2/2)
18/05/25 15:58:54 INFO YarnScheduler: Removed TaskSet 2.0, whose tasks have all completed, from pool default
18/05/25 15:58:54 INFO DAGScheduler: ResultStage 2 (collect at KyuubiOperation.scala:326) finished in 0.219 s
18/05/25 15:58:54 INFO DAGScheduler: Job 2 finished: collect at KyuubiOperation.scala:326, took 0.240928 s
18/05/25 15:58:54 INFO DAGScheduler: Asked to cancel job group 01e00534-c927-4a45-be3b-9c0491897322
+------+----------+--+
| key  |  value   |
+------+----------+--+
| 238  | val_238  |
| 86   | val_86   |
497 lines are omitted here......
| 97   | val_97   |
+------+----------+--+
500 rows selected (0.553 seconds)

"KyuubiOperation: Executing query in incremental collection mode" has gone away. we set back to collect data as a whole part.

4. Conclusions

Kyuubi now supports enabling incremental result collection in three ways: server-wide at launch time, per session via the JDBC connection string, and at runtime via the set command. Hopefully this feature helps you productionize Spark SQL.
