TaskManager 启动流程

转载于:http://matt33.com/2020/03/15/flink-taskmanager-7/

TaskManager 启动流程

与 JobManager 类似,TaskManager 的启动类是 TaskManagerRunner,大概的流程如下图所示:

TaskManager 启动流程

TaskManager 启动的入口方法是 runTaskManager(),它会首先初始化 TaskManager 一些相关的服务,比如:初始化 RpcService、初始化 HighAvailabilityServices 等等,这些都是为 TaskManager 服务的启动做相应的准备工作。其实 TaskManager 初始化主要分为下面两大块:

  1. TaskManager 相关 service 的初始化:比如:内存管理器、IO 管理器、TaskSlotTable(TaskSlot 的管理是在这里进行的)等,这里也包括 TaskExecutor 的初始化,注意这里对于一些需要启动的服务在这一步并没有启动;
  2. TaskExecutor 的启动:它会启动 TM 上相关的服务,Task 的提交和运行也是在 TaskExecutor 中处理的,上一步 TM 初始化的那些服务也是在 TaskExecutor 中使用的。

TM 的服务真正 Run 起来之后,核心流程还是在 TaskExecutor 中。

TaskManager 相关服务的初始化

这里,先从 TaskManager 的入口 runTaskManager() 来看 TaskManager 相关服务的初始化流程,总结来看流程如下:

//  1. 入口方法
runTaskManager()

// 2. 创建 TaskManagerRunner 对象
TaskManagerRunner taskManagerRunner = new TaskManagerRunner(configuration, resourceId);

// 3. 启动 TaskManager 服务
startTaskManager()

// 4. 初始化相关的服务
TaskManagerServices.fromConfiguration()

首先看下具体的代码实现:

// TaskManagerRunner.java
//note: 启动 TaskManagerRunner
public static void runTaskManager(Configuration configuration, ResourceID resourceId) throws Exception {
    final TaskManagerRunner taskManagerRunner = new TaskManagerRunner(configuration, resourceId);

    taskManagerRunner.start();
}


//note: 初始化 TaskManagerRunner
public TaskManagerRunner(Configuration configuration, ResourceID resourceId) throws Exception {
    this.configuration = checkNotNull(configuration);
    this.resourceId = checkNotNull(resourceId);

    //note: akka 超时设置
    timeout = AkkaUtils.getTimeoutAsTime(configuration);

    this.executor = java.util.concurrent.Executors.newScheduledThreadPool(
        Hardware.getNumberCPUCores(),
        new ExecutorThreadFactory("taskmanager-future"));

    //note: HA 的配置及服务初始化
    highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
        configuration,
        executor,
        HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);

    //note: create rpc service
    rpcService = createRpcService(configuration, highAvailabilityServices);

    //note: 初始化心跳服务
    HeartbeatServices heartbeatServices = HeartbeatServices.fromConfiguration(configuration);

    //note: metrics 服务
    metricRegistry = new MetricRegistryImpl(
        MetricRegistryConfiguration.fromConfiguration(configuration),
        ReporterSetup.fromConfiguration(configuration));

    //note: 启动相应的 metrics 服务
    final RpcService metricQueryServiceRpcService = MetricUtils.startMetricsRpcService(configuration, rpcService.getAddress());
    metricRegistry.startQueryService(metricQueryServiceRpcService, resourceId);
//note: 初始化 blob 服务
    blobCacheService = new BlobCacheService(
        configuration, highAvailabilityServices.createBlobStore(), null
    );

    //note: 启动 TaskManager 服务及创建 TaskExecutor 对象
    taskManager = startTaskManager(
        this.configuration,
        this.resourceId,
        rpcService,
        highAvailabilityServices,
        heartbeatServices,
        metricRegistry,
        blobCacheService,
        false,
        this);

    this.terminationFuture = new CompletableFuture<>();
    this.shutdown = false;

    //note: 周期性地输出内存相关的日志信息,直到 terminationFuture complete
    MemoryLogger.startIfConfigured(LOG, configuration, terminationFuture);
}

在上面的流程中,初始化了一些最基本的服务,比如:rpc 服务,在方法的最后调用了 startTaskManager() 启动 TaskManager,其代码实现如下:

// TaskManagerRunner.java
//note: 创建并初始化 TaskExecutor 对象
public static TaskExecutor startTaskManager(
        Configuration configuration,
        ResourceID resourceID,
        RpcService rpcService,
        HighAvailabilityServices highAvailabilityServices,
        HeartbeatServices heartbeatServices,
        MetricRegistry metricRegistry,
        BlobCacheService blobCacheService,
        boolean localCommunicationOnly,
        FatalErrorHandler fatalErrorHandler) throws Exception {

    checkNotNull(configuration);
    checkNotNull(resourceID);
    checkNotNull(rpcService);
    checkNotNull(highAvailabilityServices);

    LOG.info("Starting TaskManager with ResourceID: {}", resourceID);

    InetAddress remoteAddress = InetAddress.getByName(rpcService.getAddress());

    //note: TM 服务相关的配置都维护在这个对象中,这里会把使用的相关参数解析并维护起来
    TaskManagerServicesConfiguration taskManagerServicesConfiguration =
        TaskManagerServicesConfiguration.fromConfiguration(
            configuration,
            resourceID,
            remoteAddress,
            EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag(),
            EnvironmentInformation.getMaxJvmHeapMemory(),
            localCommunicationOnly);

    //note: 初始化 TM 的 TaskManagerMetricGroup,并相应地初始化 TM 的基本状态(内存、CPU 等)监控
    Tuple2<TaskManagerMetricGroup, MetricGroup> taskManagerMetricGroup = MetricUtils.instantiateTaskManagerMetricGroup(
        metricRegistry,
        TaskManagerLocation.getHostName(remoteAddress),
        resourceID,
        taskManagerServicesConfiguration.getSystemResourceMetricsProbingInterval());

    //note: 初始化 TaskManagerServices(TM 相关服务的初始化都在这里)
    TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration(
        taskManagerServicesConfiguration,
        taskManagerMetricGroup.f1,
        rpcService.getExecutor()); // TODO replace this later with some dedicated executor for io.

    //note: TaskManager 相关的配置,主要用于 TaskExecutor 的初始化
    TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration);

    String metricQueryServiceAddress = metricRegistry.getMetricQueryServiceGatewayRpcAddress();

    //note: 最后创建 TaskExecutor 对象
    return new TaskExecutor(
        rpcService,
        taskManagerConfiguration,
        highAvailabilityServices,
        taskManagerServices,
        heartbeatServices,
        taskManagerMetricGroup.f0,
        metricQueryServiceAddress,
        blobCacheService,
        fatalErrorHandler,
        new PartitionTable<>());
}

这里,来着重看一下 TaskManagerServices.fromConfiguration() 这个方法,在这个方法初始了很多 TM 的服务,从下面的具体实现中也可以看出:

// TaskManagerServices.java
/**
 * Creates and returns the task manager services.
 * note:根据创建 TM 服务
 *
 * @param taskManagerServicesConfiguration task manager configuration
 * @param taskManagerMetricGroup metric group of the task manager
 * @param taskIOExecutor executor for async IO operations
 * @return task manager components
 * @throws Exception
 */
