Hudi 源码之 Flink Table Service 排期和执行

前言

Hudi Flink支持配置table service的异步执行。Schedule的时机为checkpoint完成的时候。执行过程在线程池中完成。Flink Hudi 常用的table service有compaction,clustering和clean三种。它们对应的配置项为:

  • clustering.async.enabled:是否开启异步的clustering。默认不开启。
  • compaction.async.enabled:是否开启异步compaction。默认开启。
  • clean.async.enabled:是否开启异步clean。默认开启。

本篇主要分析Flink中Hudi table service的排期和执行时机。至于compaction clustering和clean表服务具体的执行逻辑,参见:

Scheduling排期

Hudi Flink table service的排期主要位于如下两个方法中:

  • StreamWriteOperatorCoordinator::handleInputEvent: batch模式需要schedule的table service。
  • StreamWriteOperatorCoordinator::notifyCheckpointComplete: streaming模式需要schedule的table service

接下来我们分别分析这两个方法。
handleEndInputWvent方法:

private void handleEndInputEvent(WriteMetadataEvent event) {  
  addEventToBuffer(event);  
  // 如果已经接收到所有数据
  if (allEventsReceived()) {  
    // start to commit the instant.  
    // 提交
    // 如果数据成功写入,返回true,表示提交成功
    boolean committed = commitInstant(this.instant);  
    if (committed) {  
      // The executor thread inherits the classloader of the #handleEventFromOperator  
      // caller, which is a AppClassLoader.      Thread.currentThread().setContextClassLoader(getClass().getClassLoader());  
      // sync Hive synchronously if it is enabled in batch mode.  
      // 如果开启了hive sync,执行
      syncHive();  
      // schedules the compaction or clustering if it is enabled in batch execution mode  
      // 表服务排期
      scheduleTableServices(true);  
    }  
  }  
}

notifyCheckpointComplete方法。在checkpoint执行成功的时候执行回调。

@Override  
public void notifyCheckpointComplete(long checkpointId) {  
  executor.execute(  
      () -> {  
        // The executor thread inherits the classloader of the #notifyCheckpointComplete  
        // caller, which is a AppClassLoader.  
        Thread.currentThread().setContextClassLoader(getClass().getClassLoader());  
        // for streaming mode, commits the ever received events anyway,  
        // the stream write task snapshot and flush the data buffer synchronously in sequence,        
        // so a successful checkpoint subsumes the old one(follows the checkpoint subsuming contract)        
        // 如果数据成功写入,返回true,表示提交成功
        final boolean committed = commitInstant(this.instant, checkpointId);  
        // schedules the compaction or clustering if it is enabled in stream execution mode  
        // 排期表服务
        scheduleTableServices(committed);  
  
        if (committed) {  
          // start new instant.  
          // 写入instant
          startInstant();  
          // sync Hive if is enabled  
          // 如果开启了hive sync,执行
          syncHiveAsync();  
        }  
      }, "commits the instant %s", this.instant  
  );  
}

scheduleTableServices方法:

private void scheduleTableServices(Boolean committed) {  
  // if compaction is on, schedule the compaction  
  // 如果是MOR表,并且开启了compaction.schedule.enabled配置(默认开启)
  if (tableState.scheduleCompaction) {  
    CompactionUtil.scheduleCompaction(writeClient, tableState.isDeltaTimeCompaction, committed);  
  }  
  // if clustering is on, schedule the clustering  
  // clustering.schedule.enabled如果开启(默认不开启)
  // 对于bucket index表,如果配置的是consistent hash(一致性hash),要求写入类型必须是upsert
  // 否则(SIMPLE类型)要求写入类型必须是insert
  if (tableState.scheduleClustering) {  
    ClusteringUtil.scheduleClustering(conf, writeClient, committed);  
  }  
}

Executing执行

HoodieTableSink

Hudi Flink创建table service异步任务流位于HoodieTableSink::getSinkRuntimeProvider

