Kuberneters源码分析 - Ingress nginx 配置更新

Ingress Nginx主流程一文中,已经介绍了Ingress Nginx的驱动流程,系统的主要驱动逻辑是监控API Server中相关资源的变化,并通过事件回调去驱动系统的运行。本篇主要介绍关键的部分,如何准实时的更新Nginx的配置,与Ingress资源保持同步。

数据驱动回顾

基于SharedIndexInformer,我们可以实时感知到Ingress、CongMap、Secret、Endpoint、Service等资源的变化,并通过事件回调处理,通知到同步任务队列去定期更新Nginx的配置文件。

资源变化,同步事件的生成代码为例子为:

            updateCh.In() <- Event{
                Type: CreateEvent,
                Obj:  obj,
            }

目前支持的事件类型有:

const (
    // CreateEvent event associated with new objects in an informer
    CreateEvent EventType = "CREATE"
    // UpdateEvent event associated with an object update in an informer
    UpdateEvent EventType = "UPDATE"
    // DeleteEvent event associated when an object is removed from an informer
    DeleteEvent EventType = "DELETE"
    // ConfigurationEvent event associated when a controller configuration object is created or updated
    ConfigurationEvent EventType = "CONFIGURATION"
)

Event的Obj成员一般就是发生变化的资源数据。

同步任务队列

同步任务队列负责接收同步事件,并保持Nginx的配置与资源的同步。

n.syncQueue = task.NewTaskQueue(n.syncIngress)
func NewTaskQueue(syncFn func(interface{}) error) *Queue {
    return NewCustomTaskQueue(syncFn, nil)
}

// NewCustomTaskQueue ...
func NewCustomTaskQueue(syncFn func(interface{}) error, fn func(interface{}) (interface{}, error)) *Queue {
    q := &Queue{
        queue:      workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
        sync:       syncFn,
        workerDone: make(chan bool),
        fn:         fn,
    }

    if fn == nil {
        q.fn = q.defaultKeyFunc
    }

    return q
}

同步任务队列负责把为每个入队的元素调用syncFn,而这里的syncFn实际调用NGINXController.syncIngress方法。

这里有一个问题要考虑的就是如果资源变化非常频繁,那会不会造成我们要频繁的去更新Nginx的配置文件,频发的发起Nginx的Reload操作,这个操作对于业务量大的服务来说,还是影响比较大的。

上面这个问题,其实就跟这个任务队列的实现有关了,任务队列实现了流控,并且还会针对变更资源的时间进行一些处理,忽略掉一些变更事件比同步事件旧的变更事件。这里暂时不做分析。

同步处理

同步处理是syncIngress方法,虽然他带了一个参数,但是我们可以看到这个参数基本是忽略的,syncIngress对每类资源的变更都是执行相同的操作。