public static TaskManagerServices fromConfiguration(
        TaskManagerServicesConfiguration taskManagerServicesConfiguration,
        MetricGroup taskManagerMetricGroup,
        Executor taskIOExecutor) throws Exception {

    // pre-start checks
    checkTempDirs(taskManagerServicesConfiguration.getTmpDirPaths());

    //note: 创建 taskEventDispatcher
    final TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();

    // start the I/O manager, it will create some temp directories.
    //note: 创建 IO 管理器
    final IOManager ioManager = new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths());

    //note: 创建 ShuffleEnvironment 对象(默认是 NettyShuffleEnvironment)
    final ShuffleEnvironment<?, ?> shuffleEnvironment = createShuffleEnvironment(
        taskManagerServicesConfiguration,
        taskEventDispatcher,
        taskManagerMetricGroup);
    final int dataPort = shuffleEnvironment.start();

    //note: 创建 KvStateService 实例并启动
    final KvStateService kvStateService = KvStateService.fromConfiguration(taskManagerServicesConfiguration);
    kvStateService.start();

    //note: 初始化 taskManagerLocation,记录 connection 信息
    final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(
        taskManagerServicesConfiguration.getResourceID(),
        taskManagerServicesConfiguration.getTaskManagerAddress(),
        dataPort);

    // this call has to happen strictly after the network stack has been initialized
    //note: 初始化 MemoryManager
    final MemoryManager memoryManager = createMemoryManager(taskManagerServicesConfiguration);
    final long managedMemorySize = memoryManager.getMemorySize();

    //note: 初始化 BroadcastVariableManager 对象
    final BroadcastVariableManager broadcastVariableManager = new BroadcastVariableManager();

    //note: 当前 TM 拥有的 slot 及每个 slot 的资源信息
    final int numOfSlots = taskManagerServicesConfiguration.getNumberOfSlots();
    final List<ResourceProfile> resourceProfiles =
        Collections.nCopies(numOfSlots, computeSlotResourceProfile(numOfSlots, managedMemorySize));

    //note: 注册一个超时(AKKA 超时设置)服务(在 TaskSlotTable 用于监控 slot 分配是否超时)
    final TimerService<AllocationID> timerService = new TimerService<>(
        new ScheduledThreadPoolExecutor(1),
        taskManagerServicesConfiguration.getTimerServiceShutdownTimeout());

    //note: 这里会维护 slot 相关列表
    final TaskSlotTable taskSlotTable = new TaskSlotTable(resourceProfiles, timerService);

    //note: 维护 jobId 与 JobManager connection 之间的关系
    final JobManagerTable jobManagerTable = new JobManagerTable();

    //note: 监控注册的 job 的 JobManger leader 信息
    final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation, taskManagerServicesConfiguration.getRetryingRegistrationConfiguration());

    final String[] stateRootDirectoryStrings = taskManagerServicesConfiguration.getLocalRecoveryStateRootDirectories();

    final File[] stateRootDirectoryFiles = new File[stateRootDirectoryStrings.length];

    for (int i = 0; i < stateRootDirectoryStrings.length; ++i) {
        stateRootDirectoryFiles[i] = new File(stateRootDirectoryStrings[i], LOCAL_STATE_SUB_DIRECTORY_ROOT);
    }

    //note: 创建 TaskExecutorLocalStateStoresManager 对象:维护状态信息
    final TaskExecutorLocalStateStoresManager taskStateManager = new TaskExecutorLocalStateStoresManager(
        taskManagerServicesConfiguration.isLocalRecoveryEnabled(),
        stateRootDirectoryFiles,
        taskIOExecutor);

    //note: 将上面初始化的这些服务,封装到一个 TaskManagerServices 对象中
    return new TaskManagerServices(
        taskManagerLocation,
        memoryManager,
        ioManager,
        shuffleEnvironment,
        kvStateService,
        broadcastVariableManager,
        taskSlotTable,
        jobManagerTable,
        jobLeaderService,
        taskStateManager,
        taskEventDispatcher);
}

看到这里,是否有点懵圈了,是不是感觉 TaskManager 实现还挺复杂的,但与 TaskManager 要做的功能相比,上面的实现还不够,真正在 TaskManager 中处理复杂繁琐工作的组件是 TaskExecutor,这个才是 TaskManager 的核心。

TaskExecutor 的启动

回顾一下文章最开始的流程图,TaskManagerRunner 调用 run() 方法之后,真正要启动的是 TaskExecutor 服务,其 onStart() 具体实现如下:

//note: 启动服务
@Override
public void onStart() throws Exception {
    try {
        //note: 启动 TM 的相关服务
        startTaskExecutorServices();
    } catch (Exception e) {
        final TaskManagerException exception = new TaskManagerException(String.format("Could not start the TaskExecutor %s", getAddress()), e);
        onFatalError(exception);
        throw exception;
    }

    //note: 注册超时检测,如果超时还未注册完成,就抛出错误,启动失败
    startRegistrationTimeout();
}

这里,主要分为两个部分:

startTaskExecutorServices(): 启动 TaskManager 相关的服务,结合流程图主要是四大块:
启动心跳服务;
向 Flink Master 的 ResourceManager 注册 TaskManager;
启动 TaskSlotTable 服务(TaskSlot 的维护主要在这个服务中);
启动 JobLeaderService 服务,它主要是监控各个作业 JobManager leader 的变化;
startRegistrationTimeout(): 启动注册超时的检测,默认是5 min,如果超过这个时间还没注册完成,就会抛出异常退出进程,启动失败。
TaskExecutor 启动的核心实现是在 startTaskExecutorServices() 中,其实现如下:

private void startTaskExecutorServices() throws Exception {
    try {
        //note: 启动心跳服务
        startHeartbeatServices();

        //note: 与集群的 ResourceManager 建立连接(并创建一个 listener)
        // start by connecting to the ResourceManager
        resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());

        // tell the task slot table who's responsible for the task slot actions
        //note: taskSlotTable 启动
        taskSlotTable.start(new SlotActionsImpl());

        // start the job leader service
        //note: 启动 job leader 服务
        jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());

        fileCache = new FileCache(taskManagerConfiguration.getTmpDirectories(), blobCacheService.getPermanentBlobService());
    } catch (Exception e) {
        handleStartTaskExecutorServicesException(e);
    }
}

接下来,详细这块的实现。

1 启动心跳服务

TaskExecutor 启动的第一个服务就是 HeartbeatManager,这里会启动两个:

  1. jobManagerHeartbeatManager: 用于与 JobManager(如果 Job 有 task 在这个 TM 上,这个 Job 的 JobManager 就与 TaskManager 有心跳通信)之间的心跳通信管理,如果 timeout,这里会重连;
  2. resourceManagerHeartbeatManager:用于与 ResourceManager 之间的通信管理,如果 timeout,这里也会重连。
// TaskExecutor.java
//note: 启动心跳服务
private void startHeartbeatServices() {
    final ResourceID resourceId = taskExecutorServices.getTaskManagerLocation().getResourceID();
    //note: 创建一个与 JM 通信的心跳管理器
    jobManagerHeartbeatManager = heartbeatServices.createHeartbeatManager(
        resourceId,
        new JobManagerHeartbeatListener(),
        getMainThreadExecutor(),
        log);

    //note: 创建一个与 RM 通信的心跳管理器
    resourceManagerHeartbeatManager = heartbeatServices.createHeartbeatManager(
        resourceId,
        new ResourceManagerHeartbeatListener(),
        getMainThreadExecutor(),
        log);
}

2. 向 RM 注册 TM

TaskManger 向 ResourceManager 注册是通过 ResourceManagerLeaderListener 来完成的,它会监控 ResourceManager 的 leader 变化,如果有新的 leader 被选举出来,将会调用 notifyLeaderAddress() 方法去触发与 ResourceManager 的重连,其实现如下:

// TaskExecutor.java
/**
 * The listener for leader changes of the resource manager.
 * note:监控 ResourceManager leader 变化的 listener
 */
private final class ResourceManagerLeaderListener implements LeaderRetrievalListener {

    //note: 如果 leader 被选举处理(包括挂掉之后重新选举),将会调用这个方法通知 TM
    @Override
    public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
        runAsync(
            () -> notifyOfNewResourceManagerLeader(
                leaderAddress,
                ResourceManagerId.fromUuidOrNull(leaderSessionID)));
    }

    @Override
    public void handleError(Exception exception) {
        onFatalError(exception);
    }
}


//note: 如果 RM 的 new leader 选举出来了,这里会新创建一个 ResourceManagerAddress 对象,并重新建立连接
private void notifyOfNewResourceManagerLeader(String newLeaderAddress, ResourceManagerId newResourceManagerId) {
    resourceManagerAddress = createResourceManagerAddress(newLeaderAddress, newResourceManagerId);
    reconnectToResourceManager(new FlinkException(String.format("ResourceManager leader changed to new address %s", resourceManagerAddress)));
}



//note: 重新与 ResourceManager 连接(可能是 RM leader 切换)
private void reconnectToResourceManager(Exception cause) {
    closeResourceManagerConnection(cause);
    //note: 注册超时检测,如果 timeout 还没注册成功,这里就会 failed
    startRegistrationTimeout();
    //note: 与 RM 重新建立连接
    tryConnectToResourceManager();
}


//note: 建立与 ResourceManager 的连接
private void tryConnectToResourceManager() {
    if (resourceManagerAddress != null) {
        connectToResourceManager();
    }
}


//note: 与 ResourceManager 建立连接
private void connectToResourceManager() {
    assert(resourceManagerAddress != null);
    assert(establishedResourceManagerConnection == null);
    assert(resourceManagerConnection == null);

    log.info("Connecting to ResourceManager {}.", resourceManagerAddress);

    //note: 与 RM 建立连接
    resourceManagerConnection =
        new TaskExecutorToResourceManagerConnection(
            log,
            getRpcService(),
            getAddress(),
            getResourceID(),
            taskManagerConfiguration.getRetryingRegistrationConfiguration(),
            taskManagerLocation.dataPort(),
            hardwareDescription,
            resourceManagerAddress.getAddress(),
            resourceManagerAddress.getResourceManagerId(),
            getMainThreadExecutor(),
            new ResourceManagerRegistrationListener());
    resourceManagerConnection.start();
}

