一、Introduction to Hudi
Hudi is short for Hadoop Upserts Deletes and Incrementals. It manages the storage of large analytical datasets on HDFS, with the primary goal of efficiently reducing data-ingestion latency.
Hudi is an open-source third-party library for Spark that supports upsert/insert/delete operations on data stored in Hadoop.
Hudi datasets integrate with the existing Hadoop ecosystem (Hive, Parquet, Spark) through a custom InputFormat, making the framework seamless for end users.
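As a quick, hedged illustration of the upsert/insert/delete operations mentioned above, the write operation is chosen through a Spark datasource option. The DataFrame df, the table name, and the path below are placeholders rather than part of this walkthrough; the real quickstart commands follow later in this post.
// Minimal sketch, assuming a DataFrame `df` whose rows carry "uuid", "ts" and
// "partitionpath" columns (the same schema the quickstart data generator uses).
df.write.format("hudi").
  option("hoodie.datasource.write.recordkey.field", "uuid").
  option("hoodie.datasource.write.precombine.field", "ts").
  option("hoodie.datasource.write.partitionpath.field", "partitionpath").
  option("hoodie.datasource.write.operation", "upsert"). // also accepts "insert" or "delete"
  option("hoodie.table.name", "my_hudi_table").          // placeholder table name
  mode(org.apache.spark.sql.SaveMode.Append).
  save("/tmp/my_hudi_table")                             // placeholder base path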
二、Installing Hudi
Hudi is installed by building the source code cloned from Git with Maven.
1、Install Maven (omitted)
2、Install Git
sudo yum install git
3、Build Hudi
cd /opt/software/
git clone https://github.com/apache/hudi.git && cd hudi
Optionally, edit pom.xml and add the Aliyun Nexus mirror to the <repositories> section so that dependencies resolve faster:
[root@dev110 hudi]# vim pom.xml
<repository>
    <id>nexus-aliyun</id>
    <name>nexus-aliyun</name>
    <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    <releases>
        <enabled>true</enabled>
    </releases>
    <snapshots>
        <enabled>false</enabled>
    </snapshots>
</repository>
[root@dev110 hudi]# mvn clean package -DskipTests -DskipITs
三、Using Hudi from Spark-shell
1、Start spark-shell
When starting spark-shell, the spark-avro module must be specified explicitly because it is not in the default environment, and its version must match the Spark version; here both are 2.4.0.
[root@dev110 hudi]# spark-shell \
--packages org.apache.spark:spark-avro_2.11:2.4.0 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--jars /opt/software/hudi/packaging/hudi-spark-bundle/target/hudi-spark-bundle_2.11-0.6.1-SNAPSHOT.jar
2、Import packages and classes, and set the table name
scala> import org.apache.hudi.QuickstartUtils._
import org.apache.hudi.QuickstartUtils._
scala> import scala.collection.JavaConversions._
import scala.collection.JavaConversions._
scala> import org.apache.spark.sql.SaveMode._
import org.apache.spark.sql.SaveMode._
scala> import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceReadOptions._
scala> import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.DataSourceWriteOptions._
scala> import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.config.HoodieWriteConfig._
scala> val tableName = "hudi_trips_cow"
tableName: String = hudi_trips_cow
scala> val basePath = "/tmp/hudi_trips_cow"
basePath: String = /tmp/hudi_trips_cow
scala> val dataGen = new DataGenerator
3、Insert data
Generate some new records, load them into a DataFrame, and write the DataFrame to the Hudi table:
scala> val inserts = convertToStringList(dataGen.generateInserts(10))
scala> val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
scala> df.write.format("hudi").
| options(getQuickstartWriteConfigs).
| option(PRECOMBINE_FIELD_OPT_KEY, "ts").
| option(RECORDKEY_FIELD_OPT_KEY, "uuid").
| option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
| option(TABLE_NAME, tableName).
| mode(Overwrite).
| save(basePath)
mode(Overwrite) drops and recreates the table if it already exists. You can check whether data was generated under /tmp/hudi_trips_cow; the storage layout under that directory can be inspected straight from the spark-shell, as shown below.
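A minimal sketch for that listing, assuming basePath sits on the default Hadoop filesystem (this snippet is not part of the original session):
// List what Hudi wrote under basePath: the partition directories plus the .hoodie metadata directory.
import org.apache.hadoop.fs.{FileSystem, Path}
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
fs.listStatus(new Path(basePath)).foreach(status => println(status.getPath))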
4、Query data
val tripsSnapshotDF = spark.read.format("hudi").load(basePath + "/*/*/*/*")
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show()
+------------------+-------------------+-------------------+---+
| fare| begin_lon| begin_lat| ts|
+------------------+-------------------+-------------------+---+
| 33.92216483948643| 0.9694586417848392| 0.1856488085068272| 0|
| 27.79478688582596| 0.6273212202489661|0.11488393157088261| 0|
| 64.27696295884016| 0.4923479652912024| 0.5731835407930634| 0|
| 93.56018115236618|0.14285051259466197|0.21624150367601136| 0|
| 43.4923811219014| 0.8779402295427752| 0.6100070562136587| 0|
|34.158284716382845|0.46157858450465483| 0.4726905879569653| 0|
| 66.62084366450246|0.03844104444445928| 0.0750588760043035| 0|
| 41.06290929046368| 0.8192868687714224| 0.651058505660742| 0|
+------------------+-------------------+-------------------+---+
scala> spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot").show()
+-------------------+--------------------+----------------------+---------+----------+------------------+
|_hoodie_commit_time| _hoodie_record_key|_hoodie_partition_path| rider| driver| fare|
+-------------------+--------------------+----------------------+---------+----------+------------------+
| 20201012103604|be02e0a9-b16e-473...| americas/united_s...|rider-213|driver-213| 33.92216483948643|
| 20201012103604|ce0be450-2cd1-4ec...| americas/united_s...|rider-213|driver-213| 27.79478688582596|
| 20201012103604|bdf16b8c-a46d-4ea...| americas/united_s...|rider-213|driver-213| 64.27696295884016|
| 20201012103604|9015e0f8-bd34-49f...| americas/united_s...|rider-213|driver-213|19.179139106643607|
| 20201012103604|bb80978f-4908-48b...| americas/united_s...|rider-213|driver-213| 93.56018115236618|
| 20201012103604|f7120569-ca09-4c1...| americas/brazil/s...|rider-213|driver-213| 43.4923811219014|
| 20201012103604|f70dd1da-8372-476...| americas/brazil/s...|rider-213|driver-213|34.158284716382845|
| 20201012103604|107b9d18-bb56-470...| americas/brazil/s...|rider-213|driver-213| 66.62084366450246|
| 20201012103604|f18dc5a2-6b0d-4b4...| asia/india/chennai|rider-213|driver-213| 41.06290929046368|
| 20201012103604|980b6370-7800-46a...| asia/india/chennai|rider-213|driver-213|17.851135255091155|
+-------------------+--------------------+----------------------+---------+----------+------------------+
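The _hoodie_partition_path column above reflects the region/country/city partition layout, and it can be filtered like any ordinary column. A small sketch (not part of the original session) that narrows the snapshot view to one partition seen in the output:
// Restrict the snapshot view to a single partition path taken from the query output above.
spark.sql("select uuid, rider, driver, fare from hudi_trips_snapshot " +
  "where _hoodie_partition_path = 'asia/india/chennai'").show()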
5、Update data
This is similar to inserting new data. Use the data generator to generate updates for existing trips, load them into a DataFrame, and write the DataFrame to the Hudi dataset.
val updates = convertToStringList(dataGen.generateUpdates(10))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df.write.format("org.apache.hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)
6、Query the data again
Re-running the snapshot query now shows newer commit times and updated values for the records that were changed:
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot").show()
+-------------------+--------------------+----------------------+---------+----------+------------------+
|_hoodie_commit_time| _hoodie_record_key|_hoodie_partition_path| rider| driver| fare|
+-------------------+--------------------+----------------------+---------+----------+------------------+
| 20201012105507|be02e0a9-b16e-473...| americas/united_s...|rider-284|driver-284| 90.9053809533154|
| 20201012105507|ce0be450-2cd1-4ec...| americas/united_s...|rider-284|driver-284| 98.3428192817987|
| 20201012105507|bdf16b8c-a46d-4ea...| americas/united_s...|rider-284|driver-284|49.527694252432056|
| 20201012103604|9015e0f8-bd34-49f...| americas/united_s...|rider-213|driver-213|19.179139106643607|
| 20201012103604|bb80978f-4908-48b...| americas/united_s...|rider-213|driver-213| 93.56018115236618|
| 20201012105507|f18dc5a2-6b0d-4b4...| asia/india/chennai|rider-284|driver-284| 9.384124531808036|
| 20201012105507|980b6370-7800-46a...| asia/india/chennai|rider-284|driver-284| 90.25710109008239|
| 20201012103604|f7120569-ca09-4c1...| americas/brazil/s...|rider-213|driver-213| 43.4923811219014|
| 20201012105507|f70dd1da-8372-476...| americas/brazil/s...|rider-284|driver-284| 29.47661370147079|
| 20201012105507|107b9d18-bb56-470...| americas/brazil/s...|rider-284|driver-284| 63.72504913279929|
+-------------------+--------------------+----------------------+---------+----------+------------------+
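If the re-run does not pick up the new commit in your environment, the snapshot view can be reloaded first; the following is a sketch rather than part of the original session:
// Reload the table and re-register the temp view so queries read the files written by the latest commit.
val refreshedDF = spark.read.format("hudi").load(basePath + "/*/*/*/*")
refreshedDF.createOrReplaceTempView("hudi_trips_snapshot")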
7、Incremental query
Hudi also provides a way to obtain a stream of records that have changed since a given commit timestamp. This is done by using Hudi's incremental view and supplying the commit time from which changes are needed. If we want all changes after a given commit (the common case), there is no need to specify an end time.
// reload data
spark.read.format("org.apache.hudi").load(basePath + "/*/*/*/*").createOrReplaceTempView("hudi_ro_table")
val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_ro_table order by commitTime").map(k => k.getString(0)).take(50)
val beginTime = commits(commits.length - 2) // commit time we are interested in
// incrementally query the data
val incViewDF = spark.read.format("org.apache.hudi").
option(VIEW_TYPE_OPT_KEY, VIEW_TYPE_INCREMENTAL_OPT_VAL).
option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
load(basePath)
incViewDF.createOrReplaceTempView("hudi_incr_table")
spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, rider, driver, ts from hudi_incr_table where fare > 20.0").show()
+-------------------+------------------+--------------------+-------------------+---------+----------+---+
|_hoodie_commit_time| fare| begin_lon| begin_lat| rider| driver| ts|
+-------------------+------------------+--------------------+-------------------+---------+----------+---+
| 20201012105507| 90.9053809533154| 0.19949323322922063|0.18294079059016366|rider-284|driver-284| 0|
| 20201012105507| 98.3428192817987| 0.3349917833248327| 0.4777395067707303|rider-284|driver-284| 0|
| 20201012105507|49.527694252432056| 0.5142184937933181| 0.7340133901254792|rider-284|driver-284| 0|
| 20201012105507| 90.25710109008239| 0.4006983139989222|0.08528650347654165|rider-284|driver-284| 0|
| 20201012105507| 29.47661370147079|0.010872312870502165| 0.1593867607188556|rider-284|driver-284| 0|
| 20201012105507| 63.72504913279929| 0.888493603696927| 0.6570857443423376|rider-284|driver-284| 0|
+-------------------+------------------+--------------------+-------------------+---------+----------+---+
This returns all changes that took place after the begin commit time, with the fare > 20.0 filter applied. What is unique about this feature is that it lets you author streaming pipelines on top of batch data.
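To make the "streaming pipeline on batch data" idea concrete, here is a rough sketch of repeatedly pulling only the records committed since the last pull; the lastCommit checkpoint variable and the polling structure are assumptions for illustration, not something the original walkthrough shows.
// Hedged sketch of an incremental pull loop; lastCommit acts as a checkpoint and would
// normally be persisted somewhere durable. In a real pipeline the hudi_ro_table view
// would also be refreshed on every poll.
var lastCommit = beginTime
val newCommits = spark.sql(
    s"select distinct(_hoodie_commit_time) as commitTime from hudi_ro_table " +
    s"where _hoodie_commit_time > '$lastCommit' order by commitTime").
  map(k => k.getString(0)).collect()
if (newCommits.nonEmpty) {
  val changesDF = spark.read.format("org.apache.hudi").
    option(VIEW_TYPE_OPT_KEY, VIEW_TYPE_INCREMENTAL_OPT_VAL).
    option(BEGIN_INSTANTTIME_OPT_KEY, lastCommit).
    load(basePath)
  // process changesDF here, then advance the checkpoint
  lastCommit = newCommits.last
}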