1. 前言
转载请说明原文出处, 尊重他人劳动成果!
源码位置: https://github.com/nicktming/istio
分支: tming-v1.3.6 (基于1.3.6版本)
1. [istio源码分析][pilot] pilot之configController (mcp client)
2. [istio源码分析][pilot] pilot之ServiceController
前面两篇文章分析了
configController
和ServiceController
,configController
可以从galley
获得资源(VirtualService
等)信息,ServiceController
可以从k8s
中获得pod
,Service
等信息并转成istio
的类型.
本文将分析DiscoveryServer
的作用.
2. DiscoveryService
看一下
DiscoveryService
是如何初始化的.
// pilot/cmd/pilot-discovery/main.go
var (
serverArgs = bootstrap.PilotArgs{
CtrlZOptions: ctrlz.DefaultOptions(),
KeepaliveOptions: keepalive.DefaultOption(),
}
...
)
...
discoveryServer, err := bootstrap.NewServer(serverArgs)
...
// pilot/pkg/bootstrap/server.go
func NewServer(args PilotArgs) (*Server, error) {
...
if err := s.initDiscoveryService(&args); err != nil {
return nil, fmt.Errorf("discovery service: %v", err)
}
...
}
调用
initDiscoveryService
初始化.
2.1 initDiscoveryService
// pilot/pkg/bootstrap/server.go
func (s *Server) initDiscoveryService(args *PilotArgs) error {
environment := &model.Environment{
Mesh: s.mesh,
MeshNetworks: s.meshNetworks,
IstioConfigStore: s.istioConfigStore,
ServiceDiscovery: s.ServiceController,
PushContext: model.NewPushContext(),
}
// 构造一个DiscoveryService 主要是提供给一些api供用户查看
discovery, err := envoy.NewDiscoveryService(
environment,
args.DiscoveryOptions,
)
if err != nil {
return fmt.Errorf("failed to create discovery service: %v", err)
}
s.mux = discovery.RestContainer.ServeMux
// 创建DiscoveryServer并赋值给s.EnvoyXdsServer
s.EnvoyXdsServer = envoyv2.NewDiscoveryServer(environment,
istio_networking.NewConfigGenerator(args.Plugins),
s.ServiceController, s.kubeRegistry, s.configController)
s.EnvoyXdsServer.InitDebug(s.mux, s.ServiceController)
if s.kubeRegistry != nil {
s.kubeRegistry.Env = environment
s.kubeRegistry.InitNetworkLookup(s.meshNetworks)
s.kubeRegistry.XDSUpdater = s.EnvoyXdsServer
}
// 启动DiscoveryServer
s.addStartFunc(func(stop <-chan struct{}) error {
s.EnvoyXdsServer.Start(stop)
return nil
})
...
return nil
}
1. 构造了一个
model.Environment
对象.
2. 构造一个DiscoveryService
, 主要是提供给一些api
供用户查看.
3. 创建DiscoveryServer
并赋值给s.EnvoyXdsServer
.
4. 启动DiscoveryServer
.
5. 启动http grpc
服务.
2.2 v2/discovery
// pilot/pkg/proxy/envoy/v2/discovery.go
func NewDiscoveryServer(
env *model.Environment,
generator core.ConfigGenerator,
ctl model.Controller,
kubeController *controller.Controller,
configCache model.ConfigStoreCache) *DiscoveryServer {
out := &DiscoveryServer{
Env: env,
ConfigGenerator: generator,
ConfigController: configCache,
KubeController: kubeController,
EndpointShardsByService: map[string]map[string]*EndpointShards{},
WorkloadsByID: map[string]*Workload{},
concurrentPushLimit: make(chan struct{}, features.PushThrottle),
pushChannel: make(chan *model.PushRequest, 10),
pushQueue: NewPushQueue(),
}
// Flush cached discovery responses whenever services, service
// instances, or routing configuration changes.
// 注册一个serviceHandler
serviceHandler := func(*model.Service, model.Event) { out.clearCache() }
if err := ctl.AppendServiceHandler(serviceHandler); err != nil {
return nil
}
// 注册一个instanceHandler
instanceHandler := func(*model.ServiceInstance, model.Event) { out.clearCache() }
if err := ctl.AppendInstanceHandler(instanceHandler); err != nil {
return nil
}
authn_model.JwtKeyResolver.PushFunc = out.ClearCache
if configCache != nil {
configHandler := func(model.Config, model.Event) { out.clearCache() }
for _, descriptor := range model.IstioConfigTypes {
// 每一个支持的类型都注册一个handler
configCache.RegisterEventHandler(descriptor.Type, configHandler)
}
}
...
return out
}
这里可以看到所有资源注册的
handler
都是out.clearCache
方法, 所以很有必要看一下clearCache
的操作.
func (s *DiscoveryServer) clearCache() {
s.ConfigUpdate(&model.PushRequest{Full: true})
}
func (s *DiscoveryServer) ConfigUpdate(req *model.PushRequest) {
inboundConfigUpdates.Increment()
s.pushChannel <- req
}
1. 构造了一个
model.PushRequest{Full: true}
并且写入到s.pushChannel
这个channel
中.
2. 从 [istio源码分析][pilot] pilot之configController (mcp client) 中可以知道如果类型不是ServiceEntry
的时候会调用s.EnvoyXdsServer.ConfigUpdate(&model.PushRequest{Full: true})
.
3. 从 [istio源码分析][pilot] pilot之ServiceController 中可以知道调用AppendXXXHandler
的时候在调用完c.XDSUpdater.XXXUpdate
后会调用注册的方法, 也就是ClearCache
.
那最终谁会从
s.pushChannel
中读取数据并做什么的操作呢?
start
// pilot/pkg/proxy/envoy/v2/discovery.go
func (s *DiscoveryServer) Start(stopCh <-chan struct{}) {
go s.handleUpdates(stopCh)
go s.periodicRefresh(stopCh)
go s.periodicRefreshMetrics(stopCh)
go s.sendPushes(stopCh)
}
查看
DiscoveryServer
的start
方法
handleUpdates
// pilot/pkg/proxy/envoy/v2/discovery.go
func (s *DiscoveryServer) handleUpdates(stopCh <-chan struct{}) {
debounce(s.pushChannel, stopCh, s.Push)
}
func debounce(ch chan *model.PushRequest, stopCh <-chan struct{}, pushFn func(req *model.PushRequest)) {
// 做了一些控制性的事情 最终会落到Push方法
...
}
func (s *DiscoveryServer) Push(req *model.PushRequest) {
if !req.Full {
req.Push = s.globalPushContext()
go s.AdsPushAll(versionInfo(), req)
return
}
...
// 创建一个新的PushContext
push := model.NewPushContext()
// 从当前状态中获得信息(istioConfigStore,ServiceController)
// 也就是galley,k8s中获取的内容
err := push.InitContext(s.Env)
...
s.updateMutex.Lock()
// 更新PushContext
s.Env.PushContext = push
s.updateMutex.Unlock()
...
req.Push = push
go s.AdsPushAll(versionLocal, req)
}
1.
handleUpdates
调用debounce
,debounce
做了一些控制性的事情, 从s.pushChannel
中获得req
, 然后调用Push
方法处理.
2. 关于Push
注意以下几点:2.1 如果
req.Full = false
, 则使用当前的pushcontext
调用AdsPushAll
方法.
2. 如果req.Full = true
, 则生成新的push context
, 通过InitContext
为新的push
填充数据, 数据从s.Env
中得到, 因为s.Env
中包含了istioConfigStore,ServiceController
, 也就是galley
,k8s
中的资源数据.
3. 调用AdsPushAll
方法, 此时的req.push
已经被填充数据了.
AdsPushAll
// pilot/pkg/proxy/envoy/v2/ads.go
func (s *DiscoveryServer) AdsPushAll(version string, req *model.PushRequest) {
if !req.Full {
s.edsIncremental(version, req.Push, req)
return
}
...
s.startPush(req)
}
func (s *DiscoveryServer) startPush(req *model.PushRequest) {
adsClientsMutex.RLock()
// Create a temp map to avoid locking the add/remove
pending := []*XdsConnection{}
// 所有的ads client端
for _, v := range adsClients {
pending = append(pending, v)
}
adsClientsMutex.RUnlock()
currentlyPending := s.pushQueue.Pending()
if currentlyPending != 0 {
adsLog.Infof("Starting new push while %v were still pending", currentlyPending)
}
req.Start = time.Now()
for _, p := range pending {
// 为每一个ads client端都添加了这个req
s.pushQueue.Enqueue(p, req)
}
}
1. 最终到
startPush
为每个ads client
端都添加了这个req
到s.pushQueue
中.
那
s.pushQueue
中的内容什么时候会取出来运行呢?
sendPushes
func doSendPushes(stopCh <-chan struct{}, semaphore chan struct{}, queue *PushQueue) {
for {
select {
case <-stopCh:
return
default:
...
client, info := queue.Dequeue()
...
go func() {
...
select {
// 组装成一个XdsEvent 往XdsConnection类型中的pushChannel中
case client.pushChannel <- &XdsEvent{
push: info.Push,
edsUpdatedServices: edsUpdates,
done: doneFunc,
start: info.Start,
targetNamespaces: info.TargetNamespaces,
}:
return
...
}()
}
}
}
func (s *DiscoveryServer) sendPushes(stopCh <-chan struct{}) {
doSendPushes(stopCh, s.concurrentPushLimit, s.pushQueue)
}
1. 从
queue.Dequeue()
取出来运行, 放到XdsConnection类型
中的pushChannel
. 其实就是往各个注册的client
中发送数据.
2.3 总结
3. 参考
1.
istio 1.3.6源码