再谈RDD、DataFrame、DataSet关系以及相互转换(JAVA API)

Spark提供了三种主要的与数据相关的API:

  • RDD
  • DataFrame
  • DataSet
三者图示

下面详细介绍下各自的特点:

RDD

主要描述:RDD是Spark提供的最主要的一个抽象概念(Resilient Distributed Dataset),它是一个element的collection,分区化的位于集群的节点中,支持并行处理。

  • RDD的特性

    • 分布式:
      RDD使用MapReduce算子来广泛的适应在集群中并行分布式的大数据集的处理和产生。并且方便用户使用高级别的算子在并行计算中。
    • 不可变:
      RDD是由一个records的collection组成,而且是分区的。分区是RDD并行化的基础单元,而且每个分区就是对数据的逻辑分割,它是不可变的,它是通过已经存在的分区的某些transformations创建得到。这种不可变性方便在计算中做到数据一致性。
    • 错误容忍:
      在实际中如果我们丢失了RDD的部分分区,可以通过对丢失分区关联性的transformation重新计算得到。而不是在众多节点中做数据的复制等操作。这个特性是RDD的最大优点,它节省了大量的数据管理、复制等操作,使得计算速度更快。
    • 惰性执行:
      所有的transformation都是惰性的,他们并不是立刻计算出结果,而是只是记住了各个transformation对数据集的依赖关系。当driver程序需要一个action结果时才开始执行。
    • 功能支持:
      RDD支持两种类型的算子:transformation是指从已经存在的数据集中计算得到新的数据集;action是指通过对通过对数据集的计算得到一个结果返回给driver。
    • 数据格式:
      轻松且有效支持各种数据,包括结构化的和非结构化的。
    • 编程语言:
      RDD的API支持Scala、Java、Python和R
  • RDD的限制

    • 没有内置的优化引擎
      当对结构化的数据进行处理时,RDD没有使用Spark的高级优化器,比如catalyst优化器和Tungsten执行引擎。
    • 处理结构化的数据
      不像Dataframe或者Dataset,RDD不会主动推测出数据的schema,而是需要用户在代码里指示。

DataFrame

Spark从1.3版本开始引入Dataframe,它克服了RDD的最主要的挑战。

主要描述:Dataframe是一个分布式的数据collection,而且将数据按照列名进行组织。在概念上它与关系型的数据库的表或者R/Python语言中的DataFrame类似。与之一起提供的还有,Spark引入了catalyst优化器,它可以优化查询。

  • DataFrame的特性

    • 分布式的Row对象的Collection:
      分布式、列名组织的数据、后台优化。
      具体到代码里面,Dataframe就是Dataset<Row>
    • 数据处理:
      处理支持结构或者非结构化的格式(比如Avro, CSV, elastic search, 以及Cassandra)以及不同的文件系统(HDFS, HIVE tables, MySQL, etc)。它支持非常多的数据源
    • 使用catalyst优化器优化:
      它对SQL查询以及DataFrame API都提供优化支持。Dataframe使用catalyst树transformation框架有四个步骤:
      1、Analyzing a logical plan to resolve references
      2、Logical plan optimization
      3、Physical planning
      4、Code generation to compile parts of the query to Java bytecode.
    • Hive兼容性:
      使用Spark的SQL可以无修改的支持Hive查询在已经存在的Hive warehouses。它重用了Hive的前端、MetaStore并且对已经存在的Hive数据、查询和UDF提供完整的兼容性。
    • Tungsten:
      Tungsten提供了一个物理执行后端,管理内存动态产生expression evaluation的字节码
    • 编程语言:
      Dataframe API支持Scala、Java、Python和R
  • DataFrame的限制

    • 没有编译阶段的类型检查:
      不能在编译时刻对安全性做出检查,而且限制了用户对于未知结构的数据进行操作。比如下面代码在编译时没有错误,但是在执行时会出现异常:
    case class Person(name : String , age : Int) 
    val dataframe = sqlContect.read.json("people.json") 
    dataframe.filter("salary > 10000").show 
    => throws Exception : cannot resolve 'salary' given input age , name
    
    • 不能保留类对象的结构:
      一旦把一个类结构的对象转成了Dataframe,就不能转回去了。下面这个栗子就是指出了:
    case class Person(name : String , age : Int)
    val personRDD = sc.makeRDD(Seq(Person("A",10),Person("B",20)))
    val personDF = sqlContect.createDataframe(personRDD)
    personDF.rdd // returns RDD[Row] , does not returns RDD[Person]
    

DataSet

