Spark - 动态注册UDF

昨天有位大哥问小弟一个Spark问题,他们想在不停Spark程序的情况下动态更新UDF的逻辑,他一问我这个问题的时候,本猪心里一惊,Spark**还能这么玩?我出于程序员的本能回复他肯定不行,但今天再回过来头想了一想,昨天脑子肯定进水了,回复太肤浅了,既然Spark可以通过编程方式注册UDF,当然把那位大哥的代码逻辑使用反射加载进去再调用不就行了?这不就是JVM的优势么,怪自己的反射没学到家,说搞就搞起。

分析过程

我会说这波分析过程很无聊,你还会看么?


想看更多Spark有趣的文章来关注本猪,包你跟我一样肥。

跟着本猪看一个Spark注册UDF的例子

spark.udf.register(name, (a1: String) => a1.toUpperCase)

点击register的源码进去看

一个`A1`:参数类型,`RT`:返回类型
def register[RT: TypeTag, A1: TypeTag](name: String, func: Function1[A1, RT]): UserDefinedFunction = {
    val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
    val inputTypes = Try(ScalaReflection.schemaFor[A1].dataType :: Nil).toOption
    def builder(e: Seq[Expression]) = if (e.length == 1) {
      ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), Some(name), nullable, udfDeterministic = true)
    } else {
      ...
    }
    ...
  }
