- 引言
- 问题分析
- Tekton 的实现
数据结构
构造 Graph
获取调度节点 - 总结
引言
Tekton 在执行用户定义的任务后,根据任务中的先后顺序,来执行不同的任务,但是多个任务之间可能会存在较多个关联和顺序关系,这个时候对所有的 task 进行排序并且调度下一个 task 就会变得比较复杂。我们一起看下 Tekton 的开发者们是如何解决这个问题的。
问题分析
在看 Tekton 如何解决这件事情之前,我们先自己来大概分析和理解下这个调度问题。
一个比较实际的场景是,用户在编排流水线的时候,会通过在 pipeline 中定义的 runAfter 完成任务顺序的指定,而且如果 A task 中的 resource.input 依赖于 B task 的 resourc.output,这样的话我们也需要保证 B task 应该在 A task 之后再执行。
如此想来,这个调度问题就可以简化为我们有多个节点,节点间存在依赖关系和顺序关系,需要保证所有的节点都能按照依赖关系和顺序关系来进行排序,并且可以根据当前所有的节点状态返回下次需要执行的节点。
Tekton 的实现
Tekton 使用了 DAG(Directed acyclic graph)也就是有向无环图来解决了这个问题。
通过遍历所有的任务,分析任务中的 dep,来将所有的任务插入到图中,并且在需要进行任务调度的时候,分析遍历该图,取出合适的 task 进行调度。
下面来了解下这个实现方式。
数据结构
Tekton 在实现中定义了 Node 还有 Graph 两个 struct,分别对应了有向无环图中的节点还有有向无环图。
Node 的定义有些像二叉树,结构比较简单,存储了一个当前的 task,以及在 Prev 中存储了在执行该节点之前,所有需要完成的节点。Next 中则相反存储的是所有在该节点之后执行的节点。
Graph 是一个针对所有节点的一个 Map。
// Node represents a Task in a pipeline.
type Node struct {
// Task represent the PipelineTask in Pipeline
Task Task
// Prev represent all the Previous task Nodes for the current Task
Prev []*Node
// Next represent all the Next task Nodes for the current Task
Next []*Node
}
// Graph represents the Pipeline Graph
type Graph struct {
// Nodes represent map of PipelineTask name to Node in Pipeline Graph
Nodes map[string]*Node
}
构造 Graph
这个就主要对应了该文件中的一个 Build 方法,该方法接收 pipeline 中所有的task,以及存储了所有 task 依赖信息的一个 map[string][]string 结构体,该结构体中存储了所有的 task 以及对应的依赖。
该方法通过这两个参数来完成整个图的构造。
// Build returns a valid pipeline Graph. Returns error if the pipeline is invalid
func Build(tasks Tasks, deps map[string][]string) (*Graph, error) {
d := newGraph()
// Add all Tasks mentioned in the `PipelineSpec`
// 增加所有的 node 进入 graph 中
for _, pt := range tasks.Items() {
if _, err := d.addPipelineTask(pt); err != nil {
return nil, fmt.Errorf("task %s is already present in Graph, can't add it again: %w", pt.HashKey(), err)
}
}
// Process all from and runAfter constraints to add task dependency
// 将所有的 dep 塞到对应的节点中
for pt, taskDeps := range deps {
for _, previousTask := range taskDeps {
// 将每一个 pre 节点 都处理下 放到每个 node 的里面
if err := addLink(pt, previousTask, d.Nodes); err != nil {
return nil, fmt.Errorf("couldn't add link between %s and %s: %w", pt, previousTask, err)
}
}
}
return d, nil
}
在构造这个图的时候,我们先考虑下需要实现的目标有哪些。
我们应该将 Graph 中所有节点中的 Pre 和 Next 节点都补充上,并且需要保证我们在补充节点的时候,不会导致出现环状图,因此在插入节点的时候还需要做环的检测。
其中主要处理的逻辑如下:
// prev:deps 中的key next:deps 中的某个key节点对应的所有节点中的某个
func linkPipelineTasks(prev *Node, next *Node) error {
// Check for self cycle
if prev.Task.HashKey() == next.Task.HashKey() {
return fmt.Errorf("cycle detected; task %q depends on itself", next.Task.HashKey())
}
// Check if we are adding cycles.
path := []string{next.Task.HashKey(), prev.Task.HashKey()}
if err := lookForNode(prev.Prev, path, next.Task.HashKey()); err != nil {
return fmt.Errorf("cycle detected: %w", err)
}
// 为 dep 中的 key 节点增加 prev 节点,增加的就是 dep 中 deps 的某个节点
next.Prev = append(next.Prev, prev)
// 为 dep 中的 deps 节点增加 next 节点,增加的也就是 dep 中的 key 节点
prev.Next = append(prev.Next, next)
return nil
}
// nodes: 理解为校验节点中的所有 Pre 节点 path:当前路径 用于错误打印 next: 当前校验节点
func lookForNode(nodes []*Node, path []string, next string) error {
// 对该 node 的 prevs 进行处理
for _, n := range nodes {
path = append(path, n.Task.HashKey())
if n.Task.HashKey() == next {
return errors.New(getVisitedPath(path))
}
// 因为涉及到环的检测 所以需要对所有的 pre 中的 pre 也要做处理
// 确保我们的 prev 在其他的节点中是不存在的 否则就是出现了环
if err := lookForNode(n.Prev, path, next); err != nil {
return err
}
}
return nil
}
linkPipelineTasks 主要调用 lookForNode 来检测是否存在的环,如果没有存在环,则对 dep 中的 key 节点增加对应的 deps 中的某个节点作为 pre 节点,并且为 deps 中的对应节点增加 dep 中的 key 节点作为 next 节点。
lookForNode 对环的检测也比较有意思,通过递归的方式来完成,主要是检查当前处理节点中的所有 Pre 节点中,是否存在和当前节点相同的节点,通过递归,不仅会检查当前节点的 Pre 节点是否存在,还会对 Pre 节点中的 Pre 节点进行校验,保证从 Root 节点到该节点不会存在相同的节点,也就相当于做了环的检测。
在对所有的节点循环处理结束后,Graph 中所有的节点的 Pre 节点以及 Next 节点都会包含我们最开始期望的所有应该存在的节点。
获取调度节点
Graph 构造完成,在任务调度的时候,我们需要从 Graph 中依次获取到下个需要执行的任务。
大概的思考一下,如果我们来实现这个功能,应该会怎么做呢?
大体思路应该是遍历所有的节点,将节点中 Pre 节点的个数为 0 的节点或者 Pre 节点中的 task 都结束的这种节点拿出来,这个节点就是应该执行的节点。Tekton 的实现思路也大概类似,我们来看下 Tekton 的具体实现。
在Tekton的实现中, GetSchedulable 负责获取下一个节点的工作,为了简单起见,我省略了一些检测的代码。
// GetSchedulable returns a map of PipelineTask that can be scheduled (keyed
// by the name of the PipelineTask) given a list of successfully finished doneTasks.
// It returns tasks which have all dependencies marked as done, and thus can be scheduled. If the
// specified doneTasks are invalid (i.e. if it is indicated that a Task is
// done, but the previous Tasks are not done), an error is returned.
func GetSchedulable(g *Graph, doneTasks ...string) (sets.String, error) {
roots := getRoots(g)
tm := sets.NewString(doneTasks...)
d := sets.NewString()
visited := sets.NewString()
// 对所有的根 进行循环处理 因为可能会存在较多的可以调度的节点
for _, root := range roots {
// 找出可以调度的节点出来
schedulable := findSchedulable(root, visited, tm)
for _, task := range schedulable {
d.Insert(task.HashKey())
}
}
...
return d, nil
}
我们可以看出,该函数会获取到 DAG 图和对应的已完成任务的列表,通过这两个信息来获取应该调度的任务。
在这里先获取到了所有的 root,这里是个数组(因为有可能某个 pipeline 可能会存在多个 root 的节点,比如第一个步骤就是一个并行的步骤),之后会对该 root 调用 findSchedulable 方法,这个方法主要是完成了节点的查找。
这个方法也用到了递归的方式,处理逻辑也比较简单明确,最终返回结果就是参数中 n 节点下,所有的可以调度的节点。
// n: 处理节点 visited: 已经访问过的节点 doneTasks: 标记为已完成的节点
func findSchedulable(n *Node, visited sets.String, doneTasks sets.String) []Task {
if visited.Has(n.Task.HashKey()) {
return []Task{}
}
// 为了防止重复的拜访
visited.Insert(n.Task.HashKey())
// 如果这个节点已经标记为结束了 则去查找 next 节点 从 next 节点中查找到对应的应该调度的节点
// 这里是为了对下面的所有的 next node 都进行处理 所以使用递归是最合适的方式
if doneTasks.Has(n.Task.HashKey()) {
schedulable := []Task{}
// This one is done! Take note of it and look at the next candidate
for _, next := range n.Next {
if _, ok := visited[next.Task.HashKey()]; !ok {
schedulable = append(schedulable, findSchedulable(next, visited, doneTasks)...)
}
}
return schedulable
}
// 如果这个节点没有被标记为结束
// 这里的处理 才是递归结束的地方
// This one isn't done! Return it if it's schedulable
if isSchedulable(doneTasks, n.Prev) {
// FIXME(vdemeester)
return []Task{n.Task}
}
// This one isn't done, but it also isn't ready to schedule
return []Task{}
}
因为节点被访问后,无需再被处理,因此传入了 visited 来标记出哪些节点已经访问过,已经访问过的节点不再访问,防止在递归访问中出现多次访问,导致最后返回可调度节点中出现重复的节点。
之后就是我们需要找到的 递归继续的条件 和 递归的终止条件。
递归的继续条件是,如果当前节点已经被标记为完成,则需要对该节点的 next 节点进行递归查询,直到最终将该 root 下所有的可调度节点都返回回来。
递归的终止条件就是如果当前节点,他的所有 Pre 节点都已经完成,则该节点应该就是可以被调度的节点,这种情况下我们直接返回该节点。否则返回为空节点。
通过该方式,可以保证每次都能拿到需要调度的节点,实现的简洁又漂亮。
总结
数据结构和算法的威力在这里得到了体现,这里用到了 DAG 这种数据数据结构,并且配合递归完美了解决了流水线调度的问题,代码简短容易理解,基本上所有的处理逻辑跟 DAG 中的遍历逻辑大体一致。
在源代码库中,也配置了比较全面的测试文件,对于阅读源代码,调试代码也有很大的帮助。推荐大家可以阅读下代码,自己运行测试来体验下。