// ...
// Append mode  
// 如果是增量写入模式
if (OptionsResolver.isAppendMode(conf)) {  
  // close compaction for append mode  
  // 关闭compaction schedule模式
  conf.set(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, false);  
  // append 模式写入数据
  DataStream<Object> pipeline = Pipelines.append(conf, rowType, dataStream);  
  // 如果需要异步clustering
  // write.operation为insert并且启用了异步clustering(clustering.async.enabled为true)
  if (OptionsResolver.needsAsyncClustering(conf)) {  
    // 执行clustering
    return Pipelines.cluster(conf, rowType, pipeline);  
    // 如果hoodie.cleaner.policy.failed.writes配置为lazy
  } else if (OptionsResolver.isLazyFailedWritesCleanPolicy(conf)) {  
    // add clean function to rollback failed writes for lazy failed writes cleaning policy  
    // 执行清理
    return Pipelines.clean(conf, pipeline);  
  } else {  
    // 否则什么也不做
    return Pipelines.dummySink(pipeline);  
  }  
}  
  
DataStream<Object> pipeline;  
// bootstrap加载索引
final DataStream<HoodieRecord> hoodieRecordDataStream =  
    Pipelines.bootstrap(conf, rowType, dataStream, context.isBounded(), overwrite);  
// write pipeline  
// 流式写入
pipeline = Pipelines.hoodieStreamWrite(conf, hoodieRecordDataStream);  
// compaction  
// 是否需要异步压缩
// compaction.async.enabled是否为true。默认为true
if (OptionsResolver.needsAsyncCompaction(conf)) {  
  // use synchronous compaction for bounded source.  
  // 如果是bounded数据源(有头有尾),使用同步压缩
  if (context.isBounded()) {  
    conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);  
  }  
  // 执行压缩
  return Pipelines.compact(conf, pipeline);  
} else {  
  // 如果没有配置压缩,执行清理
  return Pipelines.clean(conf, pipeline);  
}
// ...

通过上面的分析不难得知,Flink中compact,clean和clustering表服务都在Pipeline中创建。接下来我们分析Pipeline的源代码。

Pipelines

Pipeline创建了一条专用的数据流,这些数据流分别用来周期性创建compaction和clustering的执行计划,以及执行compact,clean和clustering。它们独立于系统的业务数据流。

Pipelines::compact

该方法用来启动周期压缩任务流。

public static DataStreamSink<CompactionCommitEvent> compact(Configuration conf, DataStream<Object> dataStream) {  
// 使用CompactionPlanOperator下发compaction执行计划
  DataStreamSink<CompactionCommitEvent> compactionCommitEventDataStream = dataStream.transform("compact_plan_generate",  
          TypeInformation.of(CompactionPlanEvent.class),  
          new CompactionPlanOperator(conf))  
      // plan生成过程必须是单并行度
      .setParallelism(1) // plan generate must be singleton  
      .setMaxParallelism(1)  
      // make the distribution strategy deterministic to avoid concurrent modifications  
      // on the same bucket files      .keyBy(plan -> plan.getOperation().getFileGroupId().getFileId())  
      // 使用CompactorOperator执行压缩计划
      .transform("compact_task",  
          TypeInformation.of(CompactionCommitEvent.class),  
          new CompactOperator(conf))  
      // 并行度配置为compaction.tasks
      .setParallelism(conf.getInteger(FlinkOptions.COMPACTION_TASKS))  
      // CompactionSommitSink检查并提交compaction instant
      .addSink(new CompactionCommitSink(conf))  
      .name("compact_commit")  
      // 执行commit的并行度必须是1
      .setParallelism(1); // compaction commit should be singleton  
  compactionCommitEventDataStream.getTransformation().setMaxParallelism(1);  
  return compactionCommitEventDataStream;  
}

CompactionPlanOperator::notifyCheckpointComplete。在checkpoint的时候检查是否生成的有requested状态的compaction instant。如果有,生成CompactionPlanEvent发往下游。
SteamWriteOperatorCoordinator用来生成requested状态的compaction instant,CompactionPlanOperator用来获取到这些compaction instant,读取保存的执行计划然后发往下游。