def register[RT: TypeTag, A1: TypeTag, A2: TypeTag](name: String, func: Function2[A1, A2, RT]): UserDefinedFunction
def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag](name: String, func: Function3[A1, A2, A3, RT]): UserDefinedFunction = {
def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag](name: String, func: Function4[A1, A2, A3, A4, RT]): UserDefinedFunction = {
...

1. func是上面方法的重点,既然想要动态UDF逻辑代码,那我们把Function1这个函数实现不就可以了?再利用JVM反射的技术调用,完美。
2. 顺便还看出了在scala-2.10.x版本中case class的元素是不能超过 \color{#FF0000}{22} 个的。


上面的UDF注册的原型其实是

val udf = new Function1[String,String] {
      override def apply(a1: String): String = {
        a1.toUpperCase
      }
}
spark.udf.register(name, udf)

到这里我有一个 肤浅 并且 大胆 的想法,我把那位大哥的代码放到apply方法里面调用不就行了?

再打一个广告,快关注我吧。

val udf = new Function1[String,String] {
      override def apply(a1: String): String = {
        //method.invoke(instance) //使用反射加载代码,把大哥动态逻辑方法method拿出来调用。
      }
}

1. 但是还有一些问题要解决,我不能强制我的老大哥只能传递一个参数吧,那也太年轻不懂事了,至少让他可以随意传 \color{#FF0000}{22} 参数。
2. 唯一的解决方法,就是要控制Function1Function22函数的动态生成,找了半天没发现Function的动态生成,然后还发现Spark也是根据参数长度生成FunctionN的,真**刷新本猪的三观呀。
3. 既然实现方式找到了,那就简单了,只要通过反射就能 上知天文,下知地理


既然是Spark,肯定要用Scala去写反射了。

case class ClassInfo(clazz: Class[_], instance: Any, defaultMethod: Method, methods: Map[String, Method], func:String) {
  def invoke[T](args: Object*): T = {
    defaultMethod.invoke(instance, args: _*).asInstanceOf[T]
  }
}
object ClassCreateUtils extends Logging{
  private val clazzs = new util.HashMap[String, ClassInfo]()
  private val classLoader = scala.reflect.runtime.universe.getClass.getClassLoader
  private val toolBox = universe.runtimeMirror(classLoader).mkToolBox()
  def apply(func: String): ClassInfo = this.synchronized {
    var clazz = clazzs.get(func)
    if (clazz == null) {
      val (className, classBody) = wrapClass(func)
      val zz = compile(prepareScala(className, classBody))
      val defaultMethod = zz.getDeclaredMethods.head
      val methods = zz.getDeclaredMethods
      clazz = ClassInfo(
        zz,
        zz.newInstance(),
        defaultMethod,
        methods = methods.map { m => (m.getName, m) }.toMap,
        func
      )
      clazzs.put(func, clazz)
      logInfo(s"dynamic load class => $clazz")
    }
    clazz
  }
  def compile(src: String): Class[_] = {
    val tree = toolBox.parse(src)
    toolBox.compile(tree).apply().asInstanceOf[Class[_]]
  }
  def prepareScala(className: String, classBody: String): String = {
    classBody + "\n" + s"scala.reflect.classTag[$className].runtimeClass"
  }
  def wrapClass(function: String): (String, String) = {
    val className = s"dynamic_class_${UUID.randomUUID().toString.replaceAll("-", "")}"
    val classBody =
      s"""
         |class $className{
         |  $function
         |}
            """.stripMargin
    (className, classBody)
  }
}

上面的代码是小弟给大佬写好的,不用大佬亲自动手了。


Spark 大数据更多技术文章,here

使用方法就灰常简单了我的大佬们。

val infos = ClassCreateUtils(
      """
        |def apply(name:String)=name.toUpperCase
      """.stripMargin
)
    
println(infos.defaultMethod.invoke(infos.instance,"dounine 本猪会一点点 spark"))
# 输出结果不用猜也知道是
DOUNINE 本猪会一点点 SPARK
# 也可以手动指定方法
println(infos.methods("apply").invoke(infos.instance,"dounine 本猪会一点点 spark"))

根据反射的方法信息生成FunctionN

object ScalaGenerateFuns {

  def apply(func: String): (AnyRef, Array[DataType], DataType) = {
    val (argumentTypes, returnType) = getFunctionReturnType(func)
    (generateFunction(func, argumentTypes.length), argumentTypes, returnType)
  }

  //获取方法的参数类型及返回类型
  private def getFunctionReturnType(func: String): (Array[DataType], DataType) = {
    val classInfo = ClassCreateUtils(func)
    val method = classInfo.defaultMethod
    val dataType = JavaTypeInference.inferDataType(method.getReturnType)._1
    (method.getParameterTypes.map(JavaTypeInference.inferDataType).map(_._1), dataType)
  }

  //生成22个Function
  def generateFunction(func: String, argumentsNum: Int): AnyRef = {
    lazy val instance = ClassCreateUtils(func).instance
    lazy val method = ClassCreateUtils(func).methods("apply")

    argumentsNum match {
      case 0 => new (() => Any) with Serializable with Logging {
        override def apply(): Any = {
          try {
            method.invoke(instance)
          } catch {
            case e: Exception =>
              logError(e.getMessage)
          }
        }
      }
      case 1 => new (Object => Any) with Serializable with Logging {
        override def apply(v1: Object): Any = {
          try {
            method.invoke(instance, v1)
          } catch {
            case e: Exception =>
              e.printStackTrace()
              logError(e.getMessage)
              null
          }
        }
      }
      case 2 => new ((Object, Object) => Any) with Serializable with Logging {
        override def apply(v1: Object, v2: Object): Any = {
          try {
            method.invoke(instance, v1, v2)
          } catch {
            case e: Exception =>
              logError(e.getMessage)
              null
          }
        }
      }
      //... 麻烦大佬自己去写剩下的20个了,这里装不下了,不然浏览器会崩溃的,然后电脑会重启的,为了大佬的电脑着想。
}

前戏我们都做完了,高潮的环节来了。

Spark 动态加载代码注册UDF

我们最后再照着register的实现方式,把我们动态Function注册给Spark

1. val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
2. val inputTypes = Try(ScalaReflection.schemaFor[A1].dataType :: Nil).toOption
3. def builder(e: Seq[Expression]) = if (e.length == 1) {
  ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil), 
    Some(name), nullable, udfDeterministic = true)
}
4. functionRegistry.createOrReplaceTempFunction(name, builder)

1. 这句代码比较好理解,就是获取RT返回值类型,就是我们的returnType
2. 就是参数类型,对应的修改如下

val inputTypes = Try(argumentTypes.toList).toOption

3. 刚开始看到这个时候,我是一脸???,后来看源码才发现builder是一种自定类型,源码如下

type FunctionBuilder = Seq[Expression] => Expression

改造方式如下

def builder(e: Seq[Expression]) = ScalaUDF(rf, returnType, e, inputTypes.getOrElse(Nil), Some(name))

4. 看到这句的时候我以为简单了,直接使用spark.sessionState.functionRegistry发现编译不过,看到private[sql]这个作用域的时候有点崩溃,本来是想用下面的方式注册的。

val udf = UserDefinedFunction(rf, returnType, inputTypes).withName(name)
spark.udf.register(name, udf)

是小弟我想太多了,另辟捷径,做了那么多工作难道就白费了?


关注可以安慰小弟

发现下面这句代码,瞬间找到了家的方向。

functionRegistry.registerFunction(new FunctionIdentifier(name), builder)

人生巅峰

到此,大猪的分析与编码已经完成,下面是今天给大哥的解决方案。
方法实现可以通过查询sql得到,或者接口都渴以。

    val spark = SparkSession
      .builder()
      .appName("test")
      .master("local[*]")
      .getOrCreate()

    val name = "hello"

    val (fun, argumentTypes, returnType) = ScalaSourceUDF(
      """
        |def apply(name:String)=name+" => hi"
        |""".stripMargin)

    val inputTypes = Try(argumentTypes.toList).toOption

    def builder(e: Seq[Expression]) = ScalaUDF(fun, returnType, e, inputTypes.getOrElse(Nil), Some(name))

    spark.sessionState.functionRegistry.registerFunction(new FunctionIdentifier(name), builder)

    val rdd = spark
      .sparkContext
      .parallelize(Array(("dounine", "20")))
      .map(x => Row.fromSeq(Array(x._1, x._2)))

    val types = StructType(
      Array(
        StructField("name", StringType),
        StructField("age", StringType)
      )
    )

    spark.createDataFrame(rdd, types).createTempView("log")

    spark.sql("select hello(name) from log").show(false)

真打脸,昨天还说不行的。



最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,053评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,527评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,779评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,685评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,699评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,609评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,989评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,654评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,890评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,634评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,716评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,394评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,976评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,950评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,191评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,849评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,458评论 2 342

推荐阅读更多精彩内容