分布式计算系统的精粹在于移动计算而非移动数据,但是在实际的计算过程中,总存在着移动数据的情况。移动数据,将数据从一个节点移动到另一个节点进行计算,不但消耗了网络IO,也消耗了磁盘IO,降低了整个计算的效率。Hadoop的核心思想是MapReduce,但shuffle又是MapReduce的核心。shuffle的主要工作是从Map结束到Reduce开始之间的过程。也是整个计算过程中最影响效率的地方。
在工作过程中,我发现网络上很多文章都讲了shuffle是什么,但是就是不告诉你怎么做,实际的企业级应用中如何使用是最优的。所以我根据实践写了这边文章帮助大家更好的使用spark。
我们使用spark主要有2种方式:
1、spark编程的方式计算,如果有能力,要想写出最高效的代码,编程是最好的。但是这往往是学习代价最高的,而且大企业开发人员非常多,动辄数百号人使用hadoop作为数据仓库,要想每一个人都学会spark是很难的。
2、采用thriftserver暴露JDBC接口的方式就成为了企业的最佳选择,而且很多第三方工具也只能采用jdbc连接。这样就能够极大降低hadoop作为数据仓库的使用门槛和学习成本。
要想知道为什么会产生shuffle,还需要掌握Spark宽依赖与窄依赖:https://www.jianshu.com/p/5c2301dfa360
JOIN的时候必然会出现宽依赖,SparkSQL的3种Join实现:http://www.cnblogs.com/0xcafedaddy/p/7614299.html请自行学习,这方面网上的资料也比较多。本文就是讲jdbc方式下如何避免或减少shuffle和一些网络上很少讲到的点。
我使用的版本是开源的hadoop-3.2.0、spark-2.4.0,为什么一定要说版本,因为各厂商的hadoop版本执行计划和参数都有差异,但是原理都差不多。
实验一:
readingsORC一个大表,device_info(1M数据都不到)是一张小表,spark是通过spark.sql.autoBroadcastJoinThreshold这个变量来设置小表广播的,默认是小于10M就广播。我们首先直接关联看一下:
spark-sql>explain select count(*) from readingsORC inner join device_info ondevice_info.device_id=readingsORC.device_id;
==Physical Plan ==
*(6)HashAggregate(keys=[], functions=[count(1)])
+-Exchange SinglePartition
+- *(5) HashAggregate(keys=[],functions=[partial_count(1)])
+- *(5) Project
+- *(5)SortMergeJoin[device_id#409], [device_id#421], Inner
:- *(2) Sort [device_id#409 ASCNULLS FIRST], false, 0
: +- Exchange hashpartitioning(device_id#409, 200)
: +- *(1) Project [device_id#409]
: +- *(1) Filter isnotnull(device_id#409)
: +- *(1) FileScan orcdefault.readingsorc[device_id#409] Batched: true, Format: ORC, Location:InMemoryFileIndex[hdfs://master:9000/user/hive/warehouse/readingsorc],PartitionFilters: [], PushedFilters: [IsNotNull(device_id)], ReadSchema:struct
+- *(4) Sort [device_id#421 ASCNULLS FIRST], false, 0
+- Exchangehashpartitioning(device_id#421, 200)
+- *(3) Filterisnotnull(device_id#421)
+- Scan hive default.device_info[device_id#421], HiveTableRelation `default`.`device_info`,org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [device_id#421,api_version#422, manufacturer#423, model#424, os_name#425]
我们发现采用了效率最低的SortMergeJoin,task总数也非常多,必然非常慢,这并不是我们想要的。
第二个实验,我们指定小表MAPJOIN(device_info):
explainselect/*+ MAPJOIN(device_info) */count(*) from readingsORC inner join device_info ondevice_info.device_id=readingsORC.device_id;
spark-sql>explain select /*+ MAPJOIN(device_info) */ count(*) from readingsORC inner joindevice_info on device_info.device_id=readingsORC.device_id;
==Physical Plan ==
*(3)HashAggregate(keys=[], functions=[count(1)])
+-Exchange SinglePartition
+- *(2) HashAggregate(keys=[],functions=[partial_count(1)])
+- *(2) Project
+- *(2)BroadcastHashJoin[device_id#339], [device_id#351], Inner,BuildRight
:- *(2) Project [device_id#339]
: +- *(2) Filter isnotnull(device_id#339)
: +- *(2) FileScan orcdefault.readingsorc[device_id#339] Batched: true, Format: ORC, Location: InMemoryFileIndex[hdfs://master:9000/user/hive/warehouse/readingsorc],PartitionFilters: [], PushedFilters: [IsNotNull(device_id)], ReadSchema:struct
+- BroadcastExchangeHashedRelationBroadcastMode(List(input[0, string, false]))
+- *(1) Filterisnotnull(device_id#351)
+- Scan hivedefault.device_info [device_id#351], HiveTableRelation `default`.`device_info`,org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [device_id#351,api_version#352, manufacturer#353, model#354, os_name#355]
Time taken: 0.569 seconds, Fetched 1 row(s)
我们发现,采用了BroadcastHashJoin广播小表,达到要求,如果手工能够敲写SQL语句,例如写跑批任务的时候,采用这种方式一点问题都没有,但是我们经常会采用第三方的BI工具来连接JDBC,SQL语句都是自动生成的,根本没有办法让你写入MAPJOIN(device_info),或者开发人员不熟悉数据量大小,不知道写MAPJOIN(device_info)。这该怎么办呢?
其实要满足BroadcastHashJoin需要满足两个条件:
1、 表的统计信息是否正确,也就是你要让执行计划知道你是小表,这一条我看基本上99%的文章都没有告诉你,这也是我折腾两天才发现的,我发现了就想告诉大家。我想起做MPP数据库跑批结束都会要求收集统计信息才想到的。
2、广播的表数据量小于spark.sql.autoBroadcastJoinThreshold值,这一条几乎所有文章都说了。
我们做第三个实验,先收集统计信息:
spark-sql> ANALYZE TABLE device_infoorc COMPUTE STATISTICS;
spark-sql> explain select count(*) from readingsORCinner join device_infoorc on device_infoorc.device_id=readingsORC.device_id;
==Physical Plan ==
*(3)HashAggregate(keys=[], functions=[count(1)])
+-Exchange SinglePartition
+- *(2) HashAggregate(keys=[],functions=[partial_count(1)])
+- *(2) Project
+- *(2)BroadcastHashJoin[device_id#282], [device_id#294], Inner,BuildRight
:- *(2) Project [device_id#282]
: +- *(2) Filter isnotnull(device_id#282)
: +- *(2) FileScan orc default.readingsorc[device_id#282]Batched: true, Format: ORC, Location:InMemoryFileIndex[hdfs://master:9000/user/hive/warehouse/readingsorc],PartitionFilters: [], PushedFilters: [IsNotNull(device_id)], ReadSchema:struct
+- BroadcastExchangeHashedRelationBroadcastMode(List(input[0, string, true]))
+- *(1) Project [device_id#294]
+- *(1) Filterisnotnull(device_id#294)
+- *(1) FileScan orcdefault.device_infoorc[device_id#294] Batched: true, Format: ORC, Location:InMemoryFileIndex[hdfs://master:9000/user/hive/warehouse/device_infoorc],PartitionFilters: [], PushedFilters: [IsNotNull(device_id)], ReadSchema:struct
Time taken: 0.331 seconds, Fetched 1 row(s)
我们发现,收集完小表统计信息后,如愿把小表广播出去,并进行了BroadcastHashJoin。
第四个实验,我们有一些表,比如机构表,参数表等使用频率极高的表,有没有办法提速:
通过:spark-sql> CACHE TABLE device_info;就可以把相关表缓存起来,这样所有使用到该表的SQL语句就会自动使用缓存数据,因为往往driver好几天都不关闭会常驻内存,但是记得定期更新数据。
spark-sql> CACHE TABLE device_info;explain select count(*) from readingsORC inne r join device_info on device_info.device_id=readingsORC.device_id;
Time taken: 94.432 seconds
== Physical Plan ==
*(3) HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition
+- *(2) HashAggregate(keys=[], functions=[partial_count(1)])
+- *(2) Project
+- *(2) BroadcastHashJoin [device_id#85], [device_id#97], Inner, BuildRight
:- *(2) Project [device_id#85]
: +- *(2) Filter isnotnull(device_id#85)
: +- *(2) FileScan orc default.readingsorc[device_id#85] Batched: true, Format: ORC, Location: InMemoryFileIndex[hdfs://master:9000/user/hive/warehouse/readingsorc], PartitionFilters: [], PushedFilters: [IsNotNull(device_id)], ReadSchema: struct<device_id:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
+- *(1) Filter isnotnull(device_id#97)
+- InMemoryTableScan [device_id#97], [isnotnull(device_id#97)]
+- InMemoryRelation [device_id#97, api_version#98, manufacturer#99, model#100, os_name#101], StorageLevel(disk, memory, deserialized, 1 replicas)
+- Scan hive default.device_info [device_id#0, api_version#1, manufacturer#2, model#3, os_name#4], HiveTableRelation `default`.`device_info`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [device_id#0, api_version#1, manufacturer#2, model#3, os_name#4]
你会发现采用了InMemoryTableScan ,除了占用内存外,效率是极高的。
大表和大表的关联,在hive里面可以通过bucket-map-join(https://data-flair.training/blogs/bucket-map-join/)来实现,效果很明显,但是在spark里面我怎么都实验不出来,如果实验出来再告诉大家。
建议:
1、现在的磁盘带宽、机柜内网络带宽、机器内存都已经大幅提升,而spark的默认参数是为了适应各种环境,设置得普遍比较低,其实完全可以把spark.sql.autoBroadcastJoinThreshold提升到100M,这样能够大幅减少shuffle的可能性
2、不管用大数据的什么产品(mpp、hadoop),如果要做性能调优,要想让系统跑得更快,一定要学会看执行计划
3、Hadoop运维工作量不小,要做好心理准备