企业级大表关联小表的Shuffle实践

       分布式计算系统的精粹在于移动计算而非移动数据,但是在实际的计算过程中,总存在着移动数据的情况。移动数据,将数据从一个节点移动到另一个节点进行计算,不但消耗了网络IO,也消耗了磁盘IO,降低了整个计算的效率。Hadoop的核心思想是MapReduce,但shuffle又是MapReduce的核心。shuffle的主要工作是从Map结束到Reduce开始之间的过程。也是整个计算过程中最影响效率的地方。

       在工作过程中,我发现网络上很多文章都讲了shuffle是什么,但是就是不告诉你怎么做,实际的企业级应用中如何使用是最优的。所以我根据实践写了这边文章帮助大家更好的使用spark。

我们使用spark主要有2种方式:

1、spark编程的方式计算,如果有能力,要想写出最高效的代码,编程是最好的。但是这往往是学习代价最高的,而且大企业开发人员非常多,动辄数百号人使用hadoop作为数据仓库,要想每一个人都学会spark是很难的。

2、采用thriftserver暴露JDBC接口的方式就成为了企业的最佳选择,而且很多第三方工具也只能采用jdbc连接。这样就能够极大降低hadoop作为数据仓库的使用门槛和学习成本。

要想知道为什么会产生shuffle,还需要掌握Spark宽依赖与窄依赖:https://www.jianshu.com/p/5c2301dfa360

JOIN的时候必然会出现宽依赖,SparkSQL的3种Join实现:http://www.cnblogs.com/0xcafedaddy/p/7614299.html请自行学习,这方面网上的资料也比较多。本文就是讲jdbc方式下如何避免或减少shuffle和一些网络上很少讲到的点。

我使用的版本是开源的hadoop-3.2.0、spark-2.4.0,为什么一定要说版本,因为各厂商的hadoop版本执行计划和参数都有差异,但是原理都差不多。

实验一:

readingsORC一个大表,device_info(1M数据都不到)是一张小表,spark是通过spark.sql.autoBroadcastJoinThreshold这个变量来设置小表广播的,默认是小于10M就广播。我们首先直接关联看一下:

spark-sql>explain select count(*) from readingsORC inner join device_info ondevice_info.device_id=readingsORC.device_id;

==Physical Plan ==

*(6)HashAggregate(keys=[], functions=[count(1)])