主要描述:Dataset API是对DataFrame的一个扩展,使得可以支持类型安全的检查,并且对类结构的对象支持程序接口。它是强类型的,不可变collection,并映射成一个相关的schema。
Dataset API的核心是一个被称为Encoder的概念。它是负责对JVM的对象以及表格化的表达(tabular representation)之间的相互转化。
表格化的表达在存储时使用了Spark内置的Tungsten二进制形式,允许对序列化数据操作并改进了内存使用。在Spark 1.6版本之后,支持自动化生成Encoder,可以对广泛的primitive类型(比如String,Integer,Long等)、Scala的case class以及Java Bean自动生成对应的Encoder。

  • DataSet的特性

    • 支持RDD和Dataframe的优点:
      包括RDD的类型安全检查,Dataframe的关系型模型,查询优化,Tungsten执行,排序和shuffling。
    • Encoder:
      通过使用Encoder,用户可以轻松转换JVM对象到一个Dataset,允许用户在结构化和非结构化的数据操作。
    • 编程语言:
      Scala和Java
    • 类型安全检查:
      提供编译阶段的安全类型检查。比如下面这个栗子:
    case class Person(name : String , age : Int)
    val personRDD = sc.makeRDD(Seq(Person("A",10),Person("B",20)))
    val personDF = sqlContect.createDataframe(personRDD)
    val ds:Dataset[Person] = personDF.as[Person]
    ds.filter(p => p.age > 25)
    ds.filter(p => p.salary > 25)
     // error : value salary is not a member of person
    ds.rdd // returns RDD[Person]
    
    • 相互转换:
      Dataset可以让用户轻松从RDD和Dataframe转换到Dataset不需要额外太多代码。
  • DataSet的限制

    • 需要把类型转成String:
      Querying the data from datasets currently requires us to specify the fields in the class as a string. Once we have queried the data, we are forced to cast column to the required data type. On the other hand, if we use map operation on Datasets, it will not use Catalyst optimizer.
      比如:
    ds.select(col("name").as[String], $"age".as[Int]).collect()
    

Java API中三种数据格式的相互转换

首先构造一个数据集,是由Person类的结构组成的,然后在此之上看这三个API实例的构造以及相互转换

  • 数据创建
