Spark SQL:分类讨论UDF对DataFrame列存在空值null的处理

摘要:Spark SQL

先上结论

  • 空指针的情况:在UDF中正确指定了对应类型的前提下,DataFrame中的列如果是stringtimestamp,集合(arraymap)类型,null值都会被匹配到,如果UDF逻辑需要对null进行操作,会导致空指针
  • null略过返回null的情况:在UDF中正确指定了对应类型的前提下,DataFrame中的数值类型(IntLongDouble),以及布尔类型,null值都会被略过,此时UDF还是返回null,如果UDF中需要apply多列,只要有其中任何一列是null,整一行都不会被匹配到
  • functions下自带函数null略过spark.sql.functions下的函数会自动略过null,依旧返回null,如果针对string等不会自动略过null的情况,UDF中要对null进行判断,如果需要依旧返回null则UDF返回None: Option

按照数据类型测试

先准备一个DataFrame,使用Scala的Option来构造出含有null的列

scala> val a = Seq((Some("a_b"), Some(1), Some(1L), Some(3.3), Some(false)), (None, None,None, None, None)).toDF("a", "b", "c", "d", "e")
a: org.apache.spark.sql.DataFrame = [a: string, b: int ... 3 more fields]

scala> a.show()
+----+----+----+----+-----+
|   a|   b|   c|   d|    e|
+----+----+----+----+-----+
| a_b|   1|   1| 3.3|false|
|null|null|null|null| null|
+----+----+----+----+-----+

scala> a.printSchema
root
 |-- a: string (nullable = true)
 |-- b: integer (nullable = true)
 |-- c: long (nullable = true)
 |-- d: double (nullable = true)
 |-- e: boolean (nullable = true)

(1)字符串列(Scala/Java:String)

Scala的String和Java是一样的,测试以下:
测试字符串split,报错空指针,说明null(None: Option[String])被处理

scala> a.withColumn("a_1", udf((s: String) => s.split("_")(0)).apply($"a")).show()
Caused by: java.lang.NullPointerException

测试字符串slice,报错空指针,说明null(None: Option[String])被处理

scala> a.withColumn("a_1", udf((s: String) => s.slice(2, 3)).apply($"a")).show()
Caused by: java.lang.NullPointerException

测试不对字符串做任何操作,只要匹配上就给到一个任意输出,正常运行

scala> a.withColumn("a_1", udf((s: String) => "3").apply($"a")).show()
+----+----+----+----+-----+---+
|   a|   b|   c|   d|    e|a_1|
+----+----+----+----+-----+---+
| a_b|   1|   1| 3.3|false|  3|
|null|null|null|null| null|  3|
+----+----+----+----+-----+---+

测试字符串相加,正常运行

scala> a.withColumn("a_1", udf((s: String) => s + "123").apply($"a")).show()
+----+----+----+----+-----+-------+
|   a|   b|   c|   d|    e|    a_1|
+----+----+----+----+-----+-------+
| a_b|   1|   1| 3.3|false| a_b123|
|null|null|null|null| null|null123|
+----+----+----+----+-----+-------+

测试使用spark.sql.functions.split下的自带函数,null值略过

scala> a.withColumn("a", split($"a", "_")).show()
+------+----+----+----+-----+
|     a|   b|   c|   d|    e|
+------+----+----+----+-----+
|[a, b]|   1|   1| 3.3|false|
|  null|null|null|null| null|
+------+----+----+----+-----

字符串列结论

  • 字符串列中null会被UDF处理
  • 在匹配上字符串之后,UDF中的逻辑会按照Scala的语法进行处理,如果Scala语法报错例如空指针则UDF执行报错,如果Scala语法无误则UDF执行成功
  • UDF不会自动对null值做异常处理,但是spark.sql.function下的函数会对null做异常处理,处理方式是略过处理,直接返回null

如果要实现functions下的split返回null,UDF需要加null判断

scala> a.withColumn("a_1", udf((s: String) => if (s == null) null else s.split("_")).apply($"a")).show()
+----+----+----+----+-----+------+
|   a|   b|   c|   d|    e|   a_1|
+----+----+----+----+-----+------+
| a_b|   1|   1| 3.3|false|[a, b]|
|null|null|null|null| null|  null|
+----+----+----+----+-----+------+
scala> a.withColumn("a_1", udf((s: String) => if (s == null) None else Some(s.split("_"))).apply($"a")).show()
+----+----+----+----+-----+------+
|   a|   b|   c|   d|    e|   a_1|
+----+----+----+----+-----+------+
| a_b|   1|   1| 3.3|false|[a, b]|
|null|null|null|null| null|  null|
+----+----+----+----+-----+------+

(2)整数,长整数,小数列(Scala:Int,Long,Double)

Int和Long是Scala的类型,测试一下
测试整数相加,正常运行,null略过没有进行计算