@Override  
public void notifyCheckpointComplete(long checkpointId) {  
  try {  
    table.getMetaClient().reloadActiveTimeline();  
    // There is no good way to infer when the compaction task for an instant crushed  
    // or is still undergoing. So we use a configured timeout threshold to control the rollback:    // {@code FlinkOptions.COMPACTION_TIMEOUT_SECONDS},    // when the earliest inflight instant has timed out, assumes it has failed    // already and just rolls it back.  
    // comment out: do we really need the timeout rollback ?    // CompactionUtil.rollbackEarliestCompaction(table, conf);    
    scheduleCompaction(table, checkpointId);  
  } catch (Throwable throwable) {  
    // make it fail-safe  
    LOG.error("Error while scheduling compaction plan for checkpoint: " + checkpointId, throwable);  
  }  
}

scheduleCompaction方法读取第一个状态为reqested状态的compaction instant,获取到它的compaction plan,将该compaction涉及到的compaction plan中的compactionOperation(即Compaction操作涉及到的file group信息)包装为CompactionPlanEvent发往下游。

private void scheduleCompaction(HoodieFlinkTable<?> table, long checkpointId) throws IOException {  
  // 获取包含所有pending compaction的timeline
  HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();  
  
  // the first instant takes the highest priority.  
  // 找到时间最早的requested compaction instant
  Option<HoodieInstant> firstRequested = pendingCompactionTimeline  
      .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED).firstInstant();  
  // record metrics  
  compactionMetrics.setFirstPendingCompactionInstant(firstRequested);  
  compactionMetrics.setPendingCompactionCount(pendingCompactionTimeline.countInstants());  
  // 如果没有requested状态的compaction instant,说明没有必要schedule
  if (!firstRequested.isPresent()) {  
    // do nothing.  
    LOG.info("No compaction plan for checkpoint " + checkpointId);  
    return;  
  }  
  // 获取这个requested compaction instant对应的时间
  String compactionInstantTime = firstRequested.get().getTimestamp();  
  
  // generate compaction plan  
  // should support configurable commit metadata 
  获取compaction plan
  HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(  
      table.getMetaClient(), compactionInstantTime);  

  // 如果没有获取到有效的compactio plan
  if (compactionPlan == null || (compactionPlan.getOperations() == null)  
      || (compactionPlan.getOperations().isEmpty())) {  
    // do nothing.  
    LOG.info("Empty compaction plan for instant " + compactionInstantTime);  
  } else {  
    // 获取这个requested状态的 compaction instant
    HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);  
    // Mark instant as compaction inflight  
    // 将它的状态修改为inflight
    table.getActiveTimeline().transitionCompactionRequestedToInflight(instant);  
    table.getMetaClient().reloadActiveTimeline();  
    // 获取plan中所有的compaction operation,封装为compactionPlanEvent发往下游
    List<CompactionOperation> operations = compactionPlan.getOperations().stream()  
        .map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());  
    LOG.info("Execute compaction plan for instant {} as {} file groups", compactionInstantTime, operations.size());  
    // 删除标记文件
    WriteMarkersFactory  
        .get(table.getConfig().getMarkersType(), table, compactionInstantTime)  
        .deleteMarkerDir(table.getContext(), table.getConfig().getMarkersDeleteParallelism());  
    // 每一个operation封装为一个CompactionPlanEvent
    // 这样到下游的时候可以将这些压缩任务均分
    // 每个并行度处理一部分file group的压缩
    for (CompactionOperation operation : operations) {  
      output.collect(new StreamRecord<>(new CompactionPlanEvent(compactionInstantTime, operation)));  
    }  
  }  
}

CompactOperatorprocessElement方法接收上面生成的CompactionPlanEvent,执行压缩任务。

