client-go
-
client-go
是Kubernetes的Go语言的官方编程式交互客户端库,提供对Kubernetes API Server服务的交互访问。
- client-go支持3种Client客户端:
-
RESTClient
是最基础的客户端。它对HTTP Request进行了封装。ClientSet、DynamicClient及DiscoveryClient客户端都是基于RESTClient实现的。
-
ClientSet
基于RESTClient,是kubernetes所有内置资源客户端的集合。每一个 Resource和Version都对应一个函数。ClientSet开发者对Kubernetes进行二次开发时最常使用的。
-
DynamicClient
是一种动态的 client,它能访问kubernetes 所有的资源,即内置资源和CRD自定义资源。
clientset
// staging\src\k8s.io\client-go\examples\create-update-delete-deployment\main.go
func main() {
var kubeconfig *string
if home := homedir.HomeDir(); home != "" {
// kubeconfig默认存放在$HOME/.kube/config路径下
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
} else {
kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
}
flag.Parse()
// 读取kubeconfig配置信息并实例化rest.Config对象。配置信息包括集群、用户、命名空间和身份验证等,
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
if err != nil {
panic(err)
}
// 通过kubeconfig配置信息实例化clientset对象
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err)
}
// 请求v1版本下的deployment资源。
deploymentsClient := clientset.AppsV1().Deployments(apiv1.NamespaceDefault)
deployment := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: "demo-deployment",
},
Spec: appsv1.DeploymentSpec{
Replicas: int32Ptr(2),
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app": "demo",
},
},
Template: apiv1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": "demo",
},
},
Spec: apiv1.PodSpec{
Containers: []apiv1.Container{
{
Name: "web",
Image: "nginx:1.12",
Ports: []apiv1.ContainerPort{
{
Name: "http",
Protocol: apiv1.ProtocolTCP,
ContainerPort: 80,
},
},
},
},
},
},
},
}
// Create Deployment
fmt.Println("Creating deployment...")
result, err := deploymentsClient.Create(context.TODO(), deployment, metav1.CreateOptions{})
if err != nil {
panic(err)
}
fmt.Printf("Created deployment %q.\n", result.GetObjectMeta().GetName())
// Update Deployment
prompt()
fmt.Println("Updating deployment...")
// You have two options to Update() this Deployment:
//
// 1. Modify the "deployment" variable and call: Update(deployment).
// This works like the "kubectl replace" command and it overwrites/loses changes
// made by other clients between you Create() and Update() the object.
// 2. Modify the "result" returned by Get() and retry Update(result) until
// you no longer get a conflict error. This way, you can preserve changes made
// by other clients between Create() and Update(). This is implemented below
// using the retry utility package included with client-go. (RECOMMENDED)
//
// More Info:
// https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#concurrency-control-and-consistency
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
// Retrieve the latest version of Deployment before attempting update
// RetryOnConflict uses exponential backoff to avoid exhausting the apiserver
result, getErr := deploymentsClient.Get(context.TODO(), "demo-deployment", metav1.GetOptions{})
if getErr != nil {
panic(fmt.Errorf("Failed to get latest version of Deployment: %v", getErr))
}
result.Spec.Replicas = int32Ptr(1) // reduce replica count
result.Spec.Template.Spec.Containers[0].Image = "nginx:1.13" // change nginx version
_, updateErr := deploymentsClient.Update(context.TODO(), result, metav1.UpdateOptions{})
return updateErr
})
if retryErr != nil {
panic(fmt.Errorf("Update failed: %v", retryErr))
}
fmt.Println("Updated deployment...")
// List Deployments
prompt()
fmt.Printf("Listing deployments in namespace %q:\n", apiv1.NamespaceDefault)
list, err := deploymentsClient.List(context.TODO(), metav1.ListOptions{})
if err != nil {
panic(err)
}
for _, d := range list.Items {
fmt.Printf(" * %s (%d replicas)\n", d.Name, *d.Spec.Replicas)
}
// Delete Deployment
prompt()
fmt.Println("Deleting deployment...")
deletePolicy := metav1.DeletePropagationForeground
if err := deploymentsClient.Delete(context.TODO(), "demo-deployment", metav1.DeleteOptions{
PropagationPolicy: &deletePolicy,
}); err != nil {
panic(err)
}
fmt.Println("Deleted deployment.")
}
- Deployments函数是一个资源接口对象,用于Deployment资源对象的管理,对包含Create、Update、Delete、Get、List、Watch、Patch等方法
// staging\src\k8s.io\client-go\kubernetes\typed\apps\v1\deployment.go
// DeploymentInterface has methods to work with Deployment resources.
type DeploymentInterface interface {
Create(ctx context.Context, deployment *v1.Deployment, opts metav1.CreateOptions) (*v1.Deployment, error)
Update(ctx context.Context, deployment *v1.Deployment, opts metav1.UpdateOptions) (*v1.Deployment, error)
UpdateStatus(ctx context.Context, deployment *v1.Deployment, opts metav1.UpdateOptions) (*v1.Deployment, error)
Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error
DeleteCollection(ctx context.Context, opts metav1.DeleteOptions, listOpts metav1.ListOptions) error
Get(ctx context.Context, name string, opts metav1.GetOptions) (*v1.Deployment, error)
List(ctx context.Context, opts metav1.ListOptions) (*v1.DeploymentList, error)
Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error)
Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (result *v1.Deployment, err error)
Apply(ctx context.Context, deployment *appsv1.DeploymentApplyConfiguration, opts metav1.ApplyOptions) (result *v1.Deployment, err error)
ApplyStatus(ctx context.Context, deployment *appsv1.DeploymentApplyConfiguration, opts metav1.ApplyOptions) (result *v1.Deployment, err error)
GetScale(ctx context.Context, deploymentName string, options metav1.GetOptions) (*autoscalingv1.Scale, error)
UpdateScale(ctx context.Context, deploymentName string, scale *autoscalingv1.Scale, opts metav1.UpdateOptions) (*autoscalingv1.Scale, error)
DeploymentExpansion
}
// deployments implements DeploymentInterface
type deployments struct {
client rest.Interface
ns string
}
// Create takes the representation of a deployment and creates it. Returns the server's representation of the deployment, and an error, if there is any.
func (c *deployments) Create(ctx context.Context, deployment *v1.Deployment, opts metav1.CreateOptions) (result *v1.Deployment, err error) {
result = &v1.Deployment{}
err = c.client.Post().
Namespace(c.ns).
Resource("deployments").
VersionedParams(&opts, scheme.ParameterCodec).
Body(deployment).
Do(ctx).
Into(result)
return
}
dynamicClient
- ClientSet需要预先实现每种Resource和Version的操作,其内部的数据都是结构化数据。而DynamicClient的内部数据是通过map[string]interface{}转换的非结构化数据,因此DynamicClient能够处理CRD自定义资源。
- dynamicClient示例:
// staging\src\k8s.io\client-go\examples\dynamic-create-update-delete-deployment\main.go
func main() {
var kubeconfig *string
if home := homedir.HomeDir(); home != "" {
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
} else {
kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
}
flag.Parse()
namespace := "default"
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
if err != nil {
panic(err)
}
// 通过kubeconfig配置信息实例化dynamicClient对象
client, err := dynamic.NewForConfig(config)
if err != nil {
panic(err)
}
deploymentRes := schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}
deployment := &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "apps/v1",
"kind": "Deployment",
"metadata": map[string]interface{}{
"name": "demo-deployment",
},
"spec": map[string]interface{}{
"replicas": 2,
"selector": map[string]interface{}{
"matchLabels": map[string]interface{}{
"app": "demo",
},
},
"template": map[string]interface{}{
"metadata": map[string]interface{}{
"labels": map[string]interface{}{
"app": "demo",
},
},
"spec": map[string]interface{}{
"containers": []map[string]interface{}{
{
"name": "web",
"image": "nginx:1.12",
"ports": []map[string]interface{}{
{
"name": "http",
"protocol": "TCP",
"containerPort": 80,
},
},
},
},
},
},
},
},
}
// Create Deployment
fmt.Println("Creating deployment...")
result, err := client.Resource(deploymentRes).Namespace(namespace).Create(context.TODO(), deployment, metav1.CreateOptions{})
if err != nil {
panic(err)
}
fmt.Printf("Created deployment %q.\n", result.GetName())
// Update Deployment
prompt()
fmt.Println("Updating deployment...")
// You have two options to Update() this Deployment:
//
// 1. Modify the "deployment" variable and call: Update(deployment).
// This works like the "kubectl replace" command and it overwrites/loses changes
// made by other clients between you Create() and Update() the object.
// 2. Modify the "result" returned by Get() and retry Update(result) until
// you no longer get a conflict error. This way, you can preserve changes made
// by other clients between Create() and Update(). This is implemented below
// using the retry utility package included with client-go. (RECOMMENDED)
//
// More Info:
// https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#concurrency-control-and-consistency
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
// Retrieve the latest version of Deployment before attempting update
// RetryOnConflict uses exponential backoff to avoid exhausting the apiserver
result, getErr := client.Resource(deploymentRes).Namespace(namespace).Get(context.TODO(), "demo-deployment", metav1.GetOptions{})
if getErr != nil {
panic(fmt.Errorf("failed to get latest version of Deployment: %v", getErr))
}
// update replicas to 1
if err := unstructured.SetNestedField(result.Object, int64(1), "spec", "replicas"); err != nil {
panic(fmt.Errorf("failed to set replica value: %v", err))
}
// extract spec containers
containers, found, err := unstructured.NestedSlice(result.Object, "spec", "template", "spec", "containers")
if err != nil || !found || containers == nil {
panic(fmt.Errorf("deployment containers not found or error in spec: %v", err))
}
// update container[0] image
if err := unstructured.SetNestedField(containers[0].(map[string]interface{}), "nginx:1.13", "image"); err != nil {
panic(err)
}
if err := unstructured.SetNestedField(result.Object, containers, "spec", "template", "spec", "containers"); err != nil {
panic(err)
}
_, updateErr := client.Resource(deploymentRes).Namespace(namespace).Update(context.TODO(), result, metav1.UpdateOptions{})
return updateErr
})
if retryErr != nil {
panic(fmt.Errorf("update failed: %v", retryErr))
}
fmt.Println("Updated deployment...")
// List Deployments
prompt()
fmt.Printf("Listing deployments in namespace %q:\n", apiv1.NamespaceDefault)
list, err := client.Resource(deploymentRes).Namespace(namespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
panic(err)
}
for _, d := range list.Items {
replicas, found, err := unstructured.NestedInt64(d.Object, "spec", "replicas")
if err != nil || !found {
fmt.Printf("Replicas not found for deployment %s: error=%s", d.GetName(), err)
continue
}
fmt.Printf(" * %s (%d replicas)\n", d.GetName(), replicas)
}
// Delete Deployment
prompt()
fmt.Println("Deleting deployment...")
deletePolicy := metav1.DeletePropagationForeground
deleteOptions := metav1.DeleteOptions{
PropagationPolicy: &deletePolicy,
}
if err := client.Resource(deploymentRes).Namespace(namespace).Delete(context.TODO(), "demo-deployment", deleteOptions); err != nil {
panic(err)
}
fmt.Println("Deleted deployment.")
}
func prompt() {
fmt.Printf("-> Press Return key to continue.")
scanner := bufio.NewScanner(os.Stdin)
for scanner.Scan() {
break
}
if err := scanner.Err(); err != nil {
panic(err)
}
fmt.Println()
}
// staging\src\k8s.io\apimachinery\pkg\apis\meta\v1\unstructured\unstructured.go
// Unstructured allows objects that do not have Golang structs registered to be manipulated
// generically. This can be used to deal with the API objects from a plug-in. Unstructured
// objects still have functioning TypeMeta features-- kind, version, etc.
//
// WARNING: This object has accessors for the v1 standard metadata. You *MUST NOT* use this
// type if you are dealing with objects that are not in the server meta v1 schema.
//
// TODO: make the serialization part of this type distinct from the field accessors.
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// +k8s:deepcopy-gen=true
type Unstructured struct {
// Object is a JSON compatible map with string, float, int, bool, []interface{}, or
// map[string]interface{}
// children.
Object map[string]interface{}
}
- dynamicResourceClient管理对Resource的Create、Update、Delete、Get、List、Watch、Patch等操作。
// staging\src\k8s.io\client-go\dynamic\simple.go
type dynamicClient struct {
client *rest.RESTClient
}
var _ Interface = &dynamicClient{}
type dynamicResourceClient struct {
client *dynamicClient
namespace string
resource schema.GroupVersionResource
}
func (c *dynamicClient) Resource(resource schema.GroupVersionResource) NamespaceableResourceInterface {
return &dynamicResourceClient{client: c, resource: resource}
}
func (c *dynamicResourceClient) Namespace(ns string) ResourceInterface {
ret := *c
ret.namespace = ns
return &ret
}
func (c *dynamicResourceClient) Create(ctx context.Context, obj *unstructured.Unstructured, opts metav1.CreateOptions, subresources ...string) (*unstructured.Unstructured, error) {
outBytes, err := runtime.Encode(unstructured.UnstructuredJSONScheme, obj)
if err != nil {
return nil, err
}
name := ""
if len(subresources) > 0 {
accessor, err := meta.Accessor(obj)
if err != nil {
return nil, err
}
name = accessor.GetName()
if len(name) == 0 {
return nil, fmt.Errorf("name is required")
}
}
result := c.client.client.
Post().
AbsPath(append(c.makeURLSegments(name), subresources...)...).
Body(outBytes).
SpecificallyVersionedParams(&opts, dynamicParameterCodec, versionV1).
Do(ctx)
if err := result.Error(); err != nil {
return nil, err
}
retBytes, err := result.Raw()
if err != nil {
return nil, err
}
uncastObj, err := runtime.Decode(unstructured.UnstructuredJSONScheme, retBytes)
if err != nil {
return nil, err
}
return uncastObj.(*unstructured.Unstructured), nil
}
func (c *dynamicResourceClient) makeURLSegments(name string) []string {
url := []string{}
if len(c.resource.Group) == 0 {
url = append(url, "api")
} else {
url = append(url, "apis", c.resource.Group)
}
url = append(url, c.resource.Version)
if len(c.namespace) > 0 {
url = append(url, "namespaces", c.namespace)
}
url = append(url, c.resource.Resource)
if len(name) > 0 {
url = append(url, name)
}
return url
}