Delta Presto Integration & Manifests 机制

  1. Delta 0.5 已于上周发布,增加了不少新特性,这篇文章主要讲解其 Presto Integration 和 Manifests 机制。

  2. 该功能与我们之前平台化 Delta Lake 实践(离线篇) 的很多工作都较为相似,比如与 metastore 的集成,直接通过 manifest 读取 delta 存活文件等。

  3. Delta Lake 在 0.5 之前只支持通过 Spark 读取数据,在新版本中增加了其他处理引擎通过 manifest 文件访问 Delta Lake 的能力。下文以Presto 为例说明如何通过 manifest 文件访问数据,manifest 文件的生成及其一些限制。

使用

Presto 使用 manifest 文件从 hive 外部表中读取数据,manifest文件是一个文本文件,包含该表/分区所有存活数据的路径列表。

当使用 manifest 文件在 Hive metastore 中定义外部表时,Presto 将会先读取 manifest 中的文件路径列表再去访问想要的文件,而不是直接通过目录列表来查找文件。

通过 spark 生成 manifest 文件

支持 sql / scala / java / python 四种 api,以 sql 和 scala 为例。
sql

GENERATE symlink_format_manifest FOR TABLE delta.`pathToDeltaTable`

Scala

val deltaTable = DeltaTable.forPath(pathToDeltaTable)
deltaTable.generate("symlink_format_manifest")

使用 GENERATE 命令会在/path/to/deltaTable/_symlink_format_manifest/ 目录下 生成一个 manifest 文件,其中包含了所有存活的文件路径。

cat /path/to/deltaTable/_symlink_format_manifest/manifest

hdfs://tdhdfs-cs-hz/user/hive/warehouse/bigdata.db/delta_lsw_test/part-00000-0a69ce8d-0d9e-47e2-95b2-001bd196441d-c000.snappy.parquet
hdfs://tdhdfs-cs-hz/user/hive/warehouse/bigdata.db/delta_lsw_test/part-00000-ba1767cb-ff0e-4e65-8e83-7a0cdce6a2f4-c000.snappy.parquet

如果是分区表,例如以 ds 作为分区字段,生成的结构如果下,每个分区下都有一个 manifest 文件包含了该分区的存活文件路径。

/path/to/table/_delta_log
/path/to/table/ds=20190101
/path/to/table/ds=20190102
/path/to/table/_symlink_format_manifest
---- /path/to/table/_symlink_format_manifest/ds=20190101/manifest
---- /path/to/table/_symlink_format_manifest/ds=20190102/manifest

存活文件定义:add file - remove file

定义 Hive Metastore 外部表读取相应文件

CREATE EXTERNAL TABLE mytable ( ... )   -- 与 Delta table 一致的 schema 信息
PARTITIONED BY ( ... )  -- 分区参数可选,需要与 Delta table 一致
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION '<pathToDeltaTable>/_symlink_format_manifest/'  -- 指定 manifest 地址

通过 SymlinkTextInputFormat ,Presto 可以直接从 manifest 中读取需要的文件而不需要直接定位到数据目录。

如果是分区表的话,在运行 generate 后,需要运行 MSCK REPAIR TABLE 使 Hive Metastore 能发现最新的分区。使用 repair 有两种场景:

  1. 每次清单文件生成后调用:每次 generate 都调用 repair,这种方式在分区多的情况下性能表现会非常糟糕,我们的做法是在数据写入时从 spark 获取到相应的变更分区然后依次执行 ADD PARTITION操作。
  2. 在需要新分区的时候调用:如果是按天粒度的分区表,可以选择在午夜12点创建新分区同时执行 generate 后运行一次 repair 。

important: 如果使用了 kerberos 认证,必须要在 etc/catalog/hive.properties 中配置 yarn-site.xml,否则在查询数据时会提示错误

