Ingress Nginx的系统架构
Ingress Nginx的主流程逻辑
- 解析命令行参数
一个常见的命令行如下所示
/nginx-ingress-controller --default-backend-service=ingress-nginx/default-http-backend --configmap=ingress-nginx/nginx-configuration --tcp-services-configmap=ingress-nginx/tcp-services --udp-services-configmap=ingress-nginx/udp-services --publish-service=ingress-nginx/ingress-nginx --annotations-prefix=nginx.ingress.kubernetes.io - 显示nginx的版本号
实际调用命令为:nginx -v或者nginx -V - 创建 API Server客户端
有两种模式来获取APIServer的客户端对象:
第一种是指定配置APIServerHost、配置KubeConfigFie;
第二种是InCluster模式,这种一般是基于k8s调用Ingress Nginx运行,作为容器启动,会从环境变量中读取相关参数:
1)KUBERNETES_SERVICE_HOST 例如:KUBERNETES_PORT=tcp://10.96.0.1:443
2)/var/run/secrets/kubernetes.io/serviceaccount 目录下的token, ca.crt文件 - 解析并验证是否存在指定的缺省后端服务的名字空间和Service名称
这个是ingress-nginx的默认后端,用来将未知请求全部负载到这个默认后端上,这个默认后端会返回404页面。 - 创建伪SSL证书
TODO:具体用处待分析 - 普罗米修斯监控初始化
- 创建并启动NGINXController对象,后面
- 启动HTTP服务,主要用于健康检查、指标查看、Profile功能
创建NGINXController对象
这个功能在NewNGINXController方法中完成,代码的分布解析如下:
- 创建并启动事件广播
eventBroadcaster := record.NewBroadcaster() // 创建事件广播对象
eventBroadcaster.StartLogging(glog.Infof) // 启动事件日志记录功能
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{ // 启动日志sink功能,同步到API Server
Interface: config.Client.CoreV1().Events(config.Namespace),
})
上面的代码创建并启动了事件广播对象,事件产生器是在NginxController对象构建中创建
n := &NGINXController{
......
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{
Component: "nginx-ingress-controller",
})
......
}
// 产生事件的代码,譬如创建了一个Ingress
recorder.Eventf(ing, corev1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", ing.Namespace, ing.Name))
- 接下来会获取系统名字服务器的IP信息,具体调用
h, err := dns.GetSystemNameServers()
从 /etc/resolv.conf 文件中读取dns的resolve ip信息
- 创建NGINXController对象
n := &NGINXController{
isIPV6Enabled: ing_net.IsIPv6Enabled(),
resolver: h,
cfg: config,
syncRateLimiter: flowcontrol.NewTokenBucketRateLimiter(config.SyncRateLimit, 1),
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{
Component: "nginx-ingress-controller",
}),
stopCh: make(chan struct{}),
updateCh: channels.NewRingChannel(1024),
stopLock: &sync.Mutex{},
fileSystem: fs,
runningConfig: new(ingress.Configuration), // 运行时配置,刚开始为空,会在同步Ingress信息的时候,填充完整,见syncIngress方法
Proxy: &TCPProxy{},
metricCollector: mc,
}
syncRateLimiter 成员是流控对象,k8s流控依赖于golang.org/x/time/rate中的频率限制模块,流量控制的接口如下:
type RateLimiter interface {
// TryAccept returns true if a token is taken immediately. Otherwise,
// it returns false.
TryAccept() bool
// Accept returns once a token becomes available.
Accept()
// Stop stops the rate limiter, subsequent calls to CanAccept will return false
Stop()
// QPS returns QPS of this rate limiter
QPS() float32
}
实现流量控制,一般我们会设置流控的QPS,在执行操作执行,我们先调用一下流控的Accept函数,等待令牌满足,才接着往下执行,这样就达到了流控的效果。
updateCh成员是RingChannel实例对象,它实现了一个永远不会阻塞写操作的Channel接口。比如:如果当我们写入RingChannel时,RingChannel缓存满了,那么Buffer中最老的数据就会被丢弃。
store.Storer
Storer 是一个接口,它封装了一个获取ingress、service、secrets和ingress annotations的方法。
type Storer interface {
// GetBackendConfiguration returns the nginx configuration stored in a configmap
GetBackendConfiguration() ngx_config.Configuration
// GetConfigMap returns the ConfigMap matching key.
GetConfigMap(key string) (*corev1.ConfigMap, error)
// GetSecret returns the Secret matching key.
GetSecret(key string) (*corev1.Secret, error)
// GetService returns the Service matching key.
GetService(key string) (*corev1.Service, error)
// GetServiceEndpoints returns the Endpoints of a Service matching key.
GetServiceEndpoints(key string) (*corev1.Endpoints, error)
// GetIngress returns the Ingress matching key.
GetIngress(key string) (*extensions.Ingress, error)
// ListIngresses returns a list of all Ingresses in the store.
ListIngresses() []*extensions.Ingress
// GetIngressAnnotations returns the parsed annotations of an Ingress matching key.
GetIngressAnnotations(key string) (*annotations.Ingress, error)
// GetLocalSSLCert returns the local copy of a SSLCert
GetLocalSSLCert(name string) (*ingress.SSLCert, error)
// ListLocalSSLCerts returns the list of local SSLCerts
ListLocalSSLCerts() []*ingress.SSLCert
// GetAuthCertificate resolves a given secret name into an SSL certificate.
// The secret must contain 3 keys named:
// ca.crt: contains the certificate chain used for authentication
GetAuthCertificate(string) (*resolver.AuthSSLCert, error)
// GetDefaultBackend returns the default backend configuration
GetDefaultBackend() defaults.Backend
// Run initiates the synchronization of the controllers
Run(stopCh chan struct{})
}
Storer对象是与API Server沟通的入口,所以这块是系统的关键,所有的数据变更都是从API Server过来,所以,Storer是驱动系统运行的关键模块。Storer的具体的实现类为k8sStore,它的定义如下:
type k8sStore struct {
isOCSPCheckEnabled bool
// backendConfig contains the running configuration from the configmap
// this is required because this rarely changes but is a very expensive
// operation to execute in each OnUpdate invocation
backendConfig ngx_config.Configuration
// informer contains the cache Informers
informers *Informer // 封装了所有关心的组件的通知机制
// listers contains the cache.Store interfaces used in the ingress controller
listers *Lister // 从通知中获取的对应的只读存储信息
// sslStore local store of SSL certificates (certificates used in ingress)
// this is required because the certificates must be present in the
// container filesystem
sslStore *SSLCertTracker
annotations annotations.Extractor // 提供了Ingress Annotations的名字与提取方法对
// secretIngressMap contains information about which ingress references a
// secret in the annotations.
secretIngressMap ObjectRefMap // 保存了每个Ingress引用了哪些secrets的信息
filesystem file.Filesystem
// updateCh
updateCh *channels.RingChannel
// mu protects against simultaneous invocations of syncSecret
mu *sync.Mutex
defaultSSLCertificate string
}
在k8sStore中封装了几个关键的对象Informers、Listener、updateCh, 其中Informers是关键驱动逻辑,它实时从API Server获取各种资源的变化信息,针对不同类型的资源变化进行相应的回调处理,最终回调函数都会形成事件放入updateCh队列中去处理。
Informer
Informer封装了ingress需要的SharedIndexInformers,用于与API Server交互,它是在Storer构建时创建的,由于篇幅比较中,所以这里单独拉出来探讨。
SharedIndexInformer是基于一种共享的数据通知机制,共享数据通知对象的构建是基于一个Factory来创建和返回。Factory会缓存创建过的对象,下次再次获取同样的对象时,会从缓存中换回。SharedIndexInformer基于两项:底层数据(一般是API Server)和缓存数据,当数据发生变更时,在更新缓存的同时,可以同时向多个侦听器发送通知回调处理。
type Informer struct {
Ingress cache.SharedIndexInformer
Endpoint cache.SharedIndexInformer
Service cache.SharedIndexInformer
Secret cache.SharedIndexInformer
ConfigMap cache.SharedIndexInformer
}
从Informer的定义我们可以看出,系统关心的资源有:Ingress、EndPoint、Service、Secret、ConfigMap。
SharedIndexInformer的创建
所有的SharedIndexInformer都是基于factory创建,如下所示:
// create informers factory, enable and assign required informers
infFactory := informers.NewFilteredSharedInformerFactory(client, resyncPeriod, namespace, func(*metav1.ListOptions) {})
store.informers.Ingress = infFactory.Extensions().V1beta1().Ingresses().Informer()
store.listers.Ingress.Store = store.informers.Ingress.GetStore()
store.informers.Endpoint = infFactory.Core().V1().Endpoints().Informer()
store.listers.Endpoint.Store = store.informers.Endpoint.GetStore()
store.informers.Secret = infFactory.Core().V1().Secrets().Informer()
store.listers.Secret.Store = store.informers.Secret.GetStore()
store.informers.ConfigMap = infFactory.Core().V1().ConfigMaps().Informer()
store.listers.ConfigMap.Store = store.informers.ConfigMap.GetStore()
store.informers.Service = infFactory.Core().V1().Services().Informer()
store.listers.Service.Store = store.informers.Service.GetStore()
每个资源都添加了一个事件处理器,负责处理资源变更事件,这里限于篇幅,只举例Ingress的资源进行说明:
ingEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
ing := obj.(*extensions.Ingress)
if !class.IsValid(ing) {
a, _ := parser.GetStringAnnotation(class.IngressKey, ing)
glog.Infof("ignoring add for ingress %v based on annotation %v with value %v", ing.Name, class.IngressKey, a)
return
}
recorder.Eventf(ing, corev1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", ing.Namespace, ing.Name)) // 产生一个创建Ingress的事件通知
store.extractAnnotations(ing) // 提取Annotation信息
store.updateSecretIngressMap(ing) // 注意这里,保存的是该Ingress引用的Secrets信息
store.syncSecrets(ing) // 把Ingress相应的Secrets信息同步到对应的文件系统中,主要是TLS Secrets(包括证书和Key)
updateCh.In() <- Event{ // 放入队列,用于更新配置文件
Type: CreateEvent,
Obj: obj,
}
},
DeleteFunc: func(obj interface{}) {
ing, ok := obj.(*extensions.Ingress)
if !ok {
// If we reached here it means the ingress was deleted but its final state is unrecorded.
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.Errorf("couldn't get object from tombstone %#v", obj)
return
}
ing, ok = tombstone.Obj.(*extensions.Ingress)
if !ok {
glog.Errorf("Tombstone contained object that is not an Ingress: %#v", obj)
return
}
}
if !class.IsValid(ing) {
glog.Infof("ignoring delete for ingress %v based on annotation %v", ing.Name, class.IngressKey)
return
}
recorder.Eventf(ing, corev1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", ing.Namespace, ing.Name))
store.listers.IngressAnnotation.Delete(ing)
key := k8s.MetaNamespaceKey(ing)
store.secretIngressMap.Delete(key)
updateCh.In() <- Event{
Type: DeleteEvent,
Obj: obj,
}
},
UpdateFunc: func(old, cur interface{}) {
oldIng := old.(*extensions.Ingress)
curIng := cur.(*extensions.Ingress)
validOld := class.IsValid(oldIng)
validCur := class.IsValid(curIng)
if !validOld && validCur {
glog.Infof("creating ingress %v based on annotation %v", curIng.Name, class.IngressKey)
recorder.Eventf(curIng, corev1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
} else if validOld && !validCur {
glog.Infof("removing ingress %v based on annotation %v", curIng.Name, class.IngressKey)
recorder.Eventf(curIng, corev1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
} else if validCur && !reflect.DeepEqual(old, cur) {
recorder.Eventf(curIng, corev1.EventTypeNormal, "UPDATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
}
store.extractAnnotations(curIng)
store.updateSecretIngressMap(curIng)
store.syncSecrets(curIng)
updateCh.In() <- Event{
Type: UpdateEvent,
Obj: cur,
}
},
}
// 添加事件处理器
store.informers.Ingress.AddEventHandler(ingEventHandler)
资源事件变化处理是驱动Ingress Nginx运行的关键,这里先不打算在本篇文章中详细描述了,如果有必要,将会另外写一篇文章去讲述。
启动Informer
Informer的启动在k8sStore的Run方法中驱动,启动。
// Run initiates the synchronization of the informers and the initial
// synchronization of the secrets.
func (s k8sStore) Run(stopCh chan struct{}) {
// start informers
s.informers.Run(stopCh)
if s.isOCSPCheckEnabled {
go wait.Until(s.checkSSLChainIssues, 60*time.Second, stopCh)
}
}
// Run initiates the synchronization of the informers against the API server.
func (i *Informer) Run(stopCh chan struct{}) {
go i.Endpoint.Run(stopCh)
go i.Service.Run(stopCh)
go i.Secret.Run(stopCh)
go i.ConfigMap.Run(stopCh)
// wait for all involved caches to be synced before processing items
// from the queue
if !cache.WaitForCacheSync(stopCh, // 需要等待除了Ingress之外的各种资源同步完成采取处理里面的
i.Endpoint.HasSynced,
i.Service.HasSynced,
i.Secret.HasSynced,
i.ConfigMap.HasSynced,
) {
runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
}
// in big clusters, deltas can keep arriving even after HasSynced
// functions have returned 'true'
time.Sleep(1 * time.Second)
// we can start syncing ingress objects only after other caches are
// ready, because ingress rules require content from other listers, and
// 'add' events get triggered in the handlers during caches population.
go i.Ingress.Run(stopCh) // 启动Ingress Informer
if !cache.WaitForCacheSync(stopCh,
i.Ingress.HasSynced,
) {
runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
}
}
Informers的启动代码比较清晰,首先启动除了Ingress Informer之外的其他资源的Informer,并且等待cache同步完成后,采取启动Ingress Informer,因为Ingress的规则需要其他资源的内容。