Person类的定义
数据创建
  • 直接构建出 JavaRDD<Person>

    JavaRDD<Person> personJavaRDD = jsc.parallelize(personList);
    System.out.println("1. 直接构建出 JavaRDD<Person>");
    personJavaRDD.foreach(element -> System.out.println(element.toString()));
    

    Print结果:

    直接构建出 JavaRDD<Person>
    Person: name = Andy, age = 32
    Person: name = Michael, age = 23
    Person: name = Justin, age = 19

  • 直接构建出 Dataset<Person>

          Encoder<Person> personEncoder = Encoders.bean(Person.class);
          Dataset<Person> personDS = spark.createDataset(personList, personEncoder);
          System.out.println("2. 直接构建出 Dataset<Person>");
          personDS.show();
          personDS.printSchema();
    

    Print结果:

    1. 直接构建出 Dataset<Person>
      +---+-------+
      |age| name|
      +---+-------+
      | 32| Andy|
      | 23|Michael|
      | 19| Justin|
      +---+-------+
      root
      |-- age: integer (nullable = false)
      |-- name: string (nullable = true)
  • 直接构建出 Dataset<Row>

          Dataset<Row> personDF = spark.createDataFrame(personList, Person.class);
          System.out.println("3. 直接构建出 Dataset<Row>");
          personDF.show();
          personDF.printSchema();
    

    Print结果:

    1. 直接构建出 Dataset<Row>
      +---+-------+
      |age| name|
      +---+-------+
      | 32| Andy|
      | 23|Michael|
      | 19| Justin|
      +---+-------+
      root
      |-- age: integer (nullable = false)
      |-- name: string (nullable = true)
  • JavaRDD<Person> -> Dataset<Person>

          personDS = spark.createDataset(personJavaRDD.rdd(), personEncoder);
          System.out.println("1->2 JavaRDD<Person> -> Dataset<Person>");
          personDS.show();
          personDS.printSchema();
    

    Print结果:

    1->2 JavaRDD<Person> -> Dataset<Person>
    +---+-------+
    |age| name|
    +---+-------+
    | 32| Andy|
    | 23|Michael|
    | 19| Justin|
    +---+-------+
    root
    |-- age: integer (nullable = true)
    |-- name: string (nullable = true)

  • JavaRDD<Person> -> Dataset<Row>

          personDF = spark.createDataFrame(personJavaRDD, Person.class);
          System.out.println("1->3 JavaRDD<Person> -> Dataset<Row>");
          personDF.show();
          personDF.printSchema();
    

    Print结果:

    1->3 JavaRDD<Person> -> Dataset<Row>
    +---+-------+
    |age| name|
    +---+-------+
    | 32| Andy|
    | 23|Michael|
    | 19| Justin|
    +---+-------+
    root
    |-- age: integer (nullable = false)
    |-- name: string (nullable = true)

  • 补充从JavaRDD<Row>到Dataset<Row>

          JavaRDD<Row> personRowRdd = personJavaRDD.map(person -> RowFactory.create(person.age, person.name));
          List<StructField> fieldList = new ArrayList<>();
          fieldList.add(DataTypes.createStructField("age", DataTypes.IntegerType, false));
          fieldList.add(DataTypes.createStructField("name", DataTypes.StringType, false));
          StructType rowAgeNameSchema = DataTypes.createStructType(fieldList);
          personDF = spark.createDataFrame(personRowRdd, rowAgeNameSchema);
          System.out.println("\n\n\n补充,由JavaRDD<Row> -> Dataset<Row>");
          personDF.show();
          personDF.printSchema();
    

    主要就是使用RowFactory把Row中的每一项写好后,通过spark的createDataFrame来创建。其中对于Row的解读包含在了自建的StructType中。

  • Dataset<Person> -> JavaRDD<Person>

          personJavaRDD = personDS.toJavaRDD();
          System.out.println("2->1 Dataset<Person> -> JavaRDD<Person>");
          personJavaRDD.foreach(element -> System.out.println(element.toString()));
    

    Print结果:

    2->1 Dataset<Person> -> JavaRDD<Person>
    Person: name = Justin, age = 19
    Person: name = Andy, age = 32
    Person: name = Michael, age = 23

  • Dataset<Row> -> JavaRDD<Person>

          personJavaRDD = personDF.toJavaRDD().map(row -> {
              String name = row.getAs("name");
              int age = row.getAs("age");
              return new Person(name, age);
          });
          System.out.println("3->1 Dataset<Row> -> JavaRDD<Person>");
          personJavaRDD.foreach(element -> System.out.println(element.toString()));
    

    Print结果:

    3->1 Dataset<Row> -> JavaRDD<Person>
    Person: name = Justin, age = 19
    Person: name = Michael, age = 23
    Person: name = Andy, age = 32

  • Dataset<Person> -> Dataset<Row>

          List<StructField> fieldList = new ArrayList<>();
          fieldList.add(DataTypes.createStructField("name", DataTypes.StringType, false));
          fieldList.add(DataTypes.createStructField("age", DataTypes.IntegerType, false));
          StructType rowSchema = DataTypes.createStructType(fieldList);
          ExpressionEncoder<Row> rowEncoder = RowEncoder.apply(rowSchema);
          Dataset<Row> personDF_fromDS = personDS.map(
                  (MapFunction<Person, Row>) person -> {
                      List<Object> objectList = new ArrayList<>();
                      objectList.add(person.name);
                      objectList.add(person.age);
                      return RowFactory.create(objectList.toArray());
                  },
                  rowEncoder
          );
          System.out.println("2->3 Dataset<Person> -> Dataset<Row>");
          personDF_fromDS.show();
          personDF_fromDS.printSchema();
    

    Print结果:

    2->3 Dataset<Person> -> Dataset<Row>
    +---+-------+
    |age| name|
    +---+-------+
    | 32| Andy|
    | 23|Michael
    | 19| Justin|
    +---+-------+
    root
    |-- age: integer (nullable = false)
    |-- name: string (nullable = true)

  • Dataset<Row> -> Dataset<Person>

          personDS = personDF.map(new MapFunction<Row, Person>() {
              @Override
              public Person call(Row value) throws Exception {
                  return new Person(value.getAs("name"), value.getAs("age"));
              }
          }, personEncoder);
          System.out.println("3->2 Dataset<Row> -> Dataset<Person>");
          personDS.show();
          personDS.printSchema();
    

    Print结果:

    3->2 Dataset<Row> -> Dataset<Person>
    +---+-------+
    |age| name|
    +---+-------+
    | 32| Andy|
    | 23|Michael|
    | 19| Justin|
    +---+-------+
    root
    |-- age: integer (nullable = true)
    |-- name: string (nullable = true)

总结:
其实RDD的Map和Dataset的Map只有一点不同,就是Dataset的Map要指定一个Encoder的参数。

需要用Encoder类给出
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,126评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,254评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,445评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,185评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,178评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,970评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,276评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,927评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,400评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,883评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,997评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,646评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,213评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,204评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,423评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,423评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,722评论 2 345

推荐阅读更多精彩内容