// syncIngress collects all the pieces required to assemble the NGINX
// configuration file and passes the resulting data structures to the backend
// (OnUpdate) when a reload is deemed necessary.
func (n *NGINXController) syncIngress(interface{}) error {
    n.syncRateLimiter.Accept() // 流控处理,防止更新太频繁

    if n.syncQueue.IsShuttingDown() {
        return nil
    }

    // sort Ingresses using the ResourceVersion field
    ings := n.store.ListIngresses() // 从缓存中获取所有的Ingress服务
    sort.SliceStable(ings, func(i, j int) bool {
        ir := ings[i].ResourceVersion
        jr := ings[j].ResourceVersion
        return ir < jr
    })

    upstreams, servers := n.getBackendServers(ings) // 从ings中提取所有的upstreams和servers
// 这样做的主要原因在于:相同的upstream可能被多个servers共享(条件:namespace、service、port相同即可)
    var passUpstreams []*ingress.SSLPassthroughBackend

    for _, server := range servers {
        if !server.SSLPassthrough {
            continue
        }

        for _, loc := range server.Locations {
            if loc.Path != rootLocation {
                glog.Warningf("Ignoring SSL Passthrough for location %q in server %q", loc.Path, server.Hostname)
                continue
            }
            passUpstreams = append(passUpstreams, &ingress.SSLPassthroughBackend{
                Backend:  loc.Backend,
                Hostname: server.Hostname,
                Service:  loc.Service,
                Port:     loc.Port,
            })
            break
        }
    }

    pcfg := &ingress.Configuration{ // 生成新的ingress配置
        Backends:              upstreams,
        Servers:               servers,
        TCPEndpoints:          n.getStreamServices(n.cfg.TCPConfigMapName, apiv1.ProtocolTCP),
        UDPEndpoints:          n.getStreamServices(n.cfg.UDPConfigMapName, apiv1.ProtocolUDP),
        PassthroughBackends:   passUpstreams,
        BackendConfigChecksum: n.store.GetBackendConfiguration().Checksum,
    }

    if n.runningConfig.Equal(pcfg) { // 判断新旧配置是否相同,相同则不用继续处理
        glog.V(3).Infof("No configuration change detected, skipping backend reload.")
        return nil
    }

        // 是否启动了动态配置支持并且目前的变更能够支持动态配置
    if n.cfg.DynamicConfigurationEnabled && n.IsDynamicConfigurationEnough(pcfg) {
        glog.Infof("Changes handled by the dynamic configuration, skipping backend reload.") // 符合动态条件,就不用变更配置文件
    } else { // 不符合,就需要变更配置文件
        glog.Infof("Configuration changes detected, backend reload required.")

        hash, _ := hashstructure.Hash(pcfg, &hashstructure.HashOptions{
            TagName: "json",
        }) // 生成配置文件的hash值,看起来主要是做metric监控收集使用

        pcfg.ConfigurationChecksum = fmt.Sprintf("%v", hash) 

        err := n.OnUpdate(*pcfg) // 变更Nginx配置文件,后面详细分析
        if err != nil {
            n.metricCollector.IncReloadErrorCount()
            n.metricCollector.ConfigSuccess(hash, false)
            glog.Errorf("Unexpected failure reloading the backend:\n%v", err)
            return err
        }

        glog.Infof("Backend successfully reloaded.")
        n.metricCollector.ConfigSuccess(hash, true)
        n.metricCollector.IncReloadCount()
        n.metricCollector.SetSSLExpireTime(servers)
    }

    if n.cfg.DynamicConfigurationEnabled { // 这里走到动态配置变更功能
        isFirstSync := n.runningConfig.Equal(&ingress.Configuration{})
        go func(isFirstSync bool) {
            if isFirstSync {
                glog.Infof("Initial synchronization of the NGINX configuration.")

                // it takes time for NGINX to start listening on the configured ports
                time.Sleep(1 * time.Second)
            }
            err := configureDynamically(pcfg, n.cfg.ListenPorts.Status) // 动态配置处理,这块也会在后面分析
            if err == nil {
                glog.Infof("Dynamic reconfiguration succeeded.")
            } else {
                glog.Warningf("Dynamic reconfiguration failed: %v", err)
            }
        }(isFirstSync)
    }

    ri := getRemovedIngresses(n.runningConfig, pcfg)
    re := getRemovedHosts(n.runningConfig, pcfg)
    n.metricCollector.RemoveMetrics(ri, re)

    n.runningConfig = pcfg // 修改更新后的配置

    return nil
}

syncIngress方法的逻辑比较清晰,主要的功能都写在注释中了。下面详细讲述OnUpdate方法,该方法实现了Nginx配置文件的更新和reload。

OnUpdate

OnUpdate是在同步操作时发现需要变更Nginx配置文件时,就会被调用。后端配置与ConfigMap的配置会合并之后再创建最终的配置文件。

  • PassthroughBackends
    TODO:
func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
    cfg := n.store.GetBackendConfiguration()
    cfg.Resolver = n.resolver

    if n.cfg.EnableSSLPassthrough {
        servers := []*TCPServer{}
        for _, pb := range ingressCfg.PassthroughBackends {
            svc := pb.Service
            if svc == nil {
                glog.Warningf("Missing Service for SSL Passthrough backend %q", pb.Backend)
                continue
            }
            port, err := strconv.Atoi(pb.Port.String())
            if err != nil {
                for _, sp := range svc.Spec.Ports {
                    if sp.Name == pb.Port.String() {
                        port = int(sp.Port)
                        break
                    }
                }
            } else {
                for _, sp := range svc.Spec.Ports {
                    if sp.Port == int32(port) {
                        port = int(sp.Port)
                        break
                    }
                }
            }

            // TODO: Allow PassthroughBackends to specify they support proxy-protocol
            servers = append(servers, &TCPServer{
                Hostname:      pb.Hostname,
                IP:            svc.Spec.ClusterIP,
                Port:          port,
                ProxyProtocol: false,
            })
        }

        n.Proxy.ServerList = servers
    }
    .......
}
  • ServerNameHash
    if cfg.ServerNameHashBucketSize == 0 {
        nameHashBucketSize := nginxHashBucketSize(longestName)
        glog.V(3).Infof("Adjusting ServerNameHashBucketSize variable to %q", nameHashBucketSize)
        cfg.ServerNameHashBucketSize = nameHashBucketSize
    }
    serverNameHashMaxSize := nextPowerOf2(serverNameBytes)
    if cfg.ServerNameHashMaxSize < serverNameHashMaxSize {
        glog.V(3).Infof("Adjusting ServerNameHashMaxSize variable to %q", serverNameHashMaxSize)
        cfg.ServerNameHashMaxSize = serverNameHashMaxSize
    }
  • Headers
    Headers包括setHeaders和addHeaders两种,他们的都是放在configMap中。
    这里再提一下configMap是通过参数--configmap=ingress-nginx/nginx-configuration带入的,举例说明:
#  cat configmap.yaml 
apiVersion: v1
data:
  proxy-set-headers: "ingress-nginx/custom-headers"
  add-headers: "ingress-nginx/custom-headers"
kind: ConfigMap
metadata:
  name: nginx-configuration
  namespace: ingress-nginx
  labels:
    app: ingress-nginx

我们在上面的configmap中,指定了proxy-set-headers和add-headers的值都是ingrss-nginx/custom-headers,这里的ingress-nginx/custom-headers也是一个configmap,创建customer-headers的脚本如下:

curl https://raw.githubusercontent.com/kubernetes/ingress-nginx/master/docs/examples/customization/custom-headers/custom-headers.yaml \
    | kubectl apply -f -

custom-headres.yaml的实际内容为:

apiVersion: v1
data:
  X-Different-Name: "true"
  X-Request-Start: t=${msec}
  X-Using-Nginx-Controller: "true"
kind: ConfigMap
metadata:
  name: custom-headers
  namespace: ingress-nginx

下面是处理setHeaders和addHeaders的代码:

        setHeaders := map[string]string{}
    if cfg.ProxySetHeaders != "" {
        cmap, err := n.store.GetConfigMap(cfg.ProxySetHeaders)
        if err != nil {
            glog.Warningf("Error reading ConfigMap %q from local store: %v", cfg.ProxySetHeaders, err)
        }

        setHeaders = cmap.Data
    }

    addHeaders := map[string]string{}
    if cfg.AddHeaders != "" {
        cmap, err := n.store.GetConfigMap(cfg.AddHeaders)
        if err != nil {
            glog.Warningf("Error reading ConfigMap %q from local store: %v", cfg.AddHeaders, err)
        }

        addHeaders = cmap.Data
    }

通过上面的代码,基于我们的例子配置来说,cfg.ProxySetHeaders值为“ingress-nginx/custom-headers”,通过n.store.GetConfigMap调用,最终得到setHeaders值为

map[string]string{
  "X-Different-Name": "true",
  "X-Request-Start": "t=${msec}",
  "X-Using-Nginx-Controller": "true"
}
  • 生成配置文件内容