com.facebook.presto.spi.PrestoException: Can't get Master Kerberos principal for use as renewer
    at com.facebook.presto.hive.BackgroundHiveSplitLoader$HiveSplitLoaderTask.process(BackgroundHiveSplitLoader.java:191)
    at com.facebook.presto.hive.util.ResumableTasks.safeProcessTask(ResumableTasks.java:47)
    at com.facebook.presto.hive.util.ResumableTasks.access$000(ResumableTasks.java:20)
    at com.facebook.presto.hive.util.ResumableTasks$1.run(ResumableTasks.java:35)
    at io.airlift.concurrent.BoundedExecutor.drainQueue(BoundedExecutor.java:78)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Can't get Master Kerberos principal for use as renewer
    at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:116)
    at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:100)
    at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodes(TokenCache.java:80)
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:206)
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315)
    at com.facebook.presto.hive.BackgroundHiveSplitLoader.loadPartition(BackgroundHiveSplitLoader.java:304)
    at com.facebook.presto.hive.BackgroundHiveSplitLoader.loadSplits(BackgroundHiveSplitLoader.java:258)
    at com.facebook.presto.hive.BackgroundHiveSplitLoader.access$300(BackgroundHiveSplitLoader.java:93)
    at com.facebook.presto.hive.BackgroundHiveSplitLoader$HiveSplitLoaderTask.process(BackgroundHiveSplitLoader.java:187)
    ... 7 more

Generate 过程

Generate 命令生成 manifest 的逻辑并不复杂,有兴趣的同学可以看下,方法入口:

DeltaGenerateCommand -> GenerateSymlinkManifest.generateFullManifest(spark: SparkSession,deltaLog: DeltaLog)

  1. 在分区表每个分区或者非分区表中原子性的更新 manifest 文件
def writeSingleManifestFile(
 manifestDirAbsPath: String,
 dataFileRelativePaths: Iterator[String]): Unit = {

 val manifestFilePath = new Path(manifestDirAbsPath, "manifest")
 val fs = manifestFilePath.getFileSystem(hadoopConf.value)
 fs.mkdirs(manifestFilePath.getParent())

 val manifestContent = dataFileRelativePaths.map { relativePath =>
   DeltaFileOperations.absolutePath(tableAbsPathForManifest, relativePath).toString
 }
 val logStore = LogStore(SparkEnv.get.conf, hadoopConf.value)
 logStore.write(manifestFilePath, manifestContent, overwrite = true)
}

// 这部分修复了Delta 0.5 删除非分区表失效的BUG,已将 PR 提交社区,未合入主分支
val newManifestPartitionRelativePaths =
 if (fileNamesForManifest.isEmpty && partitionCols.isEmpty) {
   writeSingleManifestFile(manifestRootDirPath, Iterator())
   Set.empty[String]
 } else {
   withRelativePartitionDir(spark, partitionCols, fileNamesForManifest)
     .select("relativePartitionDir", "path").as[(String, String)]
     .groupByKey(_._1).mapGroups {
     (relativePartitionDir: String, relativeDataFilePath: Iterator[(String, String)]) =>
       val manifestPartitionDirAbsPath = {
         if (relativePartitionDir == null || relativePartitionDir.isEmpty) manifestRootDirPath
         else new Path(manifestRootDirPath, relativePartitionDir).toString
       }
       writeSingleManifestFile(manifestPartitionDirAbsPath, relativeDataFilePath.map(_._2))
       relativePartitionDir
   }.collect().toSet
 }
  1. 删除分区表中的失效的分区 manifest 文件
val existingManifestPartitionRelativePaths = {
 val manifestRootDirAbsPath = fs.makeQualified(new Path(manifestRootDirPath))
 if (fs.exists(manifestRootDirAbsPath)) {
   val index = new InMemoryFileIndex(spark, Seq(manifestRootDirAbsPath), Map.empty, None)
   val prefixToStrip = manifestRootDirAbsPath.toUri.getPath
   index.inputFiles.map { p =>
     val relativeManifestFilePath =
       new Path(p).toUri.getPath.stripPrefix(prefixToStrip).stripPrefix(Path.SEPARATOR)
     new Path(relativeManifestFilePath).getParent.toString 
   }.filterNot(_.trim.isEmpty).toSet
 } else Set.empty[String]
}