+-Exchange SinglePartition

   +- *(5) HashAggregate(keys=[],functions=[partial_count(1)])

      +- *(5) Project

         +- *(5)SortMergeJoin[device_id#409], [device_id#421], Inner

            :- *(2) Sort [device_id#409 ASCNULLS FIRST], false, 0

            : +- Exchange hashpartitioning(device_id#409, 200)

            :     +- *(1) Project [device_id#409]

            :        +- *(1) Filter isnotnull(device_id#409)

            :           +- *(1) FileScan orcdefault.readingsorc[device_id#409] Batched: true, Format: ORC, Location:InMemoryFileIndex[hdfs://master:9000/user/hive/warehouse/readingsorc],PartitionFilters: [], PushedFilters: [IsNotNull(device_id)], ReadSchema:struct

            +- *(4) Sort [device_id#421 ASCNULLS FIRST], false, 0

               +- Exchangehashpartitioning(device_id#421, 200)

                  +- *(3) Filterisnotnull(device_id#421)

                     +- Scan hive default.device_info[device_id#421], HiveTableRelation `default`.`device_info`,org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [device_id#421,api_version#422, manufacturer#423, model#424, os_name#425]

我们发现采用了效率最低的SortMergeJoin,task总数也非常多,必然非常慢,这并不是我们想要的。


第二个实验,我们指定小表MAPJOIN(device_info)

explainselect/*+ MAPJOIN(device_info) */count(*) from readingsORC inner join device_info ondevice_info.device_id=readingsORC.device_id;

spark-sql>explain select /*+ MAPJOIN(device_info) */ count(*) from readingsORC inner joindevice_info on device_info.device_id=readingsORC.device_id;

==Physical Plan ==

*(3)HashAggregate(keys=[], functions=[count(1)])

+-Exchange SinglePartition

   +- *(2) HashAggregate(keys=[],functions=[partial_count(1)])

      +- *(2) Project

         +- *(2)BroadcastHashJoin[device_id#339], [device_id#351], Inner,BuildRight

            :- *(2) Project [device_id#339]

            : +- *(2) Filter isnotnull(device_id#339)

            :     +- *(2) FileScan orcdefault.readingsorc[device_id#339] Batched: true, Format: ORC, Location: InMemoryFileIndex[hdfs://master:9000/user/hive/warehouse/readingsorc],PartitionFilters: [], PushedFilters: [IsNotNull(device_id)], ReadSchema:struct

            +- BroadcastExchangeHashedRelationBroadcastMode(List(input[0, string, false]))

               +- *(1) Filterisnotnull(device_id#351)

                  +- Scan hivedefault.device_info [device_id#351], HiveTableRelation `default`.`device_info`,org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [device_id#351,api_version#352, manufacturer#353, model#354, os_name#355]

Time taken: 0.569 seconds, Fetched 1 row(s)


我们发现,采用了BroadcastHashJoin广播小表,达到要求,如果手工能够敲写SQL语句,例如写跑批任务的时候,采用这种方式一点问题都没有,但是我们经常会采用第三方的BI工具来连接JDBC,SQL语句都是自动生成的,根本没有办法让你写入MAPJOIN(device_info)或者开发人员不熟悉数据量大小,不知道写MAPJOIN(device_info)这该怎么办呢?

其实要满足BroadcastHashJoin需要满足两个条件:

1、 表的统计信息是否正确,也就是你要让执行计划知道你是小表,这一条我看基本上99%的文章都没有告诉你,这也是我折腾两天才发现的,我发现了就想告诉大家。我想起做MPP数据库跑批结束都会要求收集统计信息才想到的。

2、广播的表数据量小于spark.sql.autoBroadcastJoinThreshold值,这一条几乎所有文章都说了。


我们做第三个实验,先收集统计信息:

spark-sql> ANALYZE TABLE device_infoorc  COMPUTE STATISTICS;

spark-sql> explain select count(*) from readingsORCinner join device_infoorc on device_infoorc.device_id=readingsORC.device_id;

==Physical Plan ==

*(3)HashAggregate(keys=[], functions=[count(1)])

+-Exchange SinglePartition

   +- *(2) HashAggregate(keys=[],functions=[partial_count(1)])

      +- *(2) Project

         +- *(2)BroadcastHashJoin[device_id#282], [device_id#294], Inner,BuildRight

            :- *(2) Project [device_id#282]

            : +- *(2) Filter isnotnull(device_id#282)

            :     +- *(2) FileScan orc default.readingsorc[device_id#282]Batched: true, Format: ORC, Location:InMemoryFileIndex[hdfs://master:9000/user/hive/warehouse/readingsorc],PartitionFilters: [], PushedFilters: [IsNotNull(device_id)], ReadSchema:struct

            +- BroadcastExchangeHashedRelationBroadcastMode(List(input[0, string, true]))

               +- *(1) Project [device_id#294]

                  +- *(1) Filterisnotnull(device_id#294)

                     +- *(1) FileScan orcdefault.device_infoorc[device_id#294] Batched: true, Format: ORC, Location:InMemoryFileIndex[hdfs://master:9000/user/hive/warehouse/device_infoorc],PartitionFilters: [], PushedFilters: [IsNotNull(device_id)], ReadSchema:struct

Time taken: 0.331 seconds, Fetched 1 row(s)


我们发现,收集完小表统计信息后,如愿把小表广播出去,并进行了BroadcastHashJoin

 

第四个实验,我们有一些表,比如机构表,参数表等使用频率极高的表,有没有办法提速:

通过:spark-sql> CACHE TABLE device_info;就可以把相关表缓存起来,这样所有使用到该表的SQL语句就会自动使用缓存数据,因为往往driver好几天都不关闭会常驻内存,但是记得定期更新数据。

spark-sql> CACHE TABLE device_info;explain select count(*) from readingsORC inne r join device_info on device_info.device_id=readingsORC.device_id;

Time taken: 94.432 seconds

== Physical Plan ==

*(3) HashAggregate(keys=[], functions=[count(1)])

+- Exchange SinglePartition

  +- *(2) HashAggregate(keys=[], functions=[partial_count(1)])

      +- *(2) Project

        +- *(2) BroadcastHashJoin [device_id#85], [device_id#97], Inner, BuildRight

            :- *(2) Project [device_id#85]

            :  +- *(2) Filter isnotnull(device_id#85)

            :    +- *(2) FileScan orc default.readingsorc[device_id#85] Batched: true, Format: ORC, Location: InMemoryFileIndex[hdfs://master:9000/user/hive/warehouse/readingsorc], PartitionFilters: [], PushedFilters: [IsNotNull(device_id)], ReadSchema: struct<device_id:string>

            +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))

              +- *(1) Filter isnotnull(device_id#97)

                  +- InMemoryTableScan [device_id#97], [isnotnull(device_id#97)]

                        +- InMemoryRelation [device_id#97, api_version#98, manufacturer#99, model#100, os_name#101], StorageLevel(disk, memory, deserialized, 1 replicas)

                              +- Scan hive default.device_info [device_id#0, api_version#1, manufacturer#2, model#3, os_name#4], HiveTableRelation `default`.`device_info`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [device_id#0, api_version#1, manufacturer#2, model#3, os_name#4]

你会发现采用了InMemoryTableScan ,除了占用内存外,效率是极高的。

大表和大表的关联,在hive里面可以通过bucket-map-join(https://data-flair.training/blogs/bucket-map-join/)来实现,效果很明显,但是在spark里面我怎么都实验不出来,如果实验出来再告诉大家。

建议:

1、现在的磁盘带宽、机柜内网络带宽、机器内存都已经大幅提升,而spark的默认参数是为了适应各种环境,设置得普遍比较低,其实完全可以把spark.sql.autoBroadcastJoinThreshold提升到100M,这样能够大幅减少shuffle的可能性

2、不管用大数据的什么产品(mpp、hadoop),如果要做性能调优,要想让系统跑得更快,一定要学会看执行计划

3、Hadoop运维工作量不小,要做好心理准备

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,684评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,143评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,214评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,788评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,796评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,665评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,027评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,679评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,346评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,664评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,766评论 1 331
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,412评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,015评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,974评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,073评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,501评论 2 343