scala> a.withColumn("b_1", udf((s: Int) => s + 1).apply($"b")).show()
+----+----+----+----+-----+----+
|   a|   b|   c|   d|    e| b_1|
+----+----+----+----+-----+----+
| a_b|   1|   1| 3.3|false|   2|
|null|null|null|null| null|null|
+----+----+----+----+-----+----+

测试数组包含,正常运行,null略过没有进行计算

scala> a.withColumn("b_1", udf((s: Int) => Array(1, 2, 3).contains(s)).apply($"b")).show()
+----+----+----+----+-----+----+
|   a|   b|   c|   d|    e| b_1|
+----+----+----+----+-----+----+
| a_b|   1|   1| 3.3|false|true|
|null|null|null|null| null|null|
+----+----+----+----+-----+----+

测试长整数大小判断,正常运行,null略过没有进行计算

scala> a.withColumn("c_1", udf((s: Long) => if (s >= 1L) 10L else 5L).apply($"c")).show()
+----+----+----+----+-----+----+
|   a|   b|   c|   d|    e| c_1|
+----+----+----+----+-----+----+
| a_b|   1|   1| 3.3|false|  10|
|null|null|null|null| null|null|
+----+----+----+----+-----+----+

测试小数大小判断,正常运行,null略过没有进行计算

scala> a.withColumn("d_1", udf((s:Double) => s == 3.3).apply($"d")).show()
+----+----+----+----+-----+----+
|   a|   b|   c|   d|    e| d_1|
+----+----+----+----+-----+----+
| a_b|   1|   1| 3.3|false|true|
|null|null|null|null| null|null|
+----+----+----+----+-----+----+

怀疑在UDF中指定了数值类型则匹配不到null,因此在UDF中使用Any尝试以下

scala> a.withColumn("d_1", udf((s:Any) => s.toString.toDouble +1).apply($"d")).show()
Caused by: java.lang.NullPointerException

直接报错空指针,因此null值没有被处理是因为UDF中类型没有匹配上null

数值列结论

  • 数值列列中null不会被UDF(指定Int,Long,Double)处理,会直接略过
  • UDF中指定了Int,Long,Double,实际上根部匹配不到null值,从而导致null直接略过,可以使用Scala: Any匹配
  • UDF由于不会匹配到null,因此不会产生异常

(3)Scala集合(Seq,Map)

使用Scala的集合作为列的元素,同样使用Option来实现带有空值和Scala集合的列

scala> val a = Seq((Some(Seq(1, 2)), Some(Seq("1", "2")), Some(Seq(Seq(1,2), Seq(2, 3))), Some(Map("a" -> 1)), Some(Set(1, 2))), (None, None,None,  None, None)).toDF("a", "b", "c", "d", "e") 
a: org.apache.spark.sql.DataFrame = [a: array<int>, b: array<string> ... 3 more fields]

scala> a.printSchema
root
 |-- a: array (nullable = true)
 |    |-- element: integer (containsNull = false)
 |-- b: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- c: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: integer (containsNull = false)
 |-- d: map (nullable = true)
 |    |-- key: string
 |    |-- value: integer (valueContainsNull = false)
 |-- e: array (nullable = true)
 |    |-- element: integer (containsNull = false)

scala> a.show()
+------+------+----------------+--------+------+
|     a|     b|               c|       d|     e|
+------+------+----------------+--------+------+
|[1, 2]|[1, 2]|[[1, 2], [2, 3]]|[a -> 1]|[1, 2]|
|  null|  null|            null|    null|  null|
+------+------+----------------+--------+------+

其中Scala的Set,Array都被DataFrame推断为了array(Seq)
测试数组取某个或者某些元素,null值被处理,报错空指针

scala> a.withColumn("b_1", udf((s: Seq[String]) => s(0)).apply($"b")).show()
Caused by: java.lang.NullPointerException
scala> a.withColumn("a_1", udf((s: Seq[Int]) => s(0)).apply($"a")).show()
Caused by: java.lang.NullPointerException

测试多维数组取元素,null值被处理,报错空指针

scala> a.withColumn("c_1", udf((s: Seq[Seq[Int]]) => s(0)).apply($"c")).show()
Caused by: java.lang.NullPointerException

测试数组包含,null值被处理,报错空指针

scala> a.withColumn("e_1", udf((s: Seq[String]) => s.contains("1")).apply($"e")).show()
Caused by: java.lang.NullPointerException

测试Map字段提取Key Value,null值被处理,报错空指针

scala> a.withColumn("d_1", udf((s: Map[String, Int]) => s.get("a")).apply($"d")).show()
Caused by: java.lang.NullPointerException

尝试一下匹配上但是不使用会导致空指针的操作