val manifestFilePartitionsToDelete =
existingManifestPartitionRelativePaths.diff(newManifestPartitionRelativePaths)
deleteManifestFiles(manifestRootDirPath, manifestFilePartitionsToDelete, hadoopConf)

一些限制

数据一致性

在 Delta Lake 更新 manifest 时,它会原子的自动覆盖现有的 manifest 文件。因此,Presto 将始终看到一致的数据文件视图,然而,保证一致性的粒度取决于表是否分区。

  • 非分区表
    所有的文件路径都写在一个会原子更新的 manifest 文件中(参考上文结构),这种情况下 Presto 能看到一致性快照。
  • 分区表:
    manifest 文件将以 hive 分区的目录结构 (参考上文结构),这意味着每个分区都是原子更新,所以 Presto 能看到一个分区内的一致性视图而不是跨分区的一致性视图。此外,由于所有的分区并不是同时更新,所以读取时可能会在不同分区中读到不同 manifest 版本。

简单的说,如果 Presto 在 Spark 更新清单文件时发起读请求,由于 manifest 所有分区并不是一次原子更新操作,所以有可能得到的结果并不是最新的数据。

性能

大量的文件数量会造成 Presto 性能下降,官方的建议是在执行 generate 生成 manifest 前先对文件进行 compact 操作。分区表的单个分区或是非分区表的文件数量不超过1000。

Schema 推断

原生的 Delta Lake 支持 schema evolution,意味着无论 hive metastore 定义的 schema 如何,都会基于文件使用最新的 Schema。由于 Presto 直接使用了定义在 hive metastore 中的 schema ,所以如果要修改 schema 信息,必须要对表进行相应更新 。

后记

一些BUG

测试过程中还发现了一个 BUG,如果将非分区表的数据全部删除,则 generate 后 manifest 不会更新。
提交了一个RP,目前已合入主分支,将在 0.6 版本 release。
Generate does not update manifest if delete data from unpartitioned table

实践经验
首先,由于需要额外的调用 generate 命令生成/更新 manifest 文件,使用体验肯定不如直接通过 Spark 读取数据。
其次,在 generate 过程中进行数据读取有可能会遇到跨分区查询版本不一致的情况,但是瑕不掩瑜,通过 manifest,与大数据生态其他处理引擎的道路被打开了。

  1. 就像在 Delta Lake 实践(离线篇) 这篇文章中提到的,我们大数据平台有一个功能是表数据/分区数据预览,通过 spark 去查用户体验会相当差(耗时长),我们之前的做法是自定义了一个工具类在查询时从 _delta_log中生成 manifest,再通过 manifest 获取到的文件路径直接从文件系统中读取 Parquet 实现,有了 generate 功能,就可以直接读取 manifest 文件,外部系统扩展工作量极大的简化。(delta generate 这种在写入时生成 manifest 的方式更适合那种读多写少的场景)

  2. 在我们的生产环境中,presto 和 spark 使用的同一套 hive metastore ,但是 spark 直接读取上述创建的外部表会报错(就算能读也会有一致性风险),解决办法是在平台拦截了 sql 方法,通过判断 table property 识别 delta 表,然后直接转化为 delta api 对数据进行操作,Presto 则是直接访问外表,解决了冲突的问题。

  3. delta lake 提供了 convert to delta 方法能够将 hive(parquet) 表转为 delta 表,其本质是在原表目录下生成了一个 _delta_log 目录,但是如果要对整个 hive metastore 做兼容还需要一些额外的工作,将在下篇文章进行说明。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,362评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,330评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,247评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,560评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,580评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,569评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,929评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,587评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,840评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,596评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,678评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,366评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,945评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,929评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,165评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,271评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,403评论 2 342