代码如下所示,这块主要使用了text/template模块的功能来生成配置文件。主要功能在于模板的内容,后面再具体分析这块内容。

        // 构建模板配置参数
    tc := ngx_config.TemplateConfig{
        ProxySetHeaders:             setHeaders,
        AddHeaders:                  addHeaders,
        MaxOpenFiles:                maxOpenFiles,
        BacklogSize:                 sysctlSomaxconn(),
        Backends:                    ingressCfg.Backends,
        PassthroughBackends:         ingressCfg.PassthroughBackends,
        Servers:                     ingressCfg.Servers,
        TCPBackends:                 ingressCfg.TCPEndpoints,
        UDPBackends:                 ingressCfg.UDPEndpoints,
        HealthzURI:                  ngxHealthPath,
        CustomErrors:                len(cfg.CustomHTTPErrors) > 0,
        Cfg:                         cfg,
        IsIPV6Enabled:               n.isIPV6Enabled && !cfg.DisableIpv6,
        NginxStatusIpv4Whitelist:    cfg.NginxStatusIpv4Whitelist,
        NginxStatusIpv6Whitelist:    cfg.NginxStatusIpv6Whitelist,
        RedirectServers:             redirectServers,
        IsSSLPassthroughEnabled:     n.cfg.EnableSSLPassthrough,
        ListenPorts:                 n.cfg.ListenPorts,
        PublishService:              n.GetPublishService(),
        DynamicConfigurationEnabled: n.cfg.DynamicConfigurationEnabled,
        DisableLua:                  n.cfg.DisableLua,
    }

    tc.Cfg.Checksum = ingressCfg.ConfigurationChecksum
        // 调用模板,生成配置文本内容
    content, err := n.t.Write(tc)
    if err != nil {
        return err
    }
  • 生成opentracing配置文件
    如果启动了opentracing,就需要生成opentracing配置文件,opentracing的配置文件为/etc/nginx/opentracing.json,它支持两种zipkin、jaeger两种opentracing系统。它们的模板文件如下所示:
const zipkinTmpl = `{
  "service_name": "{{ .ZipkinServiceName }}",
  "collector_host": "{{ .ZipkinCollectorHost }}",
  "collector_port": {{ .ZipkinCollectorPort }},
  "sample_rate": {{ .ZipkinSampleRate }}
}`

const jaegerTmpl = `{
  "service_name": "{{ .JaegerServiceName }}",
  "sampler": {
    "type": "{{ .JaegerSamplerType }}",
    "param": {{ .JaegerSamplerParam }}
  },
  "reporter": {
    "localAgentHostPort": "{{ .JaegerCollectorHost }}:{{ .JaegerCollectorPort }}"
  }
}`
  • 测试配置文件的正确性
    由NGINXController.testTemplate方法实现,主要功能为:
    1)生成测试配置文件
    2)调用nginx -c ${Config} -t来测试配置文件的正确性
  • 生成配置文件并重载配置文件
    生成/etc/nginx/nginx.conf文件,调用nginx -s reload触发配置加载。

Ingress的更新分析完了,到这里我们还遗留了两个问题:
1)动态配置功能
2)配置文件模板

动态配置功能

动态配置把一个Backend封装成JSON格式,并把内容POST到内部的被Lua处理的HTTP服务。

Backend描述了关联到service的一个或者多个远程服务(endpoints)。
譬如,下面是一个ingress的yaml文件,对应的backend的概念就是对应的rules->host:foo.bar.com->paths->path->backend

apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  name: test-ingress
  annotations:
    ingress.kubernetes.io/rewrite-target: /
  namespace: ingress-nginx
spec:
  rules:
  -  host: foo.bar.com
     http:
      paths:
      - path: /
        backend:
          serviceName: my-service
          servicePort: 1005

动态配置代码如下:

func configureDynamically(pcfg *ingress.Configuration, port int) error {
        // 生成新的backends,TODO:为什么这里要生成新的,不直接用pcfg.Backends???
        // 看起来是去掉了service字段信息
    backends := make([]*ingress.Backend, len(pcfg.Backends))

    for i, backend := range pcfg.Backends {
        luaBackend := &ingress.Backend{
            Name:            backend.Name,   // <namespace>-<name>-<port>
            Port:            backend.Port,
            Secure:          backend.Secure,
            SSLPassthrough:  backend.SSLPassthrough,
            SessionAffinity: backend.SessionAffinity,
            UpstreamHashBy:  backend.UpstreamHashBy,
            LoadBalancing:   backend.LoadBalancing,
        }

        var endpoints []ingress.Endpoint
        for _, endpoint := range backend.Endpoints {
            endpoints = append(endpoints, ingress.Endpoint{
                Address:     endpoint.Address,
                FailTimeout: endpoint.FailTimeout,
                MaxFails:    endpoint.MaxFails,
                Port:        endpoint.Port,
            })
        }

        luaBackend.Endpoints = endpoints
        backends[i] = luaBackend
    }

        // 把所有的backends打包成json
    buf, err := json.Marshal(backends)
    if err != nil {
        return err
    }

    glog.V(2).Infof("Posting backends configuration: %s", buf)
        // post backends数据到nginx的http服务,让lua去处理
    url := fmt.Sprintf("http://localhost:%d/configuration/backends", port)
    resp, err := http.Post(url, "application/json", bytes.NewReader(buf))
    if err != nil {
        return err
    }
        ......
    return nil
}

对于configuration/backends的endpoint的处理逻辑,我们可以看一下nginx.tmpl文件中的片段:

        {{ if $all.DynamicConfigurationEnabled }}
        location /configuration {
            {{ if $cfg.EnableOpentracing }}
            opentracing off;
            {{ end }}

            allow 127.0.0.1;
            {{ if $IsIPV6Enabled }}
            allow ::1;
            {{ end }}
            deny all;

            # this should be equals to configuration_data dict
            client_max_body_size                    "10m";
            proxy_buffering                         off;

            content_by_lua_block {
              configuration.call()
            }
        }
        {{ end }}

最终调用了lua代码configuration.call()。

配置文件模板

nginx的配置文件模板是基于text/template模块实现的,这里就不详细介绍text/template了。

前面我们知道,模板中使用的变量是TemplateConfig结构,TemplateConfig的结构定义如下:

type TemplateConfig struct {
    ProxySetHeaders             map[string]string // SetHeaders中的内容
    AddHeaders                  map[string]string    // AddHeaders中的内容
    MaxOpenFiles                int              // 最大打开文件数
    BacklogSize                 int
    Backends                    []*ingress.Backend  // 所以的backends
    PassthroughBackends         []*ingress.SSLPassthroughBackend
    Servers                     []*ingress.Server  // servers信息
    TCPBackends                 []ingress.L4Service  // TCP反向代理
    UDPBackends                 []ingress.L4Service  // UDP反向代理
    HealthzURI                  string
    CustomErrors                bool
    Cfg                         Configuration
    IsIPV6Enabled               bool
    IsSSLPassthroughEnabled     bool
    NginxStatusIpv4Whitelist    []string
    NginxStatusIpv6Whitelist    []string
    RedirectServers             map[string]string
    ListenPorts                 *ListenPorts
    PublishService              *apiv1.Service
    DynamicConfigurationEnabled bool
    DisableLua                  bool
}

nginx的配置文件模板为/etc/nginx/template/nginx.tmpl,在前面动态配置分析中,我们列举了一部分内容。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,607评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,047评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,496评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,405评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,400评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,479评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,883评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,535评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,743评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,544评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,612评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,309评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,881评论 3 306
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,891评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,136评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,783评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,316评论 2 342

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,579评论 18 139
  • 大多数 Nginx 新手都会频繁遇到这样一个困惑,那就是当同一个location配置块使用了多个 Nginx 模块...
    SkTj阅读 7,572评论 0 12
  • Ingress Nginx的系统架构 Ingress Nginx的主流程逻辑 解析命令行参数一个常见的命令行如下所...
    何约什阅读 2,743评论 0 0
  • feisky云计算、虚拟化与Linux技术笔记posts - 1014, comments - 298, trac...
    不排版阅读 3,813评论 0 5
  • 夜下种子 书评︳德伯家的苔丝 假如生活欺骗了你 (俄)普希金 假如生活欺骗了你 不要悲伤,不要心急 忧郁的日子里需...
    Tina大婷阅读 436评论 0 3