在CDH环境集成Hudi

一、Hudi简介

Hudi是Hadoop Updates and Incrementals的缩写,用于管理HDFS上的大型分析数据集存储,主要目的是高效的减少入库延时。
Hudi是一个开源Spark三方库,支持在Hadoop上执行upserts/insert/delete操作。
Hudi数据集通过自定义的InputFormat与当前的Hadoop生态系统(Hive、parquet、spark)集成,使该框架对最终用户来说是无缝的。

二、Hudi安装

hudi安装需要通过Maven对从Git下载下来的源码进行编译。

1、安装maven(略)

图片.png

2、安装git

sudo yum install git

3、构建Hudi

cd /opt/software/
git clone https://github.com/apache/hudi.git && cd hudi
[root@dev110 hudi]# vim pom.xml

       <repository>
        <id>nexus-aliyun</id>
        <name>nexus-aliyun</name>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        <releases>
            <enabled>true</enabled>
        </releases>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
    </repository>

[root@dev110 hudi]# mvn clean package -DskipTests -DskipITs

三、使用Spark-shell对接Hudi

1、启动spark-shell

spark-shell启动,需要指定spark-avro模块,因为默认环境里没有,spark-avro模块版本还需要和spark版本对应,这里都是2.4.0。

[root@dev110 hudi]# spark-shell \
--packages org.apache.spark:spark-avro_2.11:2.4.0 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--jars /opt/software/hudi/packaging/hudi-spark-bundle/target/hudi-spark-bundle_2.11-0.6.1-SNAPSHOT.jar 
图片.png

2、导入包和类,设置表名

scala> import org.apache.hudi.QuickstartUtils._
import org.apache.hudi.QuickstartUtils._
 
scala> import scala.collection.JavaConversions._
import scala.collection.JavaConversions._
 
scala> import org.apache.spark.sql.SaveMode._
import org.apache.spark.sql.SaveMode._
 
scala> import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceReadOptions._
 
scala> import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.DataSourceWriteOptions._
 
scala> import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.config.HoodieWriteConfig._
 
scala> val tableName = "hudi_trips_cow"
tableName: String = hudi_trips_cow
 
scala> val basePath = "/tmp/hudi_trips_cow"
basePath: String = /tmp/hudi_trips_cow
 
scala> val dataGen = new DataGenerator

3、插入数据

新增数据,生成一些数据,将其加载到DataFrame中,然后将DataFrame写入Hudi表

    scala> val inserts = convertToStringList(dataGen.generateInserts(10))
    scala> val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
    scala> df.write.format("hudi").
         |   options(getQuickstartWriteConfigs).
         |   option(PRECOMBINE_FIELD_OPT_KEY, "ts").
         |   option(RECORDKEY_FIELD_OPT_KEY, "uuid").
         |   option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
         |   option(TABLE_NAME, tableName).
         |   mode(Overwrite).
         |   save(basePath)

Mode(overwrite)将覆盖重新创建表(如果已存在)。可以检查/tmp/hudi_trps_cow 路径下是否有数据生成。文件存储目录如下图所示


图片.png

4、查询数据

 val tripsSnapshotDF = spark.read. format("hudi").load(basePath + "/*/*/*/*")
 tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
 spark.sql("select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare > 20.0").show()

+------------------+-------------------+-------------------+---+                
|              fare|          begin_lon|          begin_lat| ts|
+------------------+-------------------+-------------------+---+
| 33.92216483948643| 0.9694586417848392| 0.1856488085068272|  0|
| 27.79478688582596| 0.6273212202489661|0.11488393157088261|  0|
| 64.27696295884016| 0.4923479652912024| 0.5731835407930634|  0|
| 93.56018115236618|0.14285051259466197|0.21624150367601136|  0|
|  43.4923811219014| 0.8779402295427752| 0.6100070562136587|  0|
|34.158284716382845|0.46157858450465483| 0.4726905879569653|  0|
| 66.62084366450246|0.03844104444445928| 0.0750588760043035|  0|
| 41.06290929046368| 0.8192868687714224|  0.651058505660742|  0|
+------------------+-------------------+-------------------+---+

scala> spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from  hudi_trips_snapshot").show()
+-------------------+--------------------+----------------------+---------+----------+------------------+
|_hoodie_commit_time|  _hoodie_record_key|_hoodie_partition_path|    rider|    driver|              fare|
+-------------------+--------------------+----------------------+---------+----------+------------------+
|     20201012103604|be02e0a9-b16e-473...|  americas/united_s...|rider-213|driver-213| 33.92216483948643|
|     20201012103604|ce0be450-2cd1-4ec...|  americas/united_s...|rider-213|driver-213| 27.79478688582596|
|     20201012103604|bdf16b8c-a46d-4ea...|  americas/united_s...|rider-213|driver-213| 64.27696295884016|
|     20201012103604|9015e0f8-bd34-49f...|  americas/united_s...|rider-213|driver-213|19.179139106643607|
|     20201012103604|bb80978f-4908-48b...|  americas/united_s...|rider-213|driver-213| 93.56018115236618|
|     20201012103604|f7120569-ca09-4c1...|  americas/brazil/s...|rider-213|driver-213|  43.4923811219014|
|     20201012103604|f70dd1da-8372-476...|  americas/brazil/s...|rider-213|driver-213|34.158284716382845|
|     20201012103604|107b9d18-bb56-470...|  americas/brazil/s...|rider-213|driver-213| 66.62084366450246|
|     20201012103604|f18dc5a2-6b0d-4b4...|    asia/india/chennai|rider-213|driver-213| 41.06290929046368|
|     20201012103604|980b6370-7800-46a...|    asia/india/chennai|rider-213|driver-213|17.851135255091155|
+-------------------+--------------------+----------------------+---------+----------+------------------+