在上面的最后一步,创建了 TaskExecutorToResourceManagerConnection 对象,它启动后,会向 ResourceManager 注册 TM,具体的方法实现如下:

// TaskExecutorToResourceManagerConnection.java
@Override
protected CompletableFuture<RegistrationResponse> invokeRegistration(
        ResourceManagerGateway resourceManager, ResourceManagerId fencingToken, long timeoutMillis) throws Exception {

    Time timeout = Time.milliseconds(timeoutMillis);
    return resourceManager.registerTaskExecutor(
        taskExecutorAddress,
        resourceID,
        dataPort,
        hardwareDescription,
        timeout);
}

ResourceManager 在收到这个请求,会做相应的处理,主要要做的事情就是:先从缓存里移除旧的 TM 注册信息(如果之前存在的话),然后再更新缓存,并增加心跳监控,只有这些工作完成之后,TM 的注册才会被认为是成功的。

3. 启动 TaskSlotTable 服务

TaskSlotTable 从名字也可以看出,它主要是为 TaskSlot 服务的,它主要的功能有以下三点:

  1. 维护这个 TM 上所有 TaskSlot 与 Task、及 Job 的关系;
  2. 维护这个 TM 上所有 TaskSlot 的状态;
  3. TaskSlot 在进行 allocate/free 操作,通过 TimeService 做超时检测。

先看下 TaskSlotTable 是如何初始化的:

// TaskManagerServices.java
//note: 当前 TM 拥有的 slot 及每个 slot 的资源信息
//note: TM 的 slot 数由 taskmanager.numberOfTaskSlots 决定,默认是 1
final int numOfSlots = taskManagerServicesConfiguration.getNumberOfSlots();
final List<ResourceProfile> resourceProfiles =
    Collections.nCopies(numOfSlots, computeSlotResourceProfile(numOfSlots, managedMemorySize));

//note: 注册一个超时(AKKA 超时设置)服务(在 TaskSlotTable 用于监控 slot 分配是否超时)
//note: 超时参数由 akka.ask.timeout 控制,默认是 10s
final TimerService<AllocationID> timerService = new TimerService<>(
    new ScheduledThreadPoolExecutor(1),
    taskManagerServicesConfiguration.getTimerServiceShutdownTimeout());

//note: 这里会维护 slot 相关列表
final TaskSlotTable taskSlotTable = new TaskSlotTable(resourceProfiles, timerService);

TaskSlotTable 的初始化,只需要两个变量:

resourceProfiles: TM 上每个 Slot 的资源信息;
timerService: 超时检测服务,来保证操作超时时做相应的处理。
TaskSlotTable 的启动流程如下:

// TaskExecutor.java

// tell the task slot table who's responsible for the task slot actions
//note: taskSlotTable 启动
taskSlotTable.start(new SlotActionsImpl());

//note: SlotActions 相关方法的实现
private class SlotActionsImpl implements SlotActions {

    //note: 释放 slot 资源
    @Override
    public void freeSlot(final AllocationID allocationId) {
        runAsync(() ->
            freeSlotInternal(
                allocationId,
                new FlinkException("TaskSlotTable requested freeing the TaskSlot " + allocationId + '.')));
    }

    //note: 如果 slot 相关的操作(分配/释放)失败,这里将会调用这个方法
    //note: 监控的手段是:操作前先注册一个 timeout 监控,操作完成后再取消这个监控,如果在这个期间 timeout 了,就会调用这个方法
    //note: TimeService 的 key 是 AllocationID
    @Override
    public void timeoutSlot(final AllocationID allocationId, final UUID ticket) {
        runAsync(() -> TaskExecutor.this.timeoutSlot(allocationId, ticket));
    }
}

4. 启动 JobLeaderService 服务

TaskExecutor 启动的最后一步是,启动 JobLeader 服务,这个服务通过 JobLeaderListenerImpl 监控 Job 的 JobManager leader 的变化,如果 leader 被选举出来之后,这里将会与新的 JobManager leader 建立通信连接。

