使用sparksql访问几个hive表join的情况时结果为空,且这个sql在hive里执行是成功的。
val sparkSession = SparkSession
.builder()
.config("jars","lib/*")
.appName("Spark Hive Example")
.enableHiveSupport()
.getOrCreate()
sparkSession.sql("select t1.c2,count(*) from t1 join t2 on (t1.c1=t2.c1) group by t1.c2").collect().map(r => mergeToOracle(r))
查看了t1,t2表的结构
- t1是json格式,MR任务生成
- t2是parquet格式,sqoop导出
单独查询两个表的结果
sparkSession.sql("select * from t1 limit 10").collect().map(r => println(r)) //正常显示
sparkSession.sql("select * from t2 limit 10").collect().map(r => println(r)) //有结果,全部为null
因此可以判断是读parquet的结果出错,因此导致两个表join也没有结果。如果直接按文件读取parquet文件,使用临时表查询呢,结果正常显示,且与其他表join也是正常。
sparkSession.read.parquet("/path of the hive table/").createOrReplaceTempView("temp_a")
val rs = sparkSession.sql("select * from temp_a")
rs.printSchema()
rs.show()
线上环境刚好还有另外一套sparksql运行的beta环境,将sql拿去执行是没有问题的,因此比较了当前执行环境和该beta环境的配制,发现其中有一个区别是spark.sql.hive.convertMetastoreParquet配制为false。
sparkSession.sqlContext.setConf("spark.sql.hive.convertMetastoreParquet","false")
加上这句之后数据正常了,这个配制有什么用,看字面也能理解了。
spark.sql.hive.convertMetastoreParquet default is true.When set to false, Spark
SQL will use the Hive SerDe for parquet tables instead of the built in support.
当向Hive metastore中读写Parquet表时,Spark SQL将使用Spark SQL自带的Parquet SerDe(SerDe:Serialize/Deserilize的简称,目的是用于序列化和反序列化),而不是用Hive的SerDe,Spark SQL自带的SerDe拥有更好的性能。这个优化的配置参数为spark.sql.hive.convertMetastoreParquet,默认值为开启。