@Override  
public void processElement(StreamRecord<CompactionPlanEvent> record) throws Exception {  
  final CompactionPlanEvent event = record.getValue();  
  // 获取inflight compaction instant time
  final String instantTime = event.getCompactionInstantTime(); 
  // 获取compaction operation 
  final CompactionOperation compactionOperation = event.getOperation(); 
  // 如果是异步压缩
  // 在线程池中执行压缩,不会影响checkpoint过程 
  if (asyncCompaction) {  
    // executes the compaction task asynchronously to not block the checkpoint barrier propagate.  
    executor.execute(  
        () -> doCompaction(instantTime, compactionOperation, collector, reloadWriteConfig()),  
        (errMsg, t) -> collector.collect(new CompactionCommitEvent(instantTime, compactionOperation.getFileId(), taskID)),  
        "Execute compaction for instant %s from task %d", instantTime, taskID);  
  } else {  
    // executes the compaction task synchronously for batch mode.  
    // 否则同步执行compaction
    LOG.info("Execute compaction for instant {} from task {}", instantTime, taskID);  
    doCompaction(instantTime, compactionOperation, collector, writeClient.getConfig());  
  }  
}

接下来的doCompaction方法间接调用了HoodieFlinkMergeOnReadTableCompactorcompact方法,前面的文章已有分析,这里不再赘述。

Pipelines::cluster

该方法启动周期clustering任务流。

public static DataStreamSink<ClusteringCommitEvent> cluster(Configuration conf, RowType rowType, DataStream<Object> dataStream) {  
  // 使用ClusteringPlanOperator下发clustering执行计划
  DataStream<ClusteringCommitEvent> clusteringStream = dataStream.transform("cluster_plan_generate",  
          TypeInformation.of(ClusteringPlanEvent.class),  
          new ClusteringPlanOperator(conf))  
      // 下发执行计划的并行度必须是1
      .setParallelism(1) // plan generate must be singleton  
      .setMaxParallelism(1) // plan generate must be singleton  
      .keyBy(plan ->  
          // make the distribution strategy deterministic to avoid concurrent modifications  
          // on the same bucket files    
          // 按照ClusteringPlanEvent的fileId分组
          // 针对同一个file slice的clustering操作会分配给相同的线程执行,防止并发修改    
          plan.getClusteringGroupInfo().getOperations()                .stream().map(ClusteringOperation::getFileId.collect(Collectors.joining()))  
      .transform("clustering_task",  
          TypeInformation.of(ClusteringCommitEvent.class),  
          // 通过ClusteringOperator执行clustering
          new ClusteringOperator(conf, rowType))  
      // clustering任务的并行度为clustering.tasks
      .setParallelism(conf.getInteger(FlinkOptions.CLUSTERING_TASKS));  
  // 如果启用了排序,即clustering.plan.strategy.sort.columns配置项不为空
  // 配置该步骤的执行内存,对应配置项为write.sort.memory
  if (OptionsResolver.sortClusteringEnabled(conf)) {  
    ExecNodeUtil.setManagedMemoryWeight(clusteringStream.getTransformation(),  
        conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);  
  }  
  // 检查并提交clustering instant
  DataStreamSink<ClusteringCommitEvent> clusteringCommitEventDataStream = clusteringStream.addSink(new ClusteringCommitSink(conf))  
      .name("clustering_commit")  
      .setParallelism(1); // clustering commit should be singleton  
  clusteringCommitEventDataStream.getTransformation().setMaxParallelism(1);  
  return clusteringCommitEventDataStream;  
}

ClusteringPlanOperatorschedule clustering的过程和前面schedule compaction的非常相似。

private void scheduleClustering(HoodieFlinkTable<?> table, long checkpointId) {  
  // 获取request状态的clustering instant
  List<HoodieInstant> pendingClusteringInstantTimes =  
      ClusteringUtils.getPendingClusteringInstantTimes(table.getMetaClient());  
  // the first instant takes the highest priority.  
  // 获取时间最早的一个
  Option<HoodieInstant> firstRequested = Option.fromJavaOptional(  
      pendingClusteringInstantTimes.stream()  
          .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED).findFirst());  
  
  // record metrics  
  clusteringMetrics.setFirstPendingClusteringInstant(firstRequested);  
  clusteringMetrics.setPendingClusteringCount(pendingClusteringInstantTimes.size());  
  
  if (!firstRequested.isPresent()) {  
    // do nothing.  
    LOG.info("No clustering plan for checkpoint " + checkpointId);  
    return;  
  }  
  
  String clusteringInstantTime = firstRequested.get().getTimestamp();  
  
  // generate clustering plan  
  // should support configurable commit metadata  HoodieInstant clusteringInstant = HoodieTimeline.getReplaceCommitRequestedInstant(clusteringInstantTime);  
  // 拿到之前生成的clustering plan
  Option<Pair<HoodieInstant, HoodieClusteringPlan>> clusteringPlanOption = ClusteringUtils.getClusteringPlan(  
      table.getMetaClient(), clusteringInstant);  

  // 如果没有获取到有效的clustering plan,直接返回
  if (!clusteringPlanOption.isPresent()) {  
    // do nothing.  
    LOG.info("No clustering plan scheduled");  
    return;  
  }  
  
  HoodieClusteringPlan clusteringPlan = clusteringPlanOption.get().getRight();  
  
  if (clusteringPlan == null || (clusteringPlan.getInputGroups() == null)  
      || (clusteringPlan.getInputGroups().isEmpty())) {  
    // do nothing.  
    LOG.info("Empty clustering plan for instant " + clusteringInstantTime);  
  } else {  
    // Mark instant as clustering inflight  
    table.getActiveTimeline().transitionReplaceRequestedToInflight(clusteringInstant, Option.empty());  
    table.getMetaClient().reloadActiveTimeline();  
    // 遍历所有的inputGroup,封装为ClusteringPlanEvent
    for (HoodieClusteringGroup clusteringGroup : clusteringPlan.getInputGroups()) {  
      LOG.info("Execute clustering plan for instant {} as {} file slices", clusteringInstantTime, clusteringGroup.getSlices().size());  
      output.collect(new StreamRecord<>(  
          new ClusteringPlanEvent(clusteringInstantTime, ClusteringGroupInfo.create(clusteringGroup), clusteringPlan.getStrategy().getStrategyParams())  
      ));  
    }  
  }  
}