5、更新数据 {#updates}

这类似于插入新数据。使用数据生成器生成对现有行程的更新,加载到DataFrame中并将DataFrame写入hudi数据集。

val updates = convertToStringList(dataGen.generateUpdates(10))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 2));
df.write.format("org.apache.hudi").
    options(getQuickstartWriteConfigs).
    option(PRECOMBINE_FIELD_OPT_KEY, "ts").
    option(RECORDKEY_FIELD_OPT_KEY, "uuid").
    option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
    option(TABLE_NAME, tableName).
    mode(Append).
    save(basePath);

6、再次查询

spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from  hudi_trips_snapshot").show()
+-------------------+--------------------+----------------------+---------+----------+------------------+
|_hoodie_commit_time|  _hoodie_record_key|_hoodie_partition_path|    rider|    driver|              fare|
+-------------------+--------------------+----------------------+---------+----------+------------------+
|     20201012105507|be02e0a9-b16e-473...|  americas/united_s...|rider-284|driver-284|  90.9053809533154|
|     20201012105507|ce0be450-2cd1-4ec...|  americas/united_s...|rider-284|driver-284|  98.3428192817987|
|     20201012105507|bdf16b8c-a46d-4ea...|  americas/united_s...|rider-284|driver-284|49.527694252432056|
|     20201012103604|9015e0f8-bd34-49f...|  americas/united_s...|rider-213|driver-213|19.179139106643607|
|     20201012103604|bb80978f-4908-48b...|  americas/united_s...|rider-213|driver-213| 93.56018115236618|
|     20201012105507|f18dc5a2-6b0d-4b4...|    asia/india/chennai|rider-284|driver-284| 9.384124531808036|
|     20201012105507|980b6370-7800-46a...|    asia/india/chennai|rider-284|driver-284| 90.25710109008239|
|     20201012103604|f7120569-ca09-4c1...|  americas/brazil/s...|rider-213|driver-213|  43.4923811219014|
|     20201012105507|f70dd1da-8372-476...|  americas/brazil/s...|rider-284|driver-284| 29.47661370147079|
|     20201012105507|107b9d18-bb56-470...|  americas/brazil/s...|rider-284|driver-284| 63.72504913279929|
+-------------------+--------------------+----------------------+---------+----------+------------------+

7、增量查询

Hudi还提供了获取给定提交时间戳以来已更改的记录流的功能。 这可以通过使用Hudi的增量视图并提供所需更改的开始时间来实现。 如果我们需要给定提交之后的所有更改(这是常见的情况),则无需指定结束时间。

// reload data
spark.read.format("org.apache.hudi").load(basePath + "/*/*/*/*").createOrReplaceTempView("hudi_ro_table")
val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from  hudi_ro_table order by commitTime").map(k => k.getString(0)).take(50)
val beginTime = commits(commits.length - 2) // commit time we are interested in
// 增量查询数据
val incViewDF = spark.read.format("org.apache.hudi").
    option(VIEW_TYPE_OPT_KEY, VIEW_TYPE_INCREMENTAL_OPT_VAL).
    option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
    load(basePath);
incViewDF.registerTempTable("hudi_incr_table")
spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, rider, driver,  ts from  hudi_incr_table where fare > 20.0").show()
+-------------------+------------------+--------------------+-------------------+---------+----------+---+
|_hoodie_commit_time|              fare|           begin_lon|          begin_lat|    rider|    driver| ts|
+-------------------+------------------+--------------------+-------------------+---------+----------+---+
|     20201012105507|  90.9053809533154| 0.19949323322922063|0.18294079059016366|rider-284|driver-284|  0|
|     20201012105507|  98.3428192817987|  0.3349917833248327| 0.4777395067707303|rider-284|driver-284|  0|
|     20201012105507|49.527694252432056|  0.5142184937933181| 0.7340133901254792|rider-284|driver-284|  0|
|     20201012105507| 90.25710109008239|  0.4006983139989222|0.08528650347654165|rider-284|driver-284|  0|
|     20201012105507| 29.47661370147079|0.010872312870502165| 0.1593867607188556|rider-284|driver-284|  0|
|     20201012105507| 63.72504913279929|   0.888493603696927| 0.6570857443423376|rider-284|driver-284|  0|
+-------------------+------------------+--------------------+-------------------+---------+----------+---+

这将提供在开始时间提交之后发生的所有更改,其中包含票价大于20.0的过滤器。关于此功能的独特之处在于,它现在使您可以在批量数据上创作流式管道。

参考:https://blog.csdn.net/x950913/article/details/107178364
参考:https://www.bookstack.cn/read/apache-hudi-0.5-zh/docs-0.5.0-quickstart.md

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,547评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,399评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,428评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,599评论 1 274
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,612评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,577评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,941评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,603评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,852评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,605评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,693评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,375评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,955评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,936评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,172评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,970评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,414评论 2 342