本文将主要结合流程图,代码注释及要点标注进行讲解 Flink 组共享的代码逻辑,帮助读者从设计原理层更好的理解Flink Job的slot分配结果。提高对Flink Job 资源分配的理解能力。在阅读本文之前,需要读者能够对 Flink StreamGraph & JobGraph有一定的基础概念了解。
1.1 核心类 ExecutionSlotSharingGroupBuilder
1.2 类主要属性如下
//...// 其他代码占位符
private static class ExecutionSlotSharingGroupBuilder {
// 并行化后的所有的 ExecutionVertex 的容器,初始化后包含当前Job的并行化后所有 ExecutionVertex
private final SchedulingTopology topology;
// 未并行化的 JobVertex 对应的 SlotSharingGroup 映射表,初始化时根据topology进行初始化
private final Map<JobVertexID, SlotSharingGroup> slotSharingGroupMap;
// 未并行化的 JobVertex 对应的 CoLocationGroup 映射表,初始化时根据topology进行初始化
private final Map<JobVertexID, CoLocationGroup> coLocationGroupMap;
// 并行化后的 ExecutionVertex 对应的 ExecutionSlotSharingGroup 映射表,初始化时设为 empty Map.
// 此属性也为最终的分配结果集
private final Map<ExecutionVertexID, ExecutionSlotSharingGroup>
executionSlotSharingGroupMap;
// CoLocationConstraint 对应的 ExecutionSlotSharingGroup 映射表,初始化时设为 empty Map.
// 此处可以理解为colocation最后依赖于ExecutionSlotSharingGroup进行控制共享
final Map<CoLocationConstraint, ExecutionSlotSharingGroup>
constraintToExecutionSlotSharingGroupMap;
// 未并行化(逻辑的,JobVertex抽象层对应的) SlotSharingGroupId 对应的
// ExecutionSlotSharingGroup(并行化后的,ExecutionVertex抽象层对应的) 列表 的 映射表,
// 初始化时设为 empty Map.
final Map<SlotSharingGroupId, List<ExecutionSlotSharingGroup>> executionSlotSharingGroups;
// ExecutionSlotSharingGroup(并行化后的,物理的,ExecutionVertex 抽象层对应的)
// 对应的被分配到本(共享)组的JobVertex, 初始化时设为 empty Map.
private final Map<ExecutionSlotSharingGroup, Set<JobVertexID>> assignedJobVerticesForGroups;
//...// 其他代码占位符
1.3 主要属性的初始化
private ExecutionSlotSharingGroupBuilder(
final SchedulingTopology topology,
final Set<SlotSharingGroup> logicalSlotSharingGroups,
final Set<CoLocationGroup> coLocationGroups) {
this.topology = checkNotNull(topology);
this.slotSharingGroupMap = new HashMap<>();
for (SlotSharingGroup slotSharingGroup : logicalSlotSharingGroups) {
for (JobVertexID jobVertexId : slotSharingGroup.getJobVertexIds()) {
slotSharingGroupMap.put(jobVertexId, slotSharingGroup);
}
}
this.coLocationGroupMap = new HashMap<>();
for (CoLocationGroup coLocationGroup : coLocationGroups) {
for (JobVertexID jobVertexId : coLocationGroup.getVertexIds()) {
coLocationGroupMap.put(jobVertexId, coLocationGroup);
}
}
executionSlotSharingGroupMap = new HashMap<>();
constraintToExecutionSlotSharingGroupMap = new HashMap<>();
executionSlotSharingGroups = new HashMap<>();
assignedJobVerticesForGroups = new IdentityHashMap<>();
}
1.4 自顶向下理解分配的过程--先开始分配的方法起始位置
方法:
build
流程图
- 代码摘要
/**
* Build ExecutionSlotSharingGroups for all vertices in the topology. The
* ExecutionSlotSharingGroup of a vertex is determined in order below:
*
* <p>1. 尝试查找已存在的相应的 co-location 约束对应的组。
*
* <p>2. 如果上游的ExecutionVertex节点与当前节点的 slot sharing group 相同,则尝试查找上游的节点中可用的组
*
* <p>3. 尝试查找其他可用的组.
*
* <p>4. 如果没有找到则自行创建新的可用组.
*/
private Map<ExecutionVertexID, ExecutionSlotSharingGroup> build() {
final LinkedHashMap<JobVertexID, List<SchedulingExecutionVertex>> allVertices =
getExecutionVertices();
// loop on job vertices so that an execution vertex will not be added into a group
// if that group better fits another execution vertex
for (List<SchedulingExecutionVertex> executionVertices : allVertices.values()) {
/**
* 所有的 JobVertex 对应的 并行化后的等待执行的 executionVertex
* 整个过程中,将剩余的未分配到slotSharingGroup 的 EV 作为 remaining 的结果
*/
final List<SchedulingExecutionVertex> remaining =
tryFindOptimalAvailableExecutionSlotSharingGroupFor(executionVertices);
/**
* 对剩余的 EVs 查找可用的 ExecutionSharingGroup,
* 如果查找不到则进行创建
*/
findAvailableOrCreateNewExecutionSlotSharingGroupFor(remaining);
//更新 executionVertices 对应的 Constraint 到 ExecutionSlotSharingGroup 的映射的关系
updateConstraintToExecutionSlotSharingGroupMap(executionVertices);
}
return executionSlotSharingGroupMap;
}
1.4.1 尝试最优化选择可用的共享组
- 方法:
tryFindOptimalAvailableExecutionSlotSharingGroupFor
- 流程图
- 代码摘要
/**
* @param executionVertices 某个 JV 并行化后的所有 EV
* @return
*/
private List<SchedulingExecutionVertex> tryFindOptimalAvailableExecutionSlotSharingGroupFor(
final List<SchedulingExecutionVertex> executionVertices) {
final List<SchedulingExecutionVertex> remaining = new ArrayList<>();
for (SchedulingExecutionVertex executionVertex : executionVertices) {
/**
* 获取 executionVertex 对应的 executionSlotSharingGroup
* executionSlotSharingGroup 是一个持有资源信息 和 一组 EV 的对象
*/
ExecutionSlotSharingGroup group =
tryFindAvailableCoLocatedExecutionSlotSharingGroupFor(executionVertex);
// 如果这个共享组不存在
if (group == null) {
/**
* 查找上游的能否和上游的某个 EV 分配到一个 SlotSharingGroup ,判断条件是 上下游两个 EV 的
* slotSharingGroup 是否一致
**/
group = tryFindAvailableProducerExecutionSlotSharingGroupFor(executionVertex);
}
/**
* 如果不能分配到共享组
* 则将当前的EV即目标EV 添加到 remaining
*/
if (group == null) {
remaining.add(executionVertex);
} else {
/** 如果分配到了共享组
* 将EV添加到指定的共享组,
* 同时更新 executionSlotSharingGroupMap ,添加 EV_ID和对应group即共享组的信息
* 同时更新 assignedJobVerticesForGroups, 在 group 对应的 JobVertex 容器中添加 EV
* 对应的 JobVertex.
* */
addVertexToExecutionSlotSharingGroup(executionVertex, group);
}
}
return remaining;
}
- 小结:
1. 优先选择 colocation 限制进行组共享
2. 如果 1. 无法达成,则优先选在和上游的任务节点(EV)进行组共享
3. 如果以上两条都不能达成,则将无法达成共享组的EV返回到remainning列表
1.4.1.1 选择 colocation 限制进行组共享
- 方法:
tryFindAvailableCoLocatedExecutionSlotSharingGroupFor
- 流程图
- 代码摘要
/**
* 获取 executionVertex 对应的 executionSlotSharingGroup
* executionSlotSharingGroup 是一个持有资源信息 和 一组 EV 的对象
* @param executionVertex 某个 具体的 EV
* @return
*/
private ExecutionSlotSharingGroup tryFindAvailableCoLocatedExecutionSlotSharingGroupFor(
final SchedulingExecutionVertex executionVertex) {
// 获取 EV_ID
final ExecutionVertexID executionVertexId = executionVertex.getId();
// 从 CoLocationMap (map<JV_id, CoLocationGroup>) 获取当前 EV_ID 所属的 JV_ID 对应的 CoLocationGroup
// CoLocationGroup 是一个持有一组 JV 的容器,拥有自己的 ID
final CoLocationGroup coLocationGroup =
coLocationGroupMap.get(executionVertexId.getJobVertexId());
// 如果 EV_ID 对应的 coLocationGroup 存在
if (coLocationGroup != null) {
// 从对应的 coLocationGroup 构建新的 CoLocationConstraint,此处的 CoLocationConstraint
// 已经重写 equals 方法和 hashCode 方法
// CoLocationConstraint 包含 coLocationGroupId 和 constraintIndex 属性;
final CoLocationConstraint constraint =
coLocationGroup.getLocationConstraint(executionVertexId.getSubtaskIndex());
// 从 Map<CoLocationConstraint, ExecutionSlotSharingGroup>
// 获取对应的 executionSlotSharingGroup
// executionSlotSharingGroup 是一个持有资源信息 和 一组 EV 的对象
return constraintToExecutionSlotSharingGroupMap.get(constraint);
} else {
// 否则返回 null
return null;
}
}
- 注意:
同一个JobVertex的不同子task即ExecutionVertex(EV)不能使用同一个共享组,即使是强制限制也不行
1.4.1.2 尝试上游 EV 是否可以进行组共享
- 方法:
tryFindAvailableProducerExecutionSlotSharingGroupFor
- 流程图
- 代码摘要
/**
* 查找上游的能否和上游的某个 EV 分配到一个 SlotSharingGroup ,判断条件是 上下游两个 EV 的
* slotSharingGroup 是否一致
* @param executionVertex 并行化后的 EV
* @return 目标 ExecutionSlotSharingGroup 持有一组 EV 和 Resource 信息的容器,如果有,则返回true,
* 如果没有则返回false
*/
private ExecutionSlotSharingGroup tryFindAvailableProducerExecutionSlotSharingGroupFor(
final SchedulingExecutionVertex executionVertex) {
// 获取目标 EV 的 EV_ID
final ExecutionVertexID executionVertexId = executionVertex.getId();
// 获取目标 EV 的上游的 ResultPartition
for (SchedulingResultPartition partition : executionVertex.getConsumedResults()) {
// 获取 目标 EV 的上游的 ResultPartition 的生产者即与其连接的上游的 EV
final ExecutionVertexID producerVertexId = partition.getProducer().getId();
// 如果目标 EV 和上游 EV 不属于同一个 sharingGroup, 则跳过
if (!inSameLogicalSlotSharingGroup(producerVertexId, executionVertexId)) {
continue;
}
// 如果目标 EV 和上游 EV 属于同一个 sharingGroup, 则获取上游 EV 对应的 sharingGroup
final ExecutionSlotSharingGroup producerGroup =
executionSlotSharingGroupMap.get(producerVertexId);
// 检查上游的 EV 对应的已分配的 ESG executionSlotSharingGroup
// 如果上游的 EV 对应的已分配的 ESG executionSlotSharingGroup为空 ,则抛出异常
checkState(producerGroup != null);
// 如果当前的 executionSlotSharingGroup 对应的 JobVertex 集合中不包含 executionVertexId
//* 所代表的的 JobVertex, 则返回 true,否则 返回 false
if (isGroupAvailableForVertex(producerGroup, executionVertexId)) {
return producerGroup;
}
}
return null;
}
- 注意:
同一个JobVertex的不同子task即ExecutionVertex(EV)不能使用同一个共享组
1.4.2 对剩余的未进行组共享的EV进行共享组分配
- 方法:
findAvailableOrCreateNewExecutionSlotSharingGroupFor
- 流程图
- 代码摘要
/**
* 对剩余的 EVs 查找可用的 ExecutionSharingGorup,
* 如果查找不到则进行创建
*/
private void findAvailableOrCreateNewExecutionSlotSharingGroupFor(
final List<SchedulingExecutionVertex> executionVertices) {
for (SchedulingExecutionVertex executionVertex : executionVertices) {
// 获取当前 EV 对应的JobVertex的 slotSharingGroup
final SlotSharingGroup slotSharingGroup =
getSlotSharingGroup(executionVertex.getId());
/**
* slotSharingGroup->List of ExecutionSlotSharingGroup
* SlotSharingGroup 并行化后为 ExecutionSlotSharingGroup
*/
final List<ExecutionSlotSharingGroup> groups =
executionSlotSharingGroups.computeIfAbsent(
slotSharingGroup.getSlotSharingGroupId(), k -> new ArrayList<>());
ExecutionSlotSharingGroup group = null;
for (ExecutionSlotSharingGroup executionSlotSharingGroup : groups) {
/**
* 如果对于当前的 EV 在 ESSG中可以分配,则进行分配,且停止循环
*/
if (isGroupAvailableForVertex(
executionSlotSharingGroup, executionVertex.getId())) {
group = executionSlotSharingGroup;
break;
}
}
/**
* 如果在 所有的 executionSharingGroup 中都无法分配当前的 EV
* 则重新创建一个 ExecutionSlotSharingGroup
* 并且添加到 executionSlotSharingGroups的字典中 executionSlotSharingGroups :
* SlotSharingGroupId -> list of ExecutionSlotSharingGroup
*
*/
if (group == null) {
group = new ExecutionSlotSharingGroup();
group.setResourceProfile(slotSharingGroup.getResourceProfile());
groups.add(group);
}
// 将EV添加到指定的共享组,
addVertexToExecutionSlotSharingGroup(executionVertex, group);
}
}
1.4.3 更新 Constraint(并行化后的colocationGroupKey)与共享组的对应关系
- 方法:
updateConstraintToExecutionSlotSharingGroupMap
- 流程图
- 代码摘要
/**
* 更新 executionVertices 对应的 Constraint 到 ExecutionSlotSharingGroup 的映射的关系
* @param executionVertices 入参
*/
private void updateConstraintToExecutionSlotSharingGroupMap(
final List<SchedulingExecutionVertex> executionVertices) {
for (SchedulingExecutionVertex executionVertex : executionVertices) {
//* 获取当前的EV_id
final ExecutionVertexID executionVertexId = executionVertex.getId();
//* 获取当前 EV 的 JV 对应的 CoLocationGroup
//* private final Map<JobVertexID, CoLocationGroup> coLocationGroupMap;
final CoLocationGroup coLocationGroup =
coLocationGroupMap.get(executionVertexId.getJobVertexId());
// 如果 当前 EV 的 JV 对应的 CoLocationGroup 存在
if (coLocationGroup != null) {
// 则获取此 executionVertex 对应的 CoLocationConstraint
final CoLocationConstraint constraint =
coLocationGroup.getLocationConstraint(
executionVertexId.getSubtaskIndex());
// Map<CoLocationConstraint, ExecutionSlotSharingGroup>
// 更新此 CoLocationConstraint 对应的 ExecutionSlotSharingGroup
constraintToExecutionSlotSharingGroupMap.put(
constraint, executionSlotSharingGroupMap.get(executionVertexId));
}
}
}