Clustering对应一个file slice的操作封装为了HoodieClusteringGroup(和compatcion的CompactionOperation对应)。这里将同一个plan中所有的HoodieClusteringGroup,每一个封装为ClusteringPlanEvent,目的是为了下游可以并行执行clustering。

ClusteringOperatorprocessElement方法执行clustering计划。和compaction相同,分为同步执行和异步执行两种方式。

@Override  
public void processElement(StreamRecord<ClusteringPlanEvent> element) throws Exception {  
  final ClusteringPlanEvent event = element.getValue();  
  final String instantTime = event.getClusteringInstantTime();  
  final List<ClusteringOperation> clusteringOperations = event.getClusteringGroupInfo().getOperations();  
  if (this.asyncClustering) {  
    // executes the compaction task asynchronously to not block the checkpoint barrier propagate.  
    executor.execute(  
        () -> doClustering(instantTime, clusteringOperations),  
        (errMsg, t) -> collector.collect(new ClusteringCommitEvent(instantTime, getFileIds(clusteringOperations), taskID)),  
        "Execute clustering for instant %s from task %d", instantTime, taskID);  
  } else {  
    // executes the clustering task synchronously for batch mode.  
    LOG.info("Execute clustering for instant {} from task {}", instantTime, taskID);  
    doClustering(instantTime, clusteringOperations);  
  }  
}

doClustering方法为clustering过程的纯Flink实现。