// TaskExecutor.java

// start the job leader service
//note: 启动 job leader 服务
jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());

//note: JobLeaderListener 的实现
private final class JobLeaderListenerImpl implements JobLeaderListener {

    @Override
    public void jobManagerGainedLeadership(
        final JobID jobId,
        final JobMasterGateway jobManagerGateway,
        final JMTMRegistrationSuccess registrationMessage) {
        //note: 建立与 JobManager 的连接
        runAsync(
            () ->
                establishJobManagerConnection(
                    jobId,
                    jobManagerGateway,
                    registrationMessage));
    }

    @Override
    public void jobManagerLostLeadership(final JobID jobId, final JobMasterId jobMasterId) {
        log.info("JobManager for job {} with leader id {} lost leadership.", jobId, jobMasterId);

        runAsync(() ->
            closeJobManagerConnection(
                jobId,
                new Exception("Job leader for job id " + jobId + " lost leadership.")));
    }

    @Override
    public void handleError(Throwable throwable) {
        onFatalError(throwable);
    }
}

到这里,TaskManager 的启动流程就梳理完了,TaskManager 在实现上整体的复杂度还是比较高的,毕竟它要做的事情是非常多的,下面的几个问题,将会进一步分析 TaskManager 内部的实现机制。

TaskManager 提供了哪些能力/功能?

要想知道 TaskManager 提供了哪些能力,个人认为有一个最简单有效的方法就是查看其对外提供的 API 接口,它向上层暴露哪些 API,这些 API 背后都是 TaskManager 能力的体现,TaskManager 对外的包括的 API 列表如下:

  1. requestSlot(): RM 向 TM 请求一个 slot 资源;
  2. requestStackTraceSample(): 请求某个 task 在执行过程中的一个 stack trace 抽样;
  3. submitTask(): JobManager 向 TM 提交 task;
  4. updatePartitions(): 更新这个 task 对应的 Partition 信息;
  5. releasePartitions(): 释放这个 job 的所有中间结果,比如 close 的时候触发;
  6. triggerCheckpoint(): Checkpoint Coordinator 触发 task 的 checkpoint;
  7. confirmCheckpoint(): Checkpoint Coordinator 通知 task 这个 checkpoint 完成;
  8. cancelTask(): task 取消;
  9. heartbeatFromJobManager(): 接收来自 JobManager 的心跳请求;
  10. heartbeatFromResourceManager(): 接收来自 ResourceManager 的心跳请求;
  11. disconnectJobManager()
  12. disconnectResourceManager()
  13. freeSlot(): JobManager 释放 Slot;
  14. requestFileUpload(): 一些文件(log 等)的上传请求;
  15. requestMetricQueryServiceAddress(): 请求 TM 的 metric query service 地址;
  16. canBeReleased(): 检查 TM 是否可以被 realease;

把上面的 API 列表分分类,大概有以下几块:

  1. slot 的资源管理:slot 的分配/释放;
  2. task 运行:接收来自 JobManager 的 task 提交、也包括该 task 对应的 Partition(中间结果)信息;
  3. checkpoint 相关的处理;
  4. 心跳监控、连接建立等。

通常,可以任务 TaskManager 提供的功能主要是前三点,如下图所示:

TaskManager 提供的功能

TaskManager 怎么发现 RM leader(在使用 ZK 做 HA 的情况下)?

这个是 Flink HA 内容,Flink HA 机制是有一套统一的框架,它跟这个问题(TM 如何维护 JobManager 的关系,如果 JobManager 挂掉,TM 会如何处理? )的原理是一样的,这里以 ResourceManager Leader 的发现为例简单介一下。

这里,我们以使用 Zookeeper 模式的情况来讲述,ZooKeeper 做 HA 是业内最常用的方案,Flink 在实现并没有使用 ZkClient 这个包,而是使用 curator 来做的(有兴趣可以看下这篇文章 跟着实例学习ZooKeeper的用法: 缓存)。

关于 Flink HA 的使用,可以参考官方文档——JobManager High Availability (HA)。这里 TaskExecutor 在注册完 ResourceManagerLeaderListener 后,如果 leader 被选举出来或者有节点有变化,就通过它的 notifyLeaderAddress() 方法来通知 TaskExecutor,核心还是利用了 ZK 的 watcher 机制。同理, JobManager leader 的处理也是一样。

