[Spark] Schema Inference - Spark推断Schema的实现

1. 背景

Spark在的Dataframe在使用的过程中或涉及到schema的问题,schema就是这个Row的数据结构(StructType),在代码中就是这个类的定义。如果你想解析一个json或者csv文件成dataframe,那么就需要知道他的StructType。

徒手写一个复杂类的StructType是个吃力不讨好的事情,所以Spark默认是支持自动推断schema的。但是如果使用流处理(Streaming)的话,他的支持力度是很受限的,最近在做Streaming处理的时候,遇到一些schema inference的问题,所以借机学习整理下Spark源码是如何实现的。

2. Spark版本

以下的代码基于spark的版本:

项目 version
scala 2.11
spark-core 2.4.0
spark-sql 2.4.0
mongo-spark-connector 2.11

gradle的配置:

providedRuntime group: 'org.apache.spark', name: 'spark-core_2.11', version: '2.4.0'
providedRuntime group: 'org.apache.spark', name: 'spark-sql_2.11', version: '2.4.0'
providedRuntime group: 'org.mongodb.spark', name: 'mongo-spark-connector_2.11', version: '2.3.1'

3. Schema inference

3.1 spark的Schema inference

3.1.1 通过DDL来解析Schema

DDL的格式类似于:"a INT, b STRING, c DOUBLE",

深入学习看这里:Open Data Description Language (OpenDDL)

StructType提供了接口直接通过解析DDL来识别StructType

    this.userSpecifiedSchema = Option(StructType.fromDDL(schemaString))

先把DDL string解析成SqlBaseLexer

val lexer = new SqlBaseLexer(new UpperCaseCharStream(CharStreams.fromString(command)))

然后, 然后...就看的不太懂了...

3.1.2 解析一个Json的Schema

Spark中Dataframe的文件读取是通过DataFrameReader来完成的.

都是通过DataSet的ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan)方法转为DataFrame

  def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = {
    val qe = sparkSession.sessionState.executePlan(logicalPlan)
    qe.assertAnalyzed()
    new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))
  }

schema是由QueryExecution得到的

  def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = {
    val qe = sparkSession.sessionState.executePlan(logicalPlan)
    qe.assertAnalyzed()
    new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))
  }