private void doClustering(String instantTime, List<ClusteringOperation> clusteringOperations) throws Exception {  
  clusteringMetrics.startClustering();  
  // 采用bulk insert的方式写入clustering之后的数据
  BulkInsertWriterHelper writerHelper = new BulkInsertWriterHelper(this.conf, this.table, this.writeConfig,  
      instantTime, this.taskID, getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getAttemptNumber(),  
      this.rowType, true);  
  
  Iterator<RowData> iterator;  
  // 如果clustering操作涉及到log文件,使用readRecordsForGroupWithLogs
  // 否则仅使用readRecordsForGroupBaseFiles
  // 这两个方法读取file group中的数据,以iterator的形式返回
  if (clusteringOperations.stream().anyMatch(operation -> CollectionUtils.nonEmpty(operation.getDeltaFilePaths()))) {  
    // if there are log files, we read all records into memory for a file group and apply updates.  
    iterator = readRecordsForGroupWithLogs(clusteringOperations, instantTime);  
  } else {  
    // We want to optimize reading records for case there are no log files.  
    iterator = readRecordsForGroupBaseFiles(clusteringOperations);  
  }  
  // 如果配置了clustering.plan.strategy.sort.columns
  // 说明需要排序
  if (this.sortClusteringEnabled) {  
    RowDataSerializer rowDataSerializer = new RowDataSerializer(rowType);
    // 使用BinaryexternalSorter来排序  
    // BinaryExternalSorter根据clustering.plan.strategy.sort.columns
    // 生成排序代码
    BinaryExternalSorter sorter = initSorter();  
    while (iterator.hasNext()) {  
      RowData rowData = iterator.next();  
      BinaryRowData binaryRowData = rowDataSerializer.toBinaryRow(rowData).copy();  
      sorter.write(binaryRowData);  
    }  
    // 使用bulk insert写入排序之后的数据
    BinaryRowData row = binarySerializer.createInstance();  
    while ((row = sorter.getIterator().next(row)) != null) {  
      writerHelper.write(row);  
    }  
    sorter.close();  
  } else {  
    while (iterator.hasNext()) {  
      writerHelper.write(iterator.next());  
    }  
  }  
  
  List<WriteStatus> writeStatuses = writerHelper.getWriteStatuses(this.taskID);  
  clusteringMetrics.endClustering();  
  collector.collect(new ClusteringCommitEvent(instantTime, getFileIds(clusteringOperations), writeStatuses, this.taskID));  
  writerHelper.close();  
}

Pipelines::clean

clean方法的实现较为简单。Flink为数据流增加了一个CleanFunction类型的sink,并行度为1。代码如下所示。

public static DataStreamSink<Object> clean(Configuration conf, DataStream<Object> dataStream) {  
  DataStreamSink<Object> cleanCommitDataStream = dataStream.addSink(new CleanFunction<>(conf))  
      .setParallelism(1)  
      .name("clean_commits");  
  cleanCommitDataStream.getTransformation().setMaxParallelism(1);  
  return cleanCommitDataStream;

我们继续分析CleanFunction。如果启用了异步clean,CleanFunction在启动的时候(open)异步执行一次。创建checkpoint的时候snapshotState启动异步clean服务。在checkpoint完成的时候notifyCheckpointComplete等待clean错操作执行完毕。相关代码如下所示。

@Override  
public void open(Configuration parameters) throws Exception {  
  super.open(parameters);  
  this.writeClient = FlinkWriteClients.createWriteClient(conf, getRuntimeContext());  
  this.executor = NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build();  
  String instantTime = writeClient.createNewInstantTime();  
  LOG.info(String.format("exec clean with instant time %s...", instantTime));  
  if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) {  
    executor.execute(() -> {  
      this.isCleaning = true;  
      try {  
        this.writeClient.clean(instantTime);  
      } finally {  
        this.isCleaning = false;  
      }  
    }, "wait for cleaning finish");  
  }  
}  
  
@Override  
public void notifyCheckpointComplete(long l) throws Exception {  
  if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && isCleaning) {  
    executor.execute(() -> {  
      try {  
        this.writeClient.waitForCleaningFinish();  
      } finally {  
        // ensure to switch the isCleaning flag  
        this.isCleaning = false;  
      }  
    }, "wait for cleaning finish");  
  }  
}  
  
@Override  
public void snapshotState(FunctionSnapshotContext context) throws Exception {  
  if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && !isCleaning) {  
    try {  
      this.writeClient.startAsyncCleaning();  
      this.isCleaning = true;  
    } catch (Throwable throwable) {  
      // catch the exception to not affect the normal checkpointing  
      LOG.warn("Error while start async cleaning", throwable);  
    }  
  }  
}

除了Pipeline::clean方法直接使用CleanFunction之外,我们还注意到ClusteringCommitSinkCompactionCommitSink都继承了CleanFunction,都没有重写snapshotStatenotifyCheckPointComplete方法。因此这两个sink的行为和CleanFunction一致,在checkpoint的时候会触发clean操作。
继续观察这两个sink的doCommit方法,我们发现最后一段都有如下代码:

// Whether to clean up the old log file when compaction  
if (!conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && !isCleaning) {  
  this.writeClient.clean();  
}

该段代码表示,如果没有启用异步clean,且当前时刻没有clean没有正在执行,执行同步clean操作。
接下来我们分析this.writeClient.startAsyncCleaning()调用,一路跟踪下去。跟踪过程中的非关键代码这里不再展示。

  • CleanFunction的this.writeClient.startAsyncCleaning()
  • HoodieFlinkWriteClient的tableServiceClient.startAsyncCleanerService(this);
  • BaseHoodieTableServiceClient的this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(writeClient);
    分析到这里,我们确定了clean表服务是在AsyncCleanerService中封装的。继续分析startAsyncCleaningIfEnabled方法,该方法首先判断是否开启了异步clean配置,如果开启了,创建一个异步clean服务,代码如下所示:
public static AsyncCleanerService startAsyncCleaningIfEnabled(BaseHoodieWriteClient writeClient) {  
  HoodieWriteConfig config = writeClient.getConfig();  
  // hoodie.clean.automatic和hoodie.clean.async这两个任意一个配置为false,不运行该服务
  if (!config.isAutoClean() || !config.isAsyncClean()) {  
    LOG.info("The HoodieWriteClient is not configured to auto & async clean. Async clean service will not start.");  
    return null;  
  }  
  // 创建并启动AsyncCleanerService
  AsyncCleanerService asyncCleanerService = new AsyncCleanerService(writeClient);  
  asyncCleanerService.start(null);  
  return asyncCleanerService;  
}

我们继续分析asyncCleanerService.start,它位于HoodieAsyncServicestart方法中。
其中的startService方法将异步服务逻辑本身和运行异步服务的executor封装为Pair返回。

public void start(Function<Boolean, Boolean> onShutdownCallback) {  
  if (started) {  
    LOG.warn("The async service already started.");  
    return;  
  }  
  Pair<CompletableFuture, ExecutorService> res = startService();  
  future = res.getKey();  
  executor = res.getValue();  
  started = true;  
  shutdownCallback(onShutdownCallback);  
}

startService方法在实现类AsyncCleanerFunction中,如下所示。

@Override  
protected Pair<CompletableFuture, ExecutorService> startService() {  
  String instantTime = writeClient.createNewInstantTime();  
  LOG.info(String.format("Starting async clean service with instant time %s...", instantTime));  
  //在线程池中启动异步clean操作
  return Pair.of(CompletableFuture.supplyAsync(() -> {  
    writeClient.clean(instantTime);  
    return true;  
  }, executor), executor);  
}

调用完该方法之后,异步clean的逻辑封装赋值到future变量中。
按照上面的分析notifyCheckpointComplete的时候执行waitForCompletion方法。如果clean操作还没有结束,这里阻塞等待其执行完毕。

public static void waitForCompletion(AsyncCleanerService asyncCleanerService) {  
  if (asyncCleanerService != null) {  
    LOG.info("Waiting for async clean service to finish");  
    try {  
      asyncCleanerService.waitForShutdown();  
    } catch (Exception e) {  
      throw new HoodieException("Error waiting for async clean service to finish", e);  
    }  
  }  
}

分析HoodieAsnycServicewaitForshutdown方法,同步等待clean执行完毕,内容如下:

public void waitForShutdown() throws ExecutionException, InterruptedException {  
  if (future == null) {  
    return;  
  }  
  try {  
    future.get();  
  } catch (ExecutionException ex) {  
    LOG.error("Service shutdown with error", ex);  
    throw ex;  
  }  
}

参考文献

Compaction | Apache Hudi
Cleaning | Apache Hudi
Clustering | Apache Hudi

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,839评论 6 482
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,543评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,116评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,371评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,384评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,111评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,416评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,053评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,558评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,007评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,117评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,756评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,324评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,315评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,539评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,578评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,877评论 2 345

推荐阅读更多精彩内容