https://lailin.xyz/post/operator-03-kubebuilder-tutorial.html
Kubebuilder 是一个用于构建 Kubernetes 自定义控制器(Controller)和自定义资源定义(CRD)的一套工具和框架。它简化了开发 Kubernetes 扩展的过程,使开发者能够更容易地编写和管理 Kubernetes 控制器。
以下是一些 Kubebuilder 的主要特点:
- 代码生成:Kubebuilder 提供了代码生成工具,可以自动生成项目结构、样板代码和 CRD 定义,减少了手动编写的工作量。
- 基于 Controller-Runtime:Kubebuilder 构建在 Kubernetes 的 controller-runtime 库之上,这个库提供了一些常用的控制器模式和工具,帮助开发者更高效地编写控制器逻辑。
- 项目结构:Kubebuilder 提供了一种标准化的项目结构,使得项目更易于维护和理解。
- 测试支持:Kubebuilder 提供了一些工具和框架,帮助开发者编写和运行测试,确保控制器和 CRD 的正确性。
- 文档和示例:Kubebuilder 提供了丰富的文档和示例,帮助开发者快速上手和理解如何构建 Kubernetes 扩展。
Reconcile
方法的签名
在 Kubebuilder 中,Reconcile
方法通常是一个实现了reconcile.Reconciler
接口的结构体的方法。其签名如下:
<pre class="sl-blockcode" data-lang="markup">func(r *YourReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error)
</pre>
参数解释
-
ctx context.Context
:上下文对象,用于控制请求的生命周期,可以传递取消信号和超时信息。 -
req ctrl.Request
:包含了需要调和的对象的名称和命名空间。
返回值解释
-
ctrl.Result
:控制器的返回结果,决定了是否以及何时再次调度Reconcile
方法。常见的返回结果包括立即重新调度、延迟重新调度或不再调度。 -
error
:如果在调和过程中发生错误,返回该错误,控制器管理器会根据错误类型决定如何处理(例如重试)。
Reconcile
方法的工作流程
-
获取资源:首先,从 Kubernetes API 中获取当前需要调和的资源对象。通常使用客户端来获取对象,如
r.Client.Get(ctx, req.NamespacedName, &yourResource)
。 -
检查删除标记:如果资源对象被标记为删除(即存在
deletionTimestamp
),通常需要执行清理逻辑。 - 同步状态:将当前状态与期望状态进行比较,并执行必要的操作来使其一致。这可能包括创建、更新或删除其他 Kubernetes 资源。
- 更新状态:在调和过程中,可能需要更新资源对象的状态字段,以反映当前的状态。
-
返回结果:返回
ctrl.Result
和error
。如果需要重新调和,可以设置Requeue
或RequeueAfter
。
示例代码
以下是一个简单的Reconcile
方法示例:
<pre class="sl-blockcode" data-lang="markup">func(r *YourReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// 获取资源对象
var yourResource v1alpha1.YourResource
if err := r.Client.Get(ctx, req.NamespacedName, &yourResource); err != nil {
if apierrors.IsNotFound(err) {
// 资源已经被删除
return ctrl.Result{}, nil
}
// 获取资源时出错
return ctrl.Result{}, err
}
// 检查删除标记
if !yourResource.ObjectMeta.DeletionTimestamp.IsZero() {
// 执行清理逻辑
return ctrl.Result{}, nil
}
// 同步状态
// 比较当前状态与期望状态,并执行必要的操作
// ...
// 更新状态
if err := r.Client.Status().Update(ctx, &yourResource); err != nil {
return ctrl.Result{}, err
}
// 返回结果
return ctrl.Result{}, nil
}
</pre>
总结
Reconcile
方法是 Kubernetes 控制器的核心,它负责将集群的实际状态与期望状态进行协调。通过实现Reconcile
方法,你可以定义控制器的业务逻辑,确保 Kubernetes 资源按照预期运行。
Reconcile
方法中启动了一个新的manager
,这是为了动态地管理和监控多个 Kubernetes 集群中的资源。每个新的manager
实例都会连接到一个不同的 Kubernetes 集群,并启动相应的控制器来监控和管理该集群中的资源。这种做法通常用于多集群管理场景。
具体的实现步骤
- 获取和检查资源:
- 从 Kubernetes API 中获取
Cluster
资源对象。 - 检查资源是否存在删除标记,如果存在则执行清理逻辑。
- 检查集群数据缓存:
- 检查
clusterDataMap
中是否已经存在该集群的数据缓存。 - 如果缓存存在且资源的
Spec
和Labels
没有变化,则不需要重建集群管理器。
- 构建集群配置和客户端:
- 使用集群的
KubeConfig
构建rest.Config
。 - 使用
rest.Config
构建kubeClient
。
-
启动新的
**manager**
:
- 创建并启动一个新的
manager
实例,该实例连接到目标集群。 - 在新的
manager
上设置控制器,并启动控制器以监控目标集群中的资源。
- 缓存管理:
- 将新的集群数据缓存到
clusterDataMap
中,以便后续使用。
示例代码分析
以下是代码中的关键部分和解释:
获取和检查资源
<pre class="sl-blockcode" data-lang="markup">cluster := &openapiv1.Cluster{}if err := r.client.Get(ctx, req.NamespacedName, cluster); err != nil {
if apierr.IsNotFound(err) {
logger.Error(err, "Could not find cluster", "ClusterName", cluster.Name)
return reconcile.Result{}, nil
}
}
</pre>
处理删除情况
<pre class="sl-blockcode" data-lang="markup">if !cluster.ObjectMeta.DeletionTimestamp.IsZero() {
iflen(cluster.Finalizers) == 0 {
r.delFromClusterSet(cluster)
}
return reconcile.Result{}, nil
}
</pre>
检查集群数据缓存
<pre class="sl-blockcode" data-lang="markup">loadedData, ok := r.clusterDataMap.Load(obj.Name)if ok {
clusterData := loadedData.(*ClusterData)
if equality.Semantic.DeepEqual(clusterData.cachedObj.Spec, obj.Spec) &&
equality.Semantic.DeepEqual(clusterData.cachedObj.ObjectMeta.Labels, obj.ObjectMeta.Labels) {
logger.Info("cluster not need rebuild", "clusterId", obj.Name)
return ctrl.Result{}, nil
}
}
</pre>
构建集群配置和客户端
<pre class="sl-blockcode" data-lang="markup">clusterConfig, err := buildRestConfig(obj.Spec.Connection.KubeConfig)if err != nil {
logger.Error(err, "ClusterController build kubeclient fail", "clusterId", obj.Name)
return ctrl.Result{}, err
}
kubeClient, err := client.New(clusterConfig, client.Options{Scheme: memberScheme})if err != nil || kubeClient == nil {
logger.Error(err, "ClusterController build kubeclient fail", "clusterId", obj.Name)
return ctrl.Result{}, errors.Errorf("build cluster client fail, %s", obj.Name)
}
</pre>
启动新的manager
<pre class="sl-blockcode" data-lang="markup">mgr, err := ctrl.NewManager(clusterConfig, ctrl.Options{
Logger: logger,
Scheme: memberScheme,
HealthProbeBindAddress: "0",
LeaderElection: false,
})if err != nil {
return ctrl.Result{}, err
}
gofunc() {
err = mgr.Start(watchCtx)
reterr = err
if err != nil {
logger.Error(err, "remote cluster manager stopped")
}
logger.Info("remote cluster manager stopped")
}()
</pre>
设置和启动控制器
<pre class="sl-blockcode" data-lang="markup">mcvReconcile := &ManagedClusterVGPUInstanceReconcile{
Client: kubeClient,
ClusterId: obj.Name,
}
c, err := controller.NewUnmanaged(obj.Name, mgr, controller.Options{MaxConcurrentReconciles: 10, Reconciler: mcvReconcile})if err := c.Watch(
source.Kind(mgr.GetCache(), &v1beta1.VGPUInstance{}),
&handler.EnqueueRequestForObject{},
&predicate.ResourceVersionChangedPredicate{},
); err != nil {
reterr = err
logger.Error(err, "failed to watch nodes")
}
gofunc(watchCtx context.Context, cancelFunc context.CancelFunc) {
r.clusterDataMap.Store(obj.Name, &ClusterData{
KubeClient: kubeClient,
cachedObj: obj.DeepCopy(),
clusterConfig: clusterConfig,
})
for {
select {
case <-watchCtx.Done():
r.delFromClusterSet(obj)
logger.Info("cache deleted")
return
default:
logger.Info("start node watch controller")
err := c.Start(watchCtx)
reterr = err
if err != nil {
logger.Error(err, "failed to start controller")
}
logger.Info("node watcher stopped")
time.Sleep(10 * time.Second)
}
}
}(watchCtx, cancelFunc)
</pre>
总结
在Reconcile
方法中启动新的manager
是为了动态地管理和监控多个 Kubernetes 集群中的资源。通过这种方式,可以在运行时根据需要启动和停止对不同集群的监控,从而实现灵活的多集群管理。这种做法虽然复杂,但在多集群环境中非常有用。需要注意的是,这种方法会增加资源开销和复杂度,因此需要谨慎使用并确保正确处理并发和错误。
了解manager
和controller
之间的关系对于理解 Kubernetes 控制器模式非常重要。
Manager
manager
是一个控制器管理器,它负责管理和运行多个控制器。它提供了一个统一的框架来启动和管理控制器的生命周期。manager
的主要职责包括:
- 管理控制器的生命周期:启动、停止和重启控制器。
- 共享资源:提供共享的客户端、缓存和其他资源,以便多个控制器可以高效地协同工作。
- 健康检查和探针:提供健康检查接口,以便外部系统可以监控控制器的健康状态。
- Leader Election:在分布式环境中,通过选举一个 Leader 来避免多个实例同时执行相同的操作。
在代码中,manager
是通过controller-runtime
包中的ctrl.Manager
实现的。它提供了一个统一的接口来管理控制器的生命周期和共享资源。
Controller
controller
是实际执行逻辑的组件。每个控制器负责监控一个或多个 Kubernetes 资源,并根据这些资源的状态执行相应的操作。控制器的主要职责包括:
- 监听资源变化:通过 Watch 机制监听 Kubernetes 资源的变化事件。
- 执行业务逻辑:根据资源的变化执行相应的业务逻辑,例如创建、更新或删除资源。
- 更新资源状态:将处理结果更新回 Kubernetes API 服务器。
在代码中,controller
是通过controller-runtime
包中的controller.Controller
实现的。每个控制器都有一个 Reconcile 方法,这个方法包含了控制器的具体业务逻辑。
Manager 和 Controller 的关系
-
管理和调度:
manager
负责管理和调度多个controller
的运行。它会启动每个controller
,并确保它们在整个程序生命周期内正常运行。 -
资源共享:
manager
提供了共享的客户端、缓存和其他资源,这些资源可以被多个controller
共享使用,从而提高资源利用效率。 -
统一配置:
manager
提供了一些统一的配置选项,例如 Leader Election、健康检查等,这些配置可以应用于所有由该manager
管理的controller
。
代码示例中的关系
在你的代码示例中,manager
和controller
的关系如下:
- 创建 Manager:
<pre class="sl-blockcode" data-lang="markup">mgr, err := ctrl.NewManager(clusterConfig, ctrl.Options{
Logger: logger,
Scheme: memberScheme,
HealthProbeBindAddress: "0",
LeaderElection: false,
})if err != nil {
return ctrl.Result{}, err
}
</pre>
- 这里创建了一个新的
manager
,它将管理和运行控制器。 - 启动 Manager:
<pre class="sl-blockcode" data-lang="markup">gofunc() {
err = mgr.Start(watchCtx)
reterr = err
if err != nil {
logger.Error(err, "remote cluster manager stopped")
}
logger.Info("remote cluster manager stopped")
}()
</pre>
- 启动
manager
,它将开始管理和调度控制器的运行。 - 创建 Controller:
<pre class="sl-blockcode" data-lang="markup">mcvReconcile := &ManagedClusterVGPUInstanceReconcile{
Client: kubeClient,
ClusterId: obj.Name,
}
c, err := controller.NewUnmanaged(obj.Name, mgr, controller.Options{MaxConcurrentReconciles: 10, Reconciler: mcvReconcile})if err := c.Watch(
source.Kind(mgr.GetCache(), &v1beta1.VGPUInstance{}),
&handler.EnqueueRequestForObject{},
&predicate.ResourceVersionChangedPredicate{},
); err != nil {
reterr = err
logger.Error(err, "failed to watch nodes")
}
</pre>
- 这里创建了一个新的
controller
,并将其注册到manager
中。这个controller
将负责监听和处理VGPUInstance
资源的变化。 - 启动 Controller:
<pre class="sl-blockcode" data-lang="markup">gofunc(watchCtx context.Context, cancelFunc context.CancelFunc) {
for {
select {
case <-watchCtx.Done():
r.delFromClusterSet(obj)
logger.Info("cache deleted")
return
default:
logger.Info("start node watch controller")
err := c.Start(watchCtx)
reterr = err
if err != nil {
logger.Error(err, "failed to start controller")
}
logger.Info("node watcher stopped")
time.Sleep(10 * time.Second)
}
}
}(watchCtx, cancelFunc)
</pre>
- 这里启动了
controller
,并确保它在整个程序生命周期内持续运行。
总结
manager
和controller
的关系可以总结为:manager
是控制器的管理器,负责管理和调度多个controller
的运行,并提供共享资源和统一配置。controller
是实际执行业务逻辑的组件,负责监听资源变化并执行相应的操作。通过这种分工,可以实现高效和灵活的 Kubernetes 控制器架构。
NewControllerManagedBy
是 Kubebuilder 提供的一种便捷方法,用于设置和管理控制器。在底层,它会配置和启动一个 Informer 来监听指定资源的变化,并在资源发生变化时触发控制器的Reconcile
方法。
深入解析NewControllerManagedBy
NewControllerManagedBy
是一个链式调用方法,它通过一系列配置步骤来设置控制器。以下是一个示例调用链:
<pre class="sl-blockcode" data-lang="markup">func(r *MyResourceReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&myv1.MyResource{}). // 指定要监控的资源类型
Complete(r)
}
</pre>
主要步骤解析
- NewControllerManagedBy:
- 这个方法返回一个
Builder
对象,该对象用于配置控制器的各种参数。
- For:
-
For
方法指定了控制器要监控的资源类型。在这个例子中,控制器将监控MyResource
资源。 - 这个方法会创建并配置一个 Informer,以便监听
MyResource
资源的变化。
- Complete:
-
Complete
方法接收控制器的实现(即Reconciler
接口的实现),并将其与前面配置的 Informer 关联起来。 - 这个方法最终会将控制器添加到管理器(
Manager
)中,并确保在资源发生变化时调用Reconcile
方法。
Informer 的设置
在For
方法中,实际上是通过内部调用来设置 Informer 的。下面是一个简化的内部工作流程:
- 创建 Informer:
-
For
方法会调用内部的controller.NewUnstructuredInformer
或类似的方法来创建一个 Informer。这个 Informer 会监听指定资源的变化。 - Informer 会使用 List-Watch 机制来获取和监听资源的变化。
- 配置事件处理:
- Informer 会配置事件处理程序(Event Handlers),这些处理程序会在资源发生变化时调用控制器的
Reconcile
方法。 - 事件处理程序通常包括
AddFunc
、UpdateFunc
和DeleteFunc
,分别对应资源的创建、更新和删除事件。
- 启动 Informer:
- 当控制器被添加到管理器后,管理器会启动 Informer,以便开始监听资源的变化。
代码示例
以下是一个简化的内部实现示例,展示了NewControllerManagedBy
和For
方法如何设置 Informer:
<pre class="sl-blockcode" data-lang="markup">func(b *Builder) For(object runtime.Object, opts ...ForOption) *Builder {
// 创建一个新的 Informer
informer, err := b.mgr.GetCache().GetInformer(context.TODO(), object)
if err != nil {
b.err = err
return b
}
// 配置事件处理程序
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
// 触发 Reconcile 方法
b.enqueueRequest(obj)
},
UpdateFunc: func(oldObj, newObj interface{}) {
// 触发 Reconcile 方法
b.enqueueRequest(newObj)
},
DeleteFunc: func(obj interface{}) {
// 触发 Reconcile 方法
b.enqueueRequest(obj)
},
})
return b
}
func(b *Builder) Complete(r reconcile.Reconciler) error {
// 创建并启动控制器
c, err := controller.New(b.name, b.mgr, controller.Options{Reconciler: r})
if err != nil {
return err
}
// 将控制器添加到管理器
return b.mgr.Add(c)
}
</pre>
总结
-
NewControllerManagedBy
方法通过链式调用配置控制器,并最终设置 Informer 来监听指定资源的变化。 -
For
方法通过创建和配置 Informer,使控制器能够监听资源的创建、更新和删除事件。 -
Complete
方法将控制器与 Informer 关联,并将其添加到管理器中,确保在资源变化时调用Reconcile
方法。
通过这种方式,Kubebuilder 框架简化了控制器和 Informer 的设置,使开发者能够专注于实现业务逻辑,而无需处理底层的事件监听和处理机制。