其中的qe.analyzed.schema这句就是QueryExecution先分析生成LogicPlan,分析的源码在CheckAnalysis.scala中的def checkAnalysis(plan: LogicalPlan): Unit

 def checkAnalysis(plan: LogicalPlan): Unit = {
    // We transform up and order the rules so as to catch the first possible failure instead
    // of the result of cascading resolution failures.
    plan.foreachUp {

      case p if p.analyzed => // Skip already analyzed sub-plans

      case u: UnresolvedRelation =>
        u.failAnalysis(s"Table or view not found: ${u.tableIdentifier}")

      case operator: LogicalPlan =>
        // Check argument data types of higher-order functions downwards first.
        // If the arguments of the higher-order functions are resolved but the type check fails,
        // the argument functions will not get resolved, but we should report the argument type
        // check failure instead of claiming the argument functions are unresolved.
        operator transformExpressionsDown {
          case hof: HigherOrderFunction
              if hof.argumentsResolved && hof.checkArgumentDataTypes().isFailure =>
            hof.checkArgumentDataTypes() match {
              case TypeCheckResult.TypeCheckFailure(message) =>
                hof.failAnalysis(
                  s"cannot resolve '${hof.sql}' due to argument data type mismatch: $message")
            }
      
      
      ...
      ...
      
}       

最终由Logic的output: Seq[Attribute]为StructType:

  lazy val schema: StructType = StructType.fromAttributes(output)

具体每个Attribute转你为StructType的代码如下:

  private[sql] def fromAttributes(attributes: Seq[Attribute]): StructType =
    StructType(attributes.map(a => StructField(a.name, a.dataType, a.nullable, a.metadata)))

3.1.3 Kafka的Schema

在使用Kafka的Streaming的时候,自动推断只能推断到固定的几个StructField, 如果value是Json的话,也不会进一步解析出来。
这个是因为Kafka和json的dataSource是不一样的
DataFrame在load的时候,会有DataSource基于provider name来找到这个provider的data source的类定义

// DataSource.scala line 613
def lookupDataSource(provider: String, conf: SQLConf): Class[_] = {
    val provider1 = backwardCompatibilityMap.getOrElse(provider, provider) match {
      case name if name.equalsIgnoreCase("orc") &&
          conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native" =>
        classOf[OrcFileFormat].getCanonicalName
      case name if name.equalsIgnoreCase("orc") &&
          conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "hive" =>
        "org.apache.spark.sql.hive.orc.OrcFileFormat"
      case "com.databricks.spark.avro" if conf.replaceDatabricksSparkAvroEnabled =>
        "org.apache.spark.sql.avro.AvroFileFormat"
      case name => name
    }
    val provider2 = s"$provider1.DefaultSource" 
    ...
}
  • 如果输入provider name是"json"返回的是JsonFileFormat
  • 如果是”kafka“返回的是KafkaSourceProvider
    KafkaSourceProvidersourceSchemakafkaSchema
  override def sourceSchema(
      sqlContext: SQLContext,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): (String, StructType) = {
    validateStreamOptions(parameters)
    require(schema.isEmpty, "Kafka source has a fixed schema and cannot be set with a custom one")
    (shortName(), KafkaOffsetReader.kafkaSchema)
  }

具体kafkaSchema的定义如下:

  def kafkaSchema: StructType = StructType(Seq(
    StructField("key", BinaryType),
    StructField("value", BinaryType),
    StructField("topic", StringType),
    StructField("partition", IntegerType),
    StructField("offset", LongType),
    StructField("timestamp", TimestampType),
    StructField("timestampType", IntegerType)
  ))

3.2 mongo-spark的Schema inference

3.2.1 MongoInferSchema源码分析

看mongoSpark源码的时候,意外从一个toDF的方法里发现了有个MongoInferSchema实现了类型推断.

  /**
   * Creates a `DataFrame` based on the schema derived from the optional type.
   *
   * '''Note:''' Prefer [[toDS[T<:Product]()*]] as computations will be more efficient.
   *  The rdd must contain an `_id` for MongoDB versions < 3.2.
   *
   * @tparam T The optional type of the data from MongoDB, if not provided the schema will be inferred from the collection
   * @return a DataFrame
   */
  def toDF[T <: Product: TypeTag](): DataFrame = {
    val schema: StructType = MongoInferSchema.reflectSchema[T]() match {
      case Some(reflectedSchema) => reflectedSchema
      case None                  => MongoInferSchema(toBsonDocumentRDD)
    }
    toDF(schema)
  }

于是研究了下,发现MongoInferSchema的实现分两种情况:

  • 给定了要解析的class类型

如果是给定了要解析的class类型,那就很好办,直接基于Spark的ScalaReflectionschemaFor函数将class转为Schema:

case class Schema(dataType: DataType, nullable: Boolean)

这个SchemaScalaReflection中定义的一个case class,本质是个catalyst DataType
所以可以再进一步直接转为StructType, 所以代码实现很简单:

ScalaReflection.schemaFor[T].dataType.asInstanceOf[StructType]
  • 未给定要解析的class类型

如果没有给定要解析的class类型,那就直接根据从mongo里读取的RDD来推断Schema. 这个具体的实现方式是对RDD进行采样,采样数可以在readConfig中设置,默认值是1000(private val DefaultSampleSize: Int = 1000).

因为从mongo读取出来的格式就是BsonDocument, 所以采样的过程就是将每个BsonDocument转为StructType

  private def getSchemaFromDocument(document: BsonDocument, readConfig: ReadConfig): StructType = {
    val fields = new util.ArrayList[StructField]()
    document.entrySet.asScala.foreach(kv => fields.add(DataTypes.createStructField(kv.getKey, getDataType(kv.getValue, readConfig), true)))
    DataTypes.createStructType(fields)
  }

然后将采样的1000个集合进行两两merge,获取兼容的类型,最终得到RootType,即为所需的Schema:

// perform schema inference on each row and merge afterwards
val rootType: DataType = sampleData
.map(getSchemaFromDocument(_, mongoRDD.readConfig))
.treeAggregate[DataType](StructType(Seq()))(
compatibleType(_, _, mongoRDD.readConfig, nested = false),
compatibleType(_, _, mongoRDD.readConfig, nested = false)
)

3.2.2 MongoInferSchema存在的问题

3.2.2.1 Java兼容性问题

虽然scala脱胎于java,但是在类型和结构上也逐渐出现了很多的不同点,包括部分基础结构和各种各样的复杂结构。所以如果要推断的类是java类,MongoInferSchema 也提供了MongoInferSchemaJava 实现类型反射:

/**
 * A helper for inferring the schema from Java
 *
 * In Spark 2.2.0 calling this method from Scala 2.10 caused compilation errors with the shadowed library in
 * `JavaTypeInference`. Moving it into Java stops Scala falling over and allows it to continue to work.
 *
 * See: SPARK-126
 */
final class MongoInferSchemaJava {

    @SuppressWarnings("unchecked")
    public static <T> StructType reflectSchema(final Class<T> beanClass) {
        return (StructType) JavaTypeInference.inferDataType(beanClass)._1();
    }

}

具体的推断实现在def inferDataType(typeToken: TypeToken[_], seenTypeSet: Set[Class[_]]函数中,代码如下,这里就不详细展开了。

 /**
   * Infers the corresponding SQL data type of a Java type.
   * @param typeToken Java type
   * @return (SQL data type, nullable)
   */
  private def inferDataType(typeToken: TypeToken[_], seenTypeSet: Set[Class[_]] = Set.empty)
    : (DataType, Boolean) = {
    typeToken.getRawType match {
      case c: Class[_] if c.isAnnotationPresent(classOf[SQLUserDefinedType]) =>
        (c.getAnnotation(classOf[SQLUserDefinedType]).udt().newInstance(), true)

      case c: Class[_] if UDTRegistration.exists(c.getName) =>
        val udt = UDTRegistration.getUDTFor(c.getName).get.newInstance()
          .asInstanceOf[UserDefinedType[_ >: Null]]
        (udt, true)
       ...
       ...
       ...
        val properties = getJavaBeanReadableProperties(other)
        val fields = properties.map { property =>
          val returnType = typeToken.method(property.getReadMethod).getReturnType
          val (dataType, nullable) = inferDataType(returnType, seenTypeSet + other)
          new StructField(property.getName, dataType, nullable)
        }
        (new StructType(fields), true)
    }
  }

所以如果大家要使用mongo-spark的类型推断,那么可以基于scala和java封装2个接口函数用于Schema Infer, 下面是我自己封装的2个函数:

  /**
    * @see [[MongoInferSchema.apply]]
    */
  protected def inferSchemaScala[T <: Product : TypeTag](): StructType = {
    MongoInferSchema.reflectSchema[T]() match {
      case Some(reflectedSchema) => reflectedSchema
      // canonicalizeType erases all empty structs, including the only one we want to keep
      case None => StructType(Seq())
    }
  }

  /**
    * @see [[MongoInferSchema.apply]]
    */
  protected def inferSchemaJava[T](beanClass: Class[T]): StructType = {
    MongoInferSchema.reflectSchema(beanClass)
  }

3.2.2.2 采样推断不准确问题

产生不准确的原因在于:

  • 采用点不完整
    毕竟是采样,如果某个字段在采样点没出现,则会导致最终推断的不准确
  • 集合类结构泛型推断错误
    另外一个问题是,比如字段里有个Map[String , String]类型,可能会把其中的key推断成不同的StrutType,而不是统一推断成String。我自己做过测试,会一定程度上依赖某些key是否会高频出现,所以说这种infer schema具有不确定性。

解决方案:

  • 使用的数据结构尽量简单,不要有嵌套或者复杂结构
    但这种情况,真正的生产环境很难,大部分公司的代码结构,迭代了那么久,怎么会那么简单呢,对吧?
  • 给定要解析的class类型
    这个是个很好的方案,可以确保没有错误,同时,如果类的字段或者结构发生变化了,可以确保无缝兼容,不用重新修改代码。

4. 总结

以上介绍了几种spark内部实现 schema inference 源码和使用方式。在日常大部分工作中这些东西都是被spark隐藏的,而且如果没有特殊场景,也是不需要涉及到这里的东西。我是因为刚好遇到Spark Streaming读写Kafka的Topic,但发现读到的RDD/DataFrame没有很好的解析Schema,于是研究了下相关的实现。
最终基于项目选择了MongoInferSchema的实现方式,友好的解决了问题。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,126评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,254评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,445评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,185评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,178评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,970评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,276评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,927评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,400评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,883评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,997评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,646评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,213评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,204评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,423评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,423评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,722评论 2 345

推荐阅读更多精彩内容