Flink 组共享原理

本文将主要结合流程图,代码注释及要点标注进行讲解 Flink 组共享的代码逻辑,帮助读者从设计原理层更好的理解Flink Job的slot分配结果。提高对Flink Job 资源分配的理解能力。在阅读本文之前,需要读者能够对 Flink StreamGraph & JobGraph有一定的基础概念了解。

1.1 核心类 ExecutionSlotSharingGroupBuilder

1.2 类主要属性如下

//...// 其他代码占位符     
    private static class ExecutionSlotSharingGroupBuilder {
        // 并行化后的所有的 ExecutionVertex 的容器,初始化后包含当前Job的并行化后所有 ExecutionVertex
        private final SchedulingTopology topology;

        // 未并行化的 JobVertex 对应的 SlotSharingGroup 映射表,初始化时根据topology进行初始化
        private final Map<JobVertexID, SlotSharingGroup> slotSharingGroupMap;

        // 未并行化的 JobVertex 对应的 CoLocationGroup 映射表,初始化时根据topology进行初始化
        private final Map<JobVertexID, CoLocationGroup> coLocationGroupMap;

        // 并行化后的 ExecutionVertex 对应的 ExecutionSlotSharingGroup 映射表,初始化时设为 empty Map. 
        // 此属性也为最终的分配结果集
        private final Map<ExecutionVertexID, ExecutionSlotSharingGroup>
                executionSlotSharingGroupMap;

        // CoLocationConstraint 对应的 ExecutionSlotSharingGroup 映射表,初始化时设为 empty Map.
        // 此处可以理解为colocation最后依赖于ExecutionSlotSharingGroup进行控制共享
        final Map<CoLocationConstraint, ExecutionSlotSharingGroup>
                constraintToExecutionSlotSharingGroupMap;

        // 未并行化(逻辑的,JobVertex抽象层对应的) SlotSharingGroupId 对应的 
        // ExecutionSlotSharingGroup(并行化后的,ExecutionVertex抽象层对应的) 列表 的 映射表,
        // 初始化时设为 empty Map.
        final Map<SlotSharingGroupId, List<ExecutionSlotSharingGroup>> executionSlotSharingGroups;

        // ExecutionSlotSharingGroup(并行化后的,物理的,ExecutionVertex 抽象层对应的)
        // 对应的被分配到本(共享)组的JobVertex, 初始化时设为 empty Map.
        private final Map<ExecutionSlotSharingGroup, Set<JobVertexID>> assignedJobVerticesForGroups;
//...// 其他代码占位符

1.3 主要属性的初始化

private ExecutionSlotSharingGroupBuilder(
                final SchedulingTopology topology,
                final Set<SlotSharingGroup> logicalSlotSharingGroups,
                final Set<CoLocationGroup> coLocationGroups) {

            this.topology = checkNotNull(topology);

            this.slotSharingGroupMap = new HashMap<>();
            for (SlotSharingGroup slotSharingGroup : logicalSlotSharingGroups) {
                for (JobVertexID jobVertexId : slotSharingGroup.getJobVertexIds()) {
                    slotSharingGroupMap.put(jobVertexId, slotSharingGroup);
                }
            }

            this.coLocationGroupMap = new HashMap<>();
            for (CoLocationGroup coLocationGroup : coLocationGroups) {
                for (JobVertexID jobVertexId : coLocationGroup.getVertexIds()) {
                    coLocationGroupMap.put(jobVertexId, coLocationGroup);
                }
            }

            executionSlotSharingGroupMap = new HashMap<>();
            constraintToExecutionSlotSharingGroupMap = new HashMap<>();
            executionSlotSharingGroups = new HashMap<>();
            assignedJobVerticesForGroups = new IdentityHashMap<>();
        }


1.4 自顶向下理解分配的过程--先开始分配的方法起始位置

  • 方法:build

  • 流程图

在这里插入图片描述
  • 代码摘要
                /**
         * Build ExecutionSlotSharingGroups for all vertices in the topology. The
         * ExecutionSlotSharingGroup of a vertex is determined in order below:
         *
         * <p>1. 尝试查找已存在的相应的 co-location 约束对应的组。
         *
         * <p>2. 如果上游的ExecutionVertex节点与当前节点的 slot sharing group 相同,则尝试查找上游的节点中可用的组
         *
         * <p>3. 尝试查找其他可用的组.
         *
         * <p>4. 如果没有找到则自行创建新的可用组.
         */
        private Map<ExecutionVertexID, ExecutionSlotSharingGroup> build() {
            final LinkedHashMap<JobVertexID, List<SchedulingExecutionVertex>> allVertices =
                    getExecutionVertices();

            // loop on job vertices so that an execution vertex will not be added into a group
            // if that group better fits another execution vertex
            for (List<SchedulingExecutionVertex> executionVertices : allVertices.values()) {
                /**
                 * 所有的 JobVertex 对应的 并行化后的等待执行的 executionVertex
                 * 整个过程中,将剩余的未分配到slotSharingGroup 的 EV 作为 remaining 的结果
                 */
                final List<SchedulingExecutionVertex> remaining =
                        tryFindOptimalAvailableExecutionSlotSharingGroupFor(executionVertices);
                /**
                 * 对剩余的 EVs 查找可用的 ExecutionSharingGroup,
                 * 如果查找不到则进行创建
                 */
                findAvailableOrCreateNewExecutionSlotSharingGroupFor(remaining);
                //更新 executionVertices 对应的 Constraint 到 ExecutionSlotSharingGroup 的映射的关系
                updateConstraintToExecutionSlotSharingGroupMap(executionVertices);
            }
            return executionSlotSharingGroupMap;
        }

1.4.1 尝试最优化选择可用的共享组

  • 方法:tryFindOptimalAvailableExecutionSlotSharingGroupFor
  • 流程图
在这里插入图片描述
  • 代码摘要
                /**
         * @param executionVertices 某个 JV 并行化后的所有 EV
         * @return
         */
        private List<SchedulingExecutionVertex> tryFindOptimalAvailableExecutionSlotSharingGroupFor(
                final List<SchedulingExecutionVertex> executionVertices) {
          
            final List<SchedulingExecutionVertex> remaining = new ArrayList<>();
            for (SchedulingExecutionVertex executionVertex : executionVertices) {
                /**
                 * 获取 executionVertex 对应的 executionSlotSharingGroup
                 * executionSlotSharingGroup 是一个持有资源信息 和 一组 EV 的对象
                 */
                ExecutionSlotSharingGroup group =
                        tryFindAvailableCoLocatedExecutionSlotSharingGroupFor(executionVertex);
                // 如果这个共享组不存在
                if (group == null) {
                    /**
                     * 查找上游的能否和上游的某个 EV 分配到一个 SlotSharingGroup ,判断条件是 上下游两个 EV 的
                     * slotSharingGroup 是否一致
                     **/
                    group = tryFindAvailableProducerExecutionSlotSharingGroupFor(executionVertex);
                }
                /**
                 * 如果不能分配到共享组
                 * 则将当前的EV即目标EV 添加到 remaining
                 */
                if (group == null) {
                    remaining.add(executionVertex);
                } else {
                    /** 如果分配到了共享组
                     * 将EV添加到指定的共享组,
                     * 同时更新 executionSlotSharingGroupMap ,添加 EV_ID和对应group即共享组的信息
                     * 同时更新 assignedJobVerticesForGroups, 在 group 对应的 JobVertex 容器中添加 EV
                     * 对应的 JobVertex.
                     * */
                    addVertexToExecutionSlotSharingGroup(executionVertex, group);
                }
            }
            return remaining;
        }
  • 小结:
1. 优先选择 colocation 限制进行组共享
2. 如果 1. 无法达成,则优先选在和上游的任务节点(EV)进行组共享
3. 如果以上两条都不能达成,则将无法达成共享组的EV返回到remainning列表

1.4.1.1 选择 colocation 限制进行组共享

  • 方法:tryFindAvailableCoLocatedExecutionSlotSharingGroupFor
  • 流程图
在这里插入图片描述
  • 代码摘要
                /**
         * 获取 executionVertex 对应的 executionSlotSharingGroup
         * executionSlotSharingGroup 是一个持有资源信息 和 一组 EV 的对象
         * @param executionVertex 某个 具体的 EV
         * @return
         */
        private ExecutionSlotSharingGroup tryFindAvailableCoLocatedExecutionSlotSharingGroupFor(
                final SchedulingExecutionVertex executionVertex) {

            // 获取 EV_ID
            final ExecutionVertexID executionVertexId = executionVertex.getId();
            // 从 CoLocationMap (map<JV_id, CoLocationGroup>) 获取当前 EV_ID 所属的 JV_ID 对应的 CoLocationGroup
            // CoLocationGroup 是一个持有一组 JV 的容器,拥有自己的 ID
            final CoLocationGroup coLocationGroup =
                    coLocationGroupMap.get(executionVertexId.getJobVertexId());
            // 如果 EV_ID 对应的 coLocationGroup 存在
            if (coLocationGroup != null) {
                // 从对应的 coLocationGroup 构建新的 CoLocationConstraint,此处的 CoLocationConstraint
                // 已经重写 equals 方法和 hashCode 方法
                // CoLocationConstraint 包含 coLocationGroupId 和 constraintIndex 属性;
                final CoLocationConstraint constraint =
                        coLocationGroup.getLocationConstraint(executionVertexId.getSubtaskIndex());

                // 从 Map<CoLocationConstraint, ExecutionSlotSharingGroup>
                // 获取对应的 executionSlotSharingGroup
                // executionSlotSharingGroup 是一个持有资源信息 和 一组 EV 的对象
                return constraintToExecutionSlotSharingGroupMap.get(constraint);
            } else {
                // 否则返回 null
                return null;
            }
        }
  • 注意:
同一个JobVertex的不同子task即ExecutionVertex(EV)不能使用同一个共享组,即使是强制限制也不行

1.4.1.2 尝试上游 EV 是否可以进行组共享

  • 方法:tryFindAvailableProducerExecutionSlotSharingGroupFor
  • 流程图
在这里插入图片描述
  • 代码摘要
                /**
         * 查找上游的能否和上游的某个 EV 分配到一个 SlotSharingGroup ,判断条件是 上下游两个 EV 的
         * slotSharingGroup 是否一致
         * @param executionVertex 并行化后的 EV
         * @return 目标 ExecutionSlotSharingGroup 持有一组 EV 和 Resource 信息的容器,如果有,则返回true,
         * 如果没有则返回false
         */
        private ExecutionSlotSharingGroup tryFindAvailableProducerExecutionSlotSharingGroupFor(
                final SchedulingExecutionVertex executionVertex) {

            // 获取目标 EV 的 EV_ID
            final ExecutionVertexID executionVertexId = executionVertex.getId();

            // 获取目标 EV 的上游的 ResultPartition
            for (SchedulingResultPartition partition : executionVertex.getConsumedResults()) {
                // 获取 目标 EV 的上游的 ResultPartition 的生产者即与其连接的上游的 EV
                final ExecutionVertexID producerVertexId = partition.getProducer().getId();
                // 如果目标 EV 和上游 EV 不属于同一个 sharingGroup, 则跳过
                if (!inSameLogicalSlotSharingGroup(producerVertexId, executionVertexId)) {
                    continue;
                }
                // 如果目标 EV 和上游 EV 属于同一个 sharingGroup, 则获取上游 EV 对应的 sharingGroup
                final ExecutionSlotSharingGroup producerGroup =
                        executionSlotSharingGroupMap.get(producerVertexId);

                // 检查上游的 EV 对应的已分配的 ESG executionSlotSharingGroup
                // 如果上游的 EV 对应的已分配的 ESG executionSlotSharingGroup为空 ,则抛出异常
                checkState(producerGroup != null);
                // 如果当前的 executionSlotSharingGroup 对应的 JobVertex 集合中不包含  executionVertexId
                //* 所代表的的 JobVertex, 则返回 true,否则 返回 false
                if (isGroupAvailableForVertex(producerGroup, executionVertexId)) {
                    return producerGroup;
                }
            }

            return null;
        }
  • 注意:
同一个JobVertex的不同子task即ExecutionVertex(EV)不能使用同一个共享组

1.4.2 对剩余的未进行组共享的EV进行共享组分配

  • 方法:findAvailableOrCreateNewExecutionSlotSharingGroupFor
  • 流程图
在这里插入图片描述
  • 代码摘要
                /**
         * 对剩余的 EVs 查找可用的 ExecutionSharingGorup,
         * 如果查找不到则进行创建
         */
        private void findAvailableOrCreateNewExecutionSlotSharingGroupFor(
                final List<SchedulingExecutionVertex> executionVertices) {

            for (SchedulingExecutionVertex executionVertex : executionVertices) {
                // 获取当前 EV 对应的JobVertex的 slotSharingGroup
                final SlotSharingGroup slotSharingGroup =
                        getSlotSharingGroup(executionVertex.getId());
                /**
                 * slotSharingGroup->List of ExecutionSlotSharingGroup
                 * SlotSharingGroup 并行化后为   ExecutionSlotSharingGroup
                 */
                final List<ExecutionSlotSharingGroup> groups =
                        executionSlotSharingGroups.computeIfAbsent(
                                slotSharingGroup.getSlotSharingGroupId(), k -> new ArrayList<>());
                ExecutionSlotSharingGroup group = null;
                for (ExecutionSlotSharingGroup executionSlotSharingGroup : groups) {
                    /**
                     * 如果对于当前的 EV 在 ESSG中可以分配,则进行分配,且停止循环
                     */
                    if (isGroupAvailableForVertex(
                            executionSlotSharingGroup, executionVertex.getId())) {
                        group = executionSlotSharingGroup;
                        break;
                    }
                }
                /**
                 * 如果在 所有的 executionSharingGroup 中都无法分配当前的 EV
                 * 则重新创建一个 ExecutionSlotSharingGroup
                 * 并且添加到 executionSlotSharingGroups的字典中 executionSlotSharingGroups :
                 * SlotSharingGroupId -> list of ExecutionSlotSharingGroup
                 *
                 */
                if (group == null) {
                    group = new ExecutionSlotSharingGroup();
                    group.setResourceProfile(slotSharingGroup.getResourceProfile());
                    groups.add(group);
                }
                // 将EV添加到指定的共享组,
                addVertexToExecutionSlotSharingGroup(executionVertex, group);
            }
        }


1.4.3 更新 Constraint(并行化后的colocationGroupKey)与共享组的对应关系

  • 方法:updateConstraintToExecutionSlotSharingGroupMap
  • 流程图
在这里插入图片描述
  • 代码摘要
                /**
         * 更新 executionVertices 对应的 Constraint 到 ExecutionSlotSharingGroup 的映射的关系
         * @param executionVertices 入参
         */
        private void updateConstraintToExecutionSlotSharingGroupMap(
                final List<SchedulingExecutionVertex> executionVertices) {

            for (SchedulingExecutionVertex executionVertex : executionVertices) {
                 //* 获取当前的EV_id
                final ExecutionVertexID executionVertexId = executionVertex.getId();
                 //* 获取当前 EV 的 JV 对应的 CoLocationGroup
                 //* private final Map<JobVertexID, CoLocationGroup> coLocationGroupMap;
                final CoLocationGroup coLocationGroup =
                        coLocationGroupMap.get(executionVertexId.getJobVertexId());
                // 如果 当前 EV 的 JV 对应的 CoLocationGroup 存在
                if (coLocationGroup != null) {
                    // 则获取此 executionVertex 对应的  CoLocationConstraint
                    final CoLocationConstraint constraint =
                            coLocationGroup.getLocationConstraint(
                                    executionVertexId.getSubtaskIndex());
                    // Map<CoLocationConstraint, ExecutionSlotSharingGroup>
                    // 更新此 CoLocationConstraint 对应的 ExecutionSlotSharingGroup
                    constraintToExecutionSlotSharingGroupMap.put(
                            constraint, executionSlotSharingGroupMap.get(executionVertexId));
                }
            }
        }

共勉......

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,723评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,080评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,604评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,440评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,431评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,499评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,893评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,541评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,751评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,547评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,619评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,320评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,890评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,896评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,137评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,796评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,335评论 2 342

推荐阅读更多精彩内容