scala> a.withColumn("e_1", udf((s: Seq[String]) => 100).apply($"e")).show()
+------+------+----------------+--------+------+---+
|     a|     b|               c|       d|     e|e_1|
+------+------+----------------+--------+------+---+
|[1, 2]|[1, 2]|[[1, 2], [2, 3]]|[a -> 1]|[1, 2]|100|
|  null|  null|            null|    null|  null|100|
+------+------+----------------+--------+------+---+

结果是可以通过不报错,和String类型结论一致
最后测试一下使用spark.sql.functions.array_contains下的函数操作array,执行成功,null值直接略过,非null处理逻辑正确

scala> a.withColumn("a_1", array_contains($"a", 1)).show()
+------+------+----------------+--------+------+----+
|     a|     b|               c|       d|     e| a_1|
+------+------+----------------+--------+------+----+
|[1, 2]|[1, 2]|[[1, 2], [2, 3]]|[a -> 1]|[1, 2]|true|
|  null|  null|            null|    null|  null|null|
+------+------+----------------+--------+------+----+

Scala集合列结论

  • Scala集合列(array,map)中null会被UDF处理
  • 在匹配上字符串之后,UDF中的逻辑会按照Scala的语法进行处理,如果Scala语法报错例如空指针则UDF执行报错,如果Scala语法无误则UDF执行成功
  • UDF不会自动对null值做异常处理,但是spark.sql.function下的函数会对null做异常处理,处理方式是略过处理,直接返回null

如果要实现functions下自动处理为null,UDF需要加入null判断

scala> a.withColumn("a_1", udf((s: Seq[Int]) => if (s == null) None else Some(s(0))).apply($"a")).show()
+------+------+----------------+--------+------+----+
|     a|     b|               c|       d|     e| a_1|
+------+------+----------------+--------+------+----+
|[1, 2]|[1, 2]|[[1, 2], [2, 3]]|[a -> 1]|[1, 2]|   1|
|  null|  null|            null|    null|  null|null|
+------+------+----------------+--------+------+----+

(4)布尔(Scala:Boolean)

重新构建一个含有null的Boolean列的DataFrame

scala> val a = Seq((Some(false)),(None)).toDF("a")
scala> a.show()
+-----+
|    a|
+-----+
|false|
| null|
+-----+

测试Boolean匹配,但不不对Boolean做任何操作,结果匹配不上,null略过

scala> a.withColumn("a_1", udf((s: Boolean) =>10).apply($"a")).show()
+-----+----+
|    a| a_1|
+-----+----+
|false|  10|
| null|null|
+-----+----+

测试Boolean并且对Boolean取反,结果null匹配不上,null略过

scala> a.withColumn("a_1", udf((s: Boolean) => !s).apply($"a")).show()
+-----+----+
|    a| a_1|
+-----+----+
|false|true|
| null|null|
+-----+----+

如果采用spark.sql.functions.not也是同样略过

scala> a.withColumn("a_1", not($"a")).show()
+-----+----+
|    a| a_1|
+-----+----+
|false|true|
| null|null|
+-----+----+

布尔列结论

  • 布尔列中null不会被UDF中的Boolean处理,会直接略过,可以使用Any匹配

(5)时间日期(Java:java.sql.Timestamp)

DataFrame的timestamp列是java.sql.Timestamp,先创建一个带有null的timestmap列

scala> val a = Seq((Some("2020-01-01")),(None)).toDF("a").select($"a".cast("timestamp"))
a: org.apache.spark.sql.DataFrame = [a: timestamp]

scala> a.show()
+-------------------+
|                  a|
+-------------------+
|2020-01-01 00:00:00|
|               null|
+-------------------+

测试调用java.sql.TImestamp的getYear方法(计算离1900年多少年),匹配成功,null值被处理,报错空指针

scala> a.withColumn("a_1", udf((s: java.sql.Timestamp) => s.getYear ).apply($"a")).show()
Caused by: java.lang.NullPointerException

测试调用spark.sql.function下的方法,null值略过

scala> a.withColumn("a_1", date_format($"a", "yyyy-MM-dd")).show()
+-------------------+----------+
|                  a|       a_1|
+-------------------+----------+
|2020-01-01 00:00:00|2020-01-01|
|               null|      null|
+-------------------+----------+

时间戳列结论

  • 时间戳列中null会被UDF中的java.sql.Timestamp处理
  • 在匹配上java.sql.Timestamp之后,UDF中的逻辑会按照Java的语法进行处理,如果Java语法报错例如空指针则UDF执行报错,如果Java语法无误则UDF执行成功
  • UDF不会自动对null值做异常处理,但是spark.sql.function下的函数会对null做异常处理,处理方式是略过处理,直接返回null

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,937评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,503评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,712评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,668评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,677评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,601评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,975评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,637评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,881评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,621评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,710评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,387评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,971评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,947评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,189评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,805评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,449评论 2 342

推荐阅读更多精彩内容