TM Slot 资源是如何管理的?

TaskManager Slot 资源的管理主要是在 TaskSlotTable 中处理的,slot 资源的申请与释放都通过 它处理的,相关的流程如下图所示(图中只描述了主要逻辑,相关的异常处理没有展示在图中):

TaskManager slot 的分配与释放

slot 的申请

这里先看下 slot 资源请求的处理,其实现如下:

// TaskExecutor.java

//note: slot 请求
@Override
public CompletableFuture<Acknowledge> requestSlot(
    final SlotID slotId,
    final JobID jobId,
    final AllocationID allocationId,
    final String targetAddress,
    final ResourceManagerId resourceManagerId,
    final Time timeout) {
    // TODO: Filter invalid requests from the resource manager by using the instance/registration Id

    log.info("Receive slot request {} for job {} from resource manager with leader id {}.",
        allocationId, jobId, resourceManagerId);

    try {
        if (!isConnectedToResourceManager(resourceManagerId)) {
            //note: 如果 TM 并没有跟这个 RM 通信,就抛出异常
            final String message = String.format("TaskManager is not connected to the resource manager %s.", resourceManagerId);
            log.debug(message);
            throw new TaskManagerException(message);
        }

        if (taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
            //note: Slot 状态是 free,还未分配出去
            if (taskSlotTable.allocateSlot(slotId.getSlotNumber(), jobId, allocationId, taskManagerConfiguration.getTimeout())) {
                log.info("Allocated slot for {}.", allocationId);
                //note: allcate 成功
            } else {
                log.info("Could not allocate slot for {}.", allocationId);
                throw new SlotAllocationException("Could not allocate slot.");
            }
        } else if (!taskSlotTable.isAllocated(slotId.getSlotNumber(), jobId, allocationId)) {
            //note: slot 已经分配出去,但分配的并不是当前这个作业
            final String message = "The slot " + slotId + " has already been allocated for a different job.";

            log.info(message);

            final AllocationID allocationID = taskSlotTable.getCurrentAllocation(slotId.getSlotNumber());
            throw new SlotOccupiedException(message, allocationID, taskSlotTable.getOwningJob(allocationID));
        }

        if (jobManagerTable.contains(jobId)) {
            //note: 如果 TM 已经有这个 JobManager 的 meta,这里会将这个 job 的 slot 分配再汇报给 JobManager 一次
            offerSlotsToJobManager(jobId);
        } else {
            try {
                //note: 监控这个作业 JobManager 的 leader 变化
                jobLeaderService.addJob(jobId, targetAddress);
            } catch (Exception e) {
                // free the allocated slot
                try {
                    taskSlotTable.freeSlot(allocationId);
                } catch (SlotNotFoundException slotNotFoundException) {
                    // slot no longer existent, this should actually never happen, because we've
                    // just allocated the slot. So let's fail hard in this case!
                    onFatalError(slotNotFoundException);
                }

                // release local state under the allocation id.
                localStateStoresManager.releaseLocalStateForAllocationId(allocationId);

                // sanity check
                if (!taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
                    onFatalError(new Exception("Could not free slot " + slotId));
                }

                throw new SlotAllocationException("Could not add job to job leader service.", e);
            }
        }
    } catch (TaskManagerException taskManagerException) {
        return FutureUtils.completedExceptionally(taskManagerException);
    }

    return CompletableFuture.completedFuture(Acknowledge.get());
}

相应的处理逻辑如下:

首先检测这个这个 RM 是否当前建立连接的 RM,如果不是,就抛出相应的异常,需要等到 TM 连接上 RM 之后才能处理 RM 上的 slot 请求;
判断这个 slot 是否可以分配
如果 slot 是 FREE 状态,就进行分配(调用 TaskSlotTable 的 allocateSlot() 方法),如果分配失败,就抛出相应的异常;
如果 slot 已经分配,检查分配的是不是当前作业的 AllocationId,如果不是,也会抛出相应的异常,告诉 RM 这个 Slot 已经分配出去了;
如果 TM 已经有了这个 JobManager 的 meta,这里会将这个 job 在这个 TM 上的 slot 分配再重新汇报给 JobManager 一次;
而 TaskSlotTable 在处理 slot 的分配时,主要是根据内部缓存的信息做相应的检查,其 allocateSlot() 的方法的实现如下:

// TaskSlotTable.java
public boolean allocateSlot(int index, JobID jobId, AllocationID allocationId, Time slotTimeout) {
    checkInit();

    TaskSlot taskSlot = taskSlots.get(index);

    //note: 分配这个 TaskSlot
    boolean result = taskSlot.allocate(jobId, allocationId);

    if (result) {
        //note: 分配成功,记录到缓存中
        // update the allocation id to task slot map
        allocationIDTaskSlotMap.put(allocationId, taskSlot);

        // register a timeout for this slot since it's in state allocated
        timerService.registerTimeout(allocationId, slotTimeout.getSize(), slotTimeout.getUnit());

        // add this slot to the set of job slots
        Set<AllocationID> slots = slotsPerJob.get(jobId);

        if (slots == null) {
            slots = new HashSet<>(4);
            slotsPerJob.put(jobId, slots);
        }

        slots.add(allocationId);
    }

    return result;
}

slot 的释放

这里再看下 Slot 的资源是如何释放的,代码实现如下:

// TaskExecutor.java

//note: 释放这个 slot 资源
@Override
public CompletableFuture<Acknowledge> freeSlot(AllocationID allocationId, Throwable cause, Time timeout) {
    freeSlotInternal(allocationId, cause);

    return CompletableFuture.completedFuture(Acknowledge.get());
}


//note: 将本地分配的 slot 释放掉(free the slot)
private void freeSlotInternal(AllocationID allocationId, Throwable cause) {
    checkNotNull(allocationId);

    log.debug("Free slot with allocation id {} because: {}", allocationId, cause.getMessage());

    try {
        final JobID jobId = taskSlotTable.getOwningJob(allocationId);

        //note: 释放这个 slot
        final int slotIndex = taskSlotTable.freeSlot(allocationId, cause);

        if (slotIndex != -1) {
            //note: 成功释放掉的情况下

            if (isConnectedToResourceManager()) {
                //note: 通知 ResourceManager 这个 slot 因为被释放了,所以可以变可用了
                // the slot was freed. Tell the RM about it
                ResourceManagerGateway resourceManagerGateway = establishedResourceManagerConnection.getResourceManagerGateway();

                resourceManagerGateway.notifySlotAvailable(
                    establishedResourceManagerConnection.getTaskExecutorRegistrationId(),
                    new SlotID(getResourceID(), slotIndex),
                    allocationId);
            }

            if (jobId != null) {
                closeJobManagerConnectionIfNoAllocatedResources(jobId);
            }
        }
    } catch (SlotNotFoundException e) {
        log.debug("Could not free slot for allocation id {}.", allocationId, e);
    }

    //note: 释放这个 allocationId 的相应状态信息
    localStateStoresManager.releaseLocalStateForAllocationId(allocationId);
}

总结一下,TaskExecutor 在处理 slot 释放请求的理逻辑如下:

先调用 TaskSlotTable 的 freeSlot() 方法,尝试释放这个 slot:
如果这个 slot 没有 task 在运行,那么 slot 是可以释放的(状态更新为 FREE);
先将 slot 状态更新为 RELEASING,然后再遍历这个 slot 上的 task,逐个将其标记为 failed;
如果 slot 被成功释放(状态是 FREE),这里将会通知 RM 这个 slot 现在又可用了;
更新缓存信息。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,723评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,080评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,604评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,440评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,431评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,499评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,893评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,541评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,751评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,547评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,619评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,320评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,890评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,896评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,137评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,796评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,335评论 2 342

推荐阅读更多精彩内容

  • 转载于:http://matt33.com/2019/12/23/flink-master-5/[http://m...
    无色的叶阅读 1,474评论 0 1
  • JobManager 详解 JobMaster 在实现中,也依赖了很多的服务,其中最重要的是 SchedulerN...
    无色的叶阅读 2,565评论 0 4
  • Flink源码分析系列文档目录 请点击:Flink 源码分析系列文档目录[https://www.jianshu....
    AlienPaul阅读 2,455评论 1 7
  • 1. 背景 心跳机制通过定期向对方发送请求方式用于检测客户端或者服务端是否存活的一种机制,常见的心跳检测有两种: ...
    sj_91d7阅读 1,616评论 0 5
  • 本来已为逃得过,但原来避无可避,在k8s上玩flink,走起~参考URL:https://www.dazhuanl...
    万州客阅读 738评论 0 0