摘要:Spark SQL
先上结论
-
空指针的情况:在UDF中正确指定了对应类型的前提下,DataFrame中的列如果是
string
,timestamp
,集合(array
,map
)类型,null值都会被匹配到,如果UDF逻辑需要对null进行操作,会导致空指针 -
null略过返回null的情况:在UDF中正确指定了对应类型的前提下,DataFrame中的数值类型(
Int
,Long
,Double
),以及布尔类型,null值都会被略过,此时UDF还是返回null,如果UDF中需要apply多列,只要有其中任何一列是null,整一行都不会被匹配到 -
functions下自带函数null略过:
spark.sql.functions
下的函数会自动略过null,依旧返回null,如果针对string等不会自动略过null的情况,UDF中要对null进行判断,如果需要依旧返回null则UDF返回None: Option
按照数据类型测试
先准备一个DataFrame,使用Scala的Option来构造出含有null的列
scala> val a = Seq((Some("a_b"), Some(1), Some(1L), Some(3.3), Some(false)), (None, None,None, None, None)).toDF("a", "b", "c", "d", "e")
a: org.apache.spark.sql.DataFrame = [a: string, b: int ... 3 more fields]
scala> a.show()
+----+----+----+----+-----+
| a| b| c| d| e|
+----+----+----+----+-----+
| a_b| 1| 1| 3.3|false|
|null|null|null|null| null|
+----+----+----+----+-----+
scala> a.printSchema
root
|-- a: string (nullable = true)
|-- b: integer (nullable = true)
|-- c: long (nullable = true)
|-- d: double (nullable = true)
|-- e: boolean (nullable = true)
(1)字符串列(Scala/Java:String)
Scala的String和Java是一样的,测试以下:
测试字符串split,报错空指针,说明null(None: Option[String])被处理
scala> a.withColumn("a_1", udf((s: String) => s.split("_")(0)).apply($"a")).show()
Caused by: java.lang.NullPointerException
测试字符串slice,报错空指针,说明null(None: Option[String])被处理
scala> a.withColumn("a_1", udf((s: String) => s.slice(2, 3)).apply($"a")).show()
Caused by: java.lang.NullPointerException
测试不对字符串做任何操作,只要匹配上就给到一个任意输出,正常运行
scala> a.withColumn("a_1", udf((s: String) => "3").apply($"a")).show()
+----+----+----+----+-----+---+
| a| b| c| d| e|a_1|
+----+----+----+----+-----+---+
| a_b| 1| 1| 3.3|false| 3|
|null|null|null|null| null| 3|
+----+----+----+----+-----+---+
测试字符串相加,正常运行
scala> a.withColumn("a_1", udf((s: String) => s + "123").apply($"a")).show()
+----+----+----+----+-----+-------+
| a| b| c| d| e| a_1|
+----+----+----+----+-----+-------+
| a_b| 1| 1| 3.3|false| a_b123|
|null|null|null|null| null|null123|
+----+----+----+----+-----+-------+
测试使用spark.sql.functions.split下的自带函数,null值略过
scala> a.withColumn("a", split($"a", "_")).show()
+------+----+----+----+-----+
| a| b| c| d| e|
+------+----+----+----+-----+
|[a, b]| 1| 1| 3.3|false|
| null|null|null|null| null|
+------+----+----+----+-----
字符串列结论:
- 字符串列中null会被UDF处理
- 在匹配上字符串之后,UDF中的逻辑会按照Scala的语法进行处理,如果Scala语法报错例如空指针则UDF执行报错,如果Scala语法无误则UDF执行成功
- UDF不会自动对null值做异常处理,但是spark.sql.function下的函数会对null做异常处理,处理方式是略过处理,直接返回null
如果要实现functions下的split返回null,UDF需要加null判断
scala> a.withColumn("a_1", udf((s: String) => if (s == null) null else s.split("_")).apply($"a")).show()
+----+----+----+----+-----+------+
| a| b| c| d| e| a_1|
+----+----+----+----+-----+------+
| a_b| 1| 1| 3.3|false|[a, b]|
|null|null|null|null| null| null|
+----+----+----+----+-----+------+
scala> a.withColumn("a_1", udf((s: String) => if (s == null) None else Some(s.split("_"))).apply($"a")).show()
+----+----+----+----+-----+------+
| a| b| c| d| e| a_1|
+----+----+----+----+-----+------+
| a_b| 1| 1| 3.3|false|[a, b]|
|null|null|null|null| null| null|
+----+----+----+----+-----+------+
(2)整数,长整数,小数列(Scala:Int,Long,Double)
Int和Long是Scala的类型,测试一下
测试整数相加,正常运行,null略过没有进行计算
scala> a.withColumn("b_1", udf((s: Int) => s + 1).apply($"b")).show()
+----+----+----+----+-----+----+
| a| b| c| d| e| b_1|
+----+----+----+----+-----+----+
| a_b| 1| 1| 3.3|false| 2|
|null|null|null|null| null|null|
+----+----+----+----+-----+----+
测试数组包含,正常运行,null略过没有进行计算
scala> a.withColumn("b_1", udf((s: Int) => Array(1, 2, 3).contains(s)).apply($"b")).show()
+----+----+----+----+-----+----+
| a| b| c| d| e| b_1|
+----+----+----+----+-----+----+
| a_b| 1| 1| 3.3|false|true|
|null|null|null|null| null|null|
+----+----+----+----+-----+----+
测试长整数大小判断,正常运行,null略过没有进行计算
scala> a.withColumn("c_1", udf((s: Long) => if (s >= 1L) 10L else 5L).apply($"c")).show()
+----+----+----+----+-----+----+
| a| b| c| d| e| c_1|
+----+----+----+----+-----+----+
| a_b| 1| 1| 3.3|false| 10|
|null|null|null|null| null|null|
+----+----+----+----+-----+----+
测试小数大小判断,正常运行,null略过没有进行计算
scala> a.withColumn("d_1", udf((s:Double) => s == 3.3).apply($"d")).show()
+----+----+----+----+-----+----+
| a| b| c| d| e| d_1|
+----+----+----+----+-----+----+
| a_b| 1| 1| 3.3|false|true|
|null|null|null|null| null|null|
+----+----+----+----+-----+----+
怀疑在UDF中指定了数值类型则匹配不到null,因此在UDF中使用Any
尝试以下
scala> a.withColumn("d_1", udf((s:Any) => s.toString.toDouble +1).apply($"d")).show()
Caused by: java.lang.NullPointerException
直接报错空指针,因此null值没有被处理是因为UDF中类型没有匹配上null
数值列结论:
- 数值列列中null不会被UDF(指定Int,Long,Double)处理,会直接略过
- UDF中指定了Int,Long,Double,实际上根部匹配不到null值,从而导致null直接略过,可以使用Scala: Any匹配
- UDF由于不会匹配到null,因此不会产生异常
(3)Scala集合(Seq,Map)
使用Scala的集合作为列的元素,同样使用Option来实现带有空值和Scala集合的列
scala> val a = Seq((Some(Seq(1, 2)), Some(Seq("1", "2")), Some(Seq(Seq(1,2), Seq(2, 3))), Some(Map("a" -> 1)), Some(Set(1, 2))), (None, None,None, None, None)).toDF("a", "b", "c", "d", "e")
a: org.apache.spark.sql.DataFrame = [a: array<int>, b: array<string> ... 3 more fields]
scala> a.printSchema
root
|-- a: array (nullable = true)
| |-- element: integer (containsNull = false)
|-- b: array (nullable = true)
| |-- element: string (containsNull = true)
|-- c: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: integer (containsNull = false)
|-- d: map (nullable = true)
| |-- key: string
| |-- value: integer (valueContainsNull = false)
|-- e: array (nullable = true)
| |-- element: integer (containsNull = false)
scala> a.show()
+------+------+----------------+--------+------+
| a| b| c| d| e|
+------+------+----------------+--------+------+
|[1, 2]|[1, 2]|[[1, 2], [2, 3]]|[a -> 1]|[1, 2]|
| null| null| null| null| null|
+------+------+----------------+--------+------+
其中Scala的Set,Array都被DataFrame推断为了array(Seq)
测试数组取某个或者某些元素,null值被处理,报错空指针
scala> a.withColumn("b_1", udf((s: Seq[String]) => s(0)).apply($"b")).show()
Caused by: java.lang.NullPointerException
scala> a.withColumn("a_1", udf((s: Seq[Int]) => s(0)).apply($"a")).show()
Caused by: java.lang.NullPointerException
测试多维数组取元素,null值被处理,报错空指针
scala> a.withColumn("c_1", udf((s: Seq[Seq[Int]]) => s(0)).apply($"c")).show()
Caused by: java.lang.NullPointerException
测试数组包含,null值被处理,报错空指针
scala> a.withColumn("e_1", udf((s: Seq[String]) => s.contains("1")).apply($"e")).show()
Caused by: java.lang.NullPointerException
测试Map字段提取Key Value,null值被处理,报错空指针
scala> a.withColumn("d_1", udf((s: Map[String, Int]) => s.get("a")).apply($"d")).show()
Caused by: java.lang.NullPointerException
尝试一下匹配上但是不使用会导致空指针的操作
scala> a.withColumn("e_1", udf((s: Seq[String]) => 100).apply($"e")).show()
+------+------+----------------+--------+------+---+
| a| b| c| d| e|e_1|
+------+------+----------------+--------+------+---+
|[1, 2]|[1, 2]|[[1, 2], [2, 3]]|[a -> 1]|[1, 2]|100|
| null| null| null| null| null|100|
+------+------+----------------+--------+------+---+
结果是可以通过不报错,和String类型结论一致
最后测试一下使用spark.sql.functions.array_contains下的函数操作array,执行成功,null值直接略过,非null处理逻辑正确
scala> a.withColumn("a_1", array_contains($"a", 1)).show()
+------+------+----------------+--------+------+----+
| a| b| c| d| e| a_1|
+------+------+----------------+--------+------+----+
|[1, 2]|[1, 2]|[[1, 2], [2, 3]]|[a -> 1]|[1, 2]|true|
| null| null| null| null| null|null|
+------+------+----------------+--------+------+----+
Scala集合列结论:
- Scala集合列(array,map)中null会被UDF处理
- 在匹配上字符串之后,UDF中的逻辑会按照Scala的语法进行处理,如果Scala语法报错例如空指针则UDF执行报错,如果Scala语法无误则UDF执行成功
- UDF不会自动对null值做异常处理,但是spark.sql.function下的函数会对null做异常处理,处理方式是略过处理,直接返回null
如果要实现functions下自动处理为null,UDF需要加入null判断
scala> a.withColumn("a_1", udf((s: Seq[Int]) => if (s == null) None else Some(s(0))).apply($"a")).show()
+------+------+----------------+--------+------+----+
| a| b| c| d| e| a_1|
+------+------+----------------+--------+------+----+
|[1, 2]|[1, 2]|[[1, 2], [2, 3]]|[a -> 1]|[1, 2]| 1|
| null| null| null| null| null|null|
+------+------+----------------+--------+------+----+
(4)布尔(Scala:Boolean)
重新构建一个含有null的Boolean列的DataFrame
scala> val a = Seq((Some(false)),(None)).toDF("a")
scala> a.show()
+-----+
| a|
+-----+
|false|
| null|
+-----+
测试Boolean匹配,但不不对Boolean做任何操作,结果匹配不上,null略过
scala> a.withColumn("a_1", udf((s: Boolean) =>10).apply($"a")).show()
+-----+----+
| a| a_1|
+-----+----+
|false| 10|
| null|null|
+-----+----+
测试Boolean并且对Boolean取反,结果null匹配不上,null略过
scala> a.withColumn("a_1", udf((s: Boolean) => !s).apply($"a")).show()
+-----+----+
| a| a_1|
+-----+----+
|false|true|
| null|null|
+-----+----+
如果采用spark.sql.functions.not也是同样略过
scala> a.withColumn("a_1", not($"a")).show()
+-----+----+
| a| a_1|
+-----+----+
|false|true|
| null|null|
+-----+----+
布尔列结论:
- 布尔列中null不会被UDF中的Boolean处理,会直接略过,可以使用Any匹配
(5)时间日期(Java:java.sql.Timestamp)
DataFrame的timestamp列是java.sql.Timestamp,先创建一个带有null的timestmap列
scala> val a = Seq((Some("2020-01-01")),(None)).toDF("a").select($"a".cast("timestamp"))
a: org.apache.spark.sql.DataFrame = [a: timestamp]
scala> a.show()
+-------------------+
| a|
+-------------------+
|2020-01-01 00:00:00|
| null|
+-------------------+
测试调用java.sql.TImestamp的getYear方法(计算离1900年多少年),匹配成功,null值被处理,报错空指针
scala> a.withColumn("a_1", udf((s: java.sql.Timestamp) => s.getYear ).apply($"a")).show()
Caused by: java.lang.NullPointerException
测试调用spark.sql.function下的方法,null值略过
scala> a.withColumn("a_1", date_format($"a", "yyyy-MM-dd")).show()
+-------------------+----------+
| a| a_1|
+-------------------+----------+
|2020-01-01 00:00:00|2020-01-01|
| null| null|
+-------------------+----------+
时间戳列结论:
- 时间戳列中null会被UDF中的
java.sql.Timestamp
处理- 在匹配上
java.sql.Timestamp
之后,UDF中的逻辑会按照Java的语法进行处理,如果Java语法报错例如空指针则UDF执行报错,如果Java语法无误则UDF执行成功- UDF不会自动对null值做异常处理,但是spark.sql.function下的函数会对null做异常处理,处理方式是略过处理,直接返回null