现象
在运行[spark][1]程序期间,在针对 [dataFrame][2] 的map操作中,产生了类似[HashMap][3] key重复的现象,如图所示
这个问题导致了后续统计上的一系列问题
分析
起初我们是实际跟踪代码进行分析的,但是发现scala代码中没有任何问题,各种处理也非常合理
代码罗列如下,想看就看看,不看也能理解
// 目标是针对businessid进行聚合,然后输出各个业务id下,销售天数
// 获取数据的代码
val dataframe = spark.sql("select businessid,date,money from table1")
case class Stat() {
// 针对每一个businessid的统计
var moneyByDay: HashMap[String, Double] = HashMap[String, Double]()
// 针对每一条记录的id进行相加
def moneyByDayOp(data: (String, Double)) = {
if (this.moneyByDay.contains(data._1)) {
val tmpMoney = this.moneyByDay.get(data._1)
val finalMoney = tmpMoney.getOrElse[Double](0) + data._2
this.moneyByDay.remove(data._1)
this.moneyByDay.put(data._1, finalMoney)
} else {
if (data._2 > 0)
this.moneyByDay.put(data._1, data._2)
}
}
}
/**
* 归并多条记录的结果
**/
def reduceStatModel(one: Stat, another: Stat): Stat = {
one.moneyByDay ++= another.moneyByDay
one
}
/**
* 针对每条记录生成一个统计值
**/
import org.apache.spark.sql.Row
def parseProductDay(data: Row): (Long, Stat) = {
val result: Stat = new Stat
val a: String = data.getAs[String]("date")
result.moneyByDayOp(a, data.getAs[Long]("money"))
(data.getAs[Long]("product_id"), result)
}
// 核心流程
val finalRs = dataframe.rdd
.map(line => parseProductDay(line))
.reduceByKey(reduceStatModel(_, _))
val hashMp = finalRs.collect()(0)._2.moneyByDay
hashMp.put("20170727",1)
针对的存储,其实就是Stat类中的 moneyByDay对象,本质上是一个HashMap,并且通过泛型控制,key类型为String,value的类型为Double
放到集群上执行,是可以通过的,并且得到类似上图的结论
hashMap中包含重复的key,只能是两个可能
- key类型相同,但是可能原始字符串中有空格,或者不可见字符
- key类型不同,一个是字符串,另一个是其他数据类型
经过检查,1的可能性排除
问题原因是2,居然是2,排除了所有不可能之后,最后的真相即使再不可能,也是真的
原因
其实这种结论是意料之外,情理之中
众所周知,java的泛型检查是编译时的检查,实际运行时,容器类存储和运算都是将对象看做object进行处理的
对于基于jvm的scala,泛型本质上也是一个静态编译检查
上述代码,如果table1表中的字段date,其类型是string,那么万事ok,运算和结论都会正常
但是如果date是其他类型,比如int,那么就会产生问题
问题出现在上述代码的37行
val a: String = data.getAs[String]("date")
// 这行代码是org.apache.spark.sql.Row对象的一个调用,目的是获取指定类型的字段,并且转化为指定类型
// 源码如下:
/**
* Returns the value at position i.
* For primitive types if value is null it returns 'zero value' specific for primitive
* ie. 0 for Int - use isNullAt to ensure that value is not null
*
* @throws ClassCastException when data type does not match.
*/
def getAs[T](i: Int): T = get(i).asInstanceOf[T] // 此处是进行强制转化
/**
* Returns the value of a given fieldName.
* For primitive types if value is null it returns 'zero value' specific for primitive
* ie. 0 for Int - use isNullAt to ensure that value is not null
*
* @throws UnsupportedOperationException when schema is not defined.
* @throws IllegalArgumentException when fieldName do not exist.
* @throws ClassCastException when data type does not match.
*/
def getAs[T](fieldName: String): T = getAs[T](fieldIndex(fieldName))
解决方案
将table1表的字段设计成string,代码可以运行通过
-
37行代码改为如下,转化string即可
val a: String = String.valueOf(data.getAs[String]("date"))
疑问
在spark-shell中调用这段代码,其实是会报错的
val a = data.getAs[String]("date")
但是为什么集群执行会通过?
并且返回了一个HashMap[String,Double],其中的key都是Int。。。每次调用foreach方法都会报错
-
https://spark.apache.org/ "apache基于内存的分布式计算框架" ↩
-
https://spark.apache.org/docs/latest/sql-programming-guide.html "spark-dataframe, 与pandas的Dataframe相似,是针对分布式计算的抽象和实现" ↩
-
http://www.scala-lang.org/api/current/scala/collection/mutable/HashMap.html " "scala.mutable.hashmap scala中的map分为可变和不可变的" ↩