1. 前言
转载请说明原文出处, 尊重他人劳动成果!
源码位置: https://github.com/nicktming/istio
分支: tming-v1.3.6 (基于1.3.6版本)
1. [istio源码分析][galley] galley之上游(source)
2. [istio源码分析][galley] galley之runtime
3. [istio源码分析][galley] galley之下游(mcp)
在前面几篇文章中已经分析了galley
的整个流程,galley
中最终把从source
(fs
,k8s
) 中获得的数据会从mcp server
中push
到mcp client
, 那本文将会分析pilot
中configController
是如何使用mcp client
来接收数据并如何处理的.
2. ConfigController
先看一下
configController
在pilot
是如何初始化的.
// pilot/cmd/pilot-discovery/main.go
var (
serverArgs = bootstrap.PilotArgs{
CtrlZOptions: ctrlz.DefaultOptions(),
KeepaliveOptions: keepalive.DefaultOption(),
}
...
)
...
discoveryServer, err := bootstrap.NewServer(serverArgs)
...
// pilot/pkg/bootstrap/server.go
func NewServer(args PilotArgs) (*Server, error) {
if err := s.initMesh(&args); err != nil {
return nil, fmt.Errorf("mesh: %v", err)
}
...
if err := s.initConfigController(&args); err != nil {
return nil, fmt.Errorf("config controller: %v", err)
}
}
对分析不影响的代码直接删减了.
func (s *Server) initConfigController(args *PilotArgs) error {
if len(s.mesh.ConfigSources) > 0 {
// 如果有config source的配置 则配置mcp client
if err := s.initMCPConfigController(args); err != nil {
return err
}
}
...
// Create the config store.
s.istioConfigStore = model.MakeIstioStore(s.configController)
return nil
}
1. 可以看到
s.istioConfigStore
实质上就是s.configController
.
2. 主要关注mcp configuration
, 关于mesh
配置信息可以参考 [istio源码分析] istio源码开发调试版简单安装 .
2.1 initMCPConfigController
func (s *Server) initMCPConfigController(args *PilotArgs) error {
clientNodeID := ""
collections := make([]sink.CollectionOptions, len(model.IstioConfigTypes))
for i, t := range model.IstioConfigTypes {
// 都是istio crd资源 没有原生的k8s资源 比如pod, service等
collections[i] = sink.CollectionOptions{Name: t.Collection, Incremental: false}
}
options := coredatamodel.Options{
DomainSuffix: args.Config.ControllerOptions.DomainSuffix,
// 后面会用到
ClearDiscoveryServerCache: func() {
s.EnvoyXdsServer.ConfigUpdate(&model.PushRequest{Full: true})
},
}
...
for _, configSource := range s.mesh.ConfigSources {
if strings.Contains(configSource.Address, fsScheme+"://") {
...
}
// 设置安全访问的情况 在以后分析policy的时候会用到
securityOption := grpc.WithInsecure()
if configSource.TlsSettings != nil &&
configSource.TlsSettings.Mode != istio_networking_v1alpha3.TLSSettings_DISABLE {
...
}
...
conn, err := grpc.DialContext(
ctx, configSource.Address,
securityOption, msgSizeOption, keepaliveOption, initialWindowSizeOption, initialConnWindowSizeOption)
...
// 创建一个controller
mcpController := coredatamodel.NewController(options)
sinkOptions := &sink.Options{
CollectionOptions: collections,
Updater: mcpController,
ID: clientNodeID,
Reporter: reporter,
}
// 创建mcp client
cl := mcpapi.NewResourceSourceClient(conn)
mcpClient := sink.NewClient(cl, sinkOptions)
configz.Register(mcpClient)
clients = append(clients, mcpClient)
conns = append(conns, conn)
// 将该controller加入到configStores
configStores = append(configStores, mcpController)
}
...
// Wrap the config controller with a cache.
aggregateMcpController, err := configaggregate.MakeCache(configStores)
if err != nil {
return err
}
s.configController = aggregateMcpController
return nil
}
1. 关注
options.ClearDiscoveryServerCache
, 后面会用到.
2.coredatamodel.NewController(options)
创建一个controller
.
3.sink.NewClient(cl, sinkOptions)
创建一个mcp client
, 注意sinkOptions.Updater
就是2.中创建的controller
. 另外mcp server
端在galley
.
4.configaggregate.MakeCache(configStores)
是将所有的controller
按照其支持的collection
(比如virtualService
对应了哪些controller
)进行分类起来.
2.2 mcp client
// pkg/mcp/sink/client_sink.go
func NewClient(client mcp.ResourceSourceClient, options *Options) *Client {
return &Client{
Sink: New(options),
reporter: options.Reporter,
client: client,
}
}
// pkg/mcp/sink/sink.go
func New(options *Options) *Sink { // nolint: lll
nodeInfo := &mcp.SinkNode{
Id: options.ID,
Annotations: options.Metadata,
}
state := make(map[string]*perCollectionState)
// state来自options.CollectionOptions
for _, collection := range options.CollectionOptions {
state[collection.Name] = &perCollectionState{
versions: make(map[string]string),
requestIncremental: collection.Incremental,
}
}
return &Sink{
...
}
}
这里需要关注如下:
1. 可以看到Sink
中的state
来自options.CollectionOptions
, 往上追溯到initMCPConfigController
的model.IstioConfigTypes
.
IstioConfigTypes = ConfigDescriptor{
VirtualService,
Gateway,
ServiceEntry,
DestinationRule,
EnvoyFilter,
Sidecar,
HTTPAPISpec,
HTTPAPISpecBinding,
QuotaSpec,
QuotaSpecBinding,
AuthenticationPolicy,
AuthenticationMeshPolicy,
ServiceRole,
ServiceRoleBinding,
RbacConfig,
ClusterRbacConfig,
}
可以看到
model.IstioConfigTypes
中看到的都是istio
中的一些crd
资源, 也就是说从galley
中得到的config resource
都是这些资源, 没有k8s
中的原生资源, 比如Pod
等.
2.3 mcp client Start
// pkg/mcp/sink/client_sink.go
func (c *Client) Run(ctx context.Context) {
...
for {
// 建立连接
for {
...
stream, err := c.client.EstablishResourceStream(ctx)
...
}
// 处理
err := c.ProcessStream(c.stream)
...
}
}
// pkg/mcp/sink/sink.go
func (sink *Sink) ProcessStream(stream Stream) error {
// send initial requests for each supported type
// 为每一个支持的类型发送一个初始的请求
initialRequests := sink.createInitialRequests()
for {
var req *mcp.RequestResources
if len(initialRequests) > 0 {
// 发送初始request
req = initialRequests[0]
initialRequests = initialRequests[1:]
} else {
// 从server端接收response
resources, err := stream.Recv()
if err != nil {
if err != io.EOF {
sink.reporter.RecordRecvError(err, status.Code(err))
scope.Errorf("Error receiving MCP resource: %v", err)
}
return err
}
// client端处理后需要发送ACK/NACK
// 所以处理response后组装了一个request
req = sink.handleResponse(resources)
}
sink.journal.RecordRequestResources(req)
// 向server端发送request
if err := stream.Send(req); err != nil {
sink.reporter.RecordSendError(err, status.Code(err))
scope.Errorf("Error sending MCP request: %v", err)
return err
}
}
}
关于
mcp
中的client
和server
之间的交互在 [istio源码分析][galley] galley之下游(mcp) 中已经有介绍, 这里再次说明一下.
关于
mcp
可以参考 https://github.com/istio/api/tree/master/mcp, 这里用此图可以增加理解
对比此图和
ProcessStream
来进行说明:
1.client
端为每一个支持的类型initialRequests
发送一个初始的请求.
2.server
端会返回一个response
.
3.client
端需要返回一个ACK/NACK
, 所以ProcessStream
中的sink.handleResponse(resources)
中处理完response
又构造了一个新的request
来返回给server
端.
所以先看一下都发了哪些类型:
func (sink *Sink) createInitialRequests() []*mcp.RequestResources {
sink.mu.Lock()
initialRequests := make([]*mcp.RequestResources, 0, len(sink.state))
// sink.state 来源自 initMCPConfigController中的model.IstioConfigTypes
for collection, state := range sink.state {
var initialResourceVersions map[string]string
if state.requestIncremental {
...
}
req := &mcp.RequestResources{
SinkNode: sink.nodeInfo,
Collection: collection,
InitialResourceVersions: initialResourceVersions,
Incremental: state.requestIncremental,
}
initialRequests = append(initialRequests, req)
}
sink.mu.Unlock()
return initialRequests
}
可以看到发送的类型就是
initMCPConfigController
中的model.IstioConfigTypes
.
2.4 handleResponse
func (sink *Sink) handleResponse(resources *mcp.Resources) *mcp.RequestResources {
if handleResponseDoneProbe != nil {
defer handleResponseDoneProbe()
}
// 必须是支持的类型
state, ok := sink.state[resources.Collection]
if !ok {
errDetails := status.Errorf(codes.Unimplemented, "unsupported collection %v", resources.Collection)
return sink.sendNACKRequest(resources, errDetails)
}
change := &Change{
Collection: resources.Collection,
Objects: make([]*Object, 0, len(resources.Resources)),
Removed: resources.RemovedResources,
Incremental: resources.Incremental,
SystemVersionInfo: resources.SystemVersionInfo,
}
for _, resource := range resources.Resources {
var dynamicAny types.DynamicAny
if err := types.UnmarshalAny(resource.Body, &dynamicAny); err != nil {
return sink.sendNACKRequest(resources, err)
}
// TODO - use galley metadata to verify collection and type_url match?
object := &Object{
TypeURL: resource.Body.TypeUrl,
Metadata: resource.Metadata,
Body: dynamicAny.Message,
}
change.Objects = append(change.Objects, object)
}
if err := sink.updater.Apply(change); err != nil {
// 发送NACK
errDetails := status.Error(codes.InvalidArgument, err.Error())
return sink.sendNACKRequest(resources, errDetails)
}
...
// ACK
sink.reporter.RecordRequestAck(resources.Collection, 0)
req := &mcp.RequestResources{
SinkNode: sink.nodeInfo,
Collection: resources.Collection,
ResponseNonce: resources.Nonce,
Incremental: useIncremental,
}
return req
}
1. 根据
response
发送回来的数据组装成change
, 并将该change
作为参数调用sink.updater.Apply
方法.
2. 有任何错误会发送NACK
给server
端, 如果没有错误就发送ACK
给server
端.
那
sink.updater
是什么呢? 在initMCPConfigController
中可以看到:
options := coredatamodel.Options{
DomainSuffix: args.Config.ControllerOptions.DomainSuffix,
// 后面会用到
ClearDiscoveryServerCache: func() {
s.EnvoyXdsServer.ConfigUpdate(&model.PushRequest{Full: true})
},
}
...
mcpController := coredatamodel.NewController(options)
sinkOptions := &sink.Options{
CollectionOptions: collections,
Updater: mcpController,
ID: clientNodeID,
Reporter: reporter,
}
// 创建mcp client
cl := mcpapi.NewResourceSourceClient(conn)
mcpClient := sink.NewClient(cl, sinkOptions)
sink.updater
就是mcpController
.
3. Controller
func NewController(options Options) CoreDataModel {
descriptorsByMessageName := make(map[string]model.ProtoSchema, len(model.IstioConfigTypes))
synced := make(map[string]bool)
for _, descriptor := range model.IstioConfigTypes {
// don't register duplicate descriptors for the same collection
if _, ok := descriptorsByMessageName[descriptor.Collection]; !ok {
descriptorsByMessageName[descriptor.Collection] = descriptor
synced[descriptor.Collection] = false
}
}
return &Controller{
...
}
}
关注一下
descriptorsByMessageName
是如何生成的即可.
3.1 Apply
func (c *Controller) Apply(change *sink.Change) error {
descriptor, ok := c.descriptorsByCollection[change.Collection]
if !ok {
return fmt.Errorf("apply type not supported %s", change.Collection)
}
schema, valid := c.ConfigDescriptor().GetByType(descriptor.Type)
if !valid {
return fmt.Errorf("descriptor type not supported %s", descriptor.Type)
}
c.syncedMu.Lock()
c.synced[change.Collection] = true
c.syncedMu.Unlock()
// innerStore is [namespace][name]
innerStore := make(map[string]map[string]*model.Config)
// 根据change的信息生成以innerStore
for _, obj := range change.Objects {
//构造innerStore
}
var prevStore map[string]map[string]*model.Config
c.configStoreMu.Lock()
prevStore = c.configStore[descriptor.Type]
c.configStore[descriptor.Type] = innerStore
c.configStoreMu.Unlock()
if descriptor.Type == model.ServiceEntry.Type {
c.serviceEntryEvents(innerStore, prevStore)
} else {
c.options.ClearDiscoveryServerCache()
}
return nil
}
1. 根据
change
构造innerStore
, 进而更新该类型在c.configStore
中的内容.
2. 根据旧内容prevStore
和新内容innerStore
来做分发工作.
2.1 如果是
ServiceEntry
, 调用serviceEntryEvents
方法.
func (c *Controller) serviceEntryEvents(currentStore, prevStore map[string]map[string]*model.Config) {
dispatch := func(model model.Config, event model.Event) {}
if handlers, ok := c.eventHandlers[model.ServiceEntry.Type]; ok {
dispatch = func(model model.Config, event model.Event) {
log.Debugf("MCP event dispatch: key=%v event=%v", model.Key(), event.String())
for _, handler := range handlers {
handler(model, event)
}
}
}
// add/update
for namespace, byName := range currentStore {
for name, config := range byName {
if prevByNamespace, ok := prevStore[namespace]; ok {
if prevConfig, ok := prevByNamespace[name]; ok {
if config.ResourceVersion != prevConfig.ResourceVersion {
dispatch(*config, model.EventUpdate)
}
} else {
dispatch(*config, model.EventAdd)
}
} else {
dispatch(*config, model.EventAdd)
}
}
}
...
}
func (c *Controller) RegisterEventHandler(typ string, handler func(model.Config, model.Event)) {
c.eventHandlers[typ] = append(c.eventHandlers[typ], handler)
}
通过注册好了的
handler
来处理这些生成的事件.
2.2 如果不是
ServiceEntry
, 调用ClearDiscoveryServerCache
方法.
s.EnvoyXdsServer.ConfigUpdate(&model.PushRequest{Full: true})
所以这个放到以后分析.
4. 总结
从
mcp server
中接收数据后通过handleResponse
调用controller
的Apply
方法, 通过类型来进行处理. 处理完向server
端返回ACK/NACK
.
5. 参考
1.
istio 1.3.6源码
2. https://cloud.tencent.com/developer/article/1409159