1. Preface
Please credit the original source when reposting, and respect the author's work!
Source code: https://github.com/nicktming/istio
Branch: tming-v1.3.6 (based on version 1.3.6)
1. [istio source analysis][galley] galley upstream (source)
2. [istio source analysis][galley] galley runtime
3. [istio source analysis][galley] galley downstream (mcp)
In the previous article, [istio source analysis][galley] galley runtime, we analyzed the component that bridges upstream and downstream in the galley pipeline. This article analyzes the downstream side of that component, namely the mcp server, which takes over from there; every connected mcp client (for example pilot) receives this information from it.
2. server
Let's first look at how the server side is initialized.
// galley/pkg/server/components/processing.go
func NewProcessing(a *settings.Args) *Processing {
d := snapshot.New(groups.IndexFunction)
return &Processing{
args: a,
distributor: d,
configzTopic: configz.CreateTopic(d),
}
}
p.distributor is a snapshot instance (snapshot is analyzed later). Next, look at the Start() method.
func (p *Processing) Start() (err error) {
// TODO: cleanup
...
types := p.getMCPTypes()
processorCfg := runtime.Config{
DomainSuffix: p.args.DomainSuffix,
Mesh: mesh,
Schema: types,
SynthesizeServiceEntries: p.args.EnableServiceDiscovery,
}
p.processor = runtime.NewProcessor(src, p.distributor, &processorCfg)
grpcOptions := p.getServerGrpcOptions()
p.stopCh = make(chan struct{})
var checker source.AuthChecker = server.NewAllowAllChecker()
...
grpc.EnableTracing = p.args.EnableGRPCTracing
p.grpcServer = grpc.NewServer(grpcOptions...)
p.reporter = mcpMetricReporter("galley")
options := &source.Options{
Watcher: p.distributor,
Reporter: p.reporter,
CollectionsOptions: source.CollectionOptionsFromSlice(types.Collections()),
ConnRateLimiter: mcprate.NewRateLimiter(time.Second, 100), // TODO(Nino-K): https://github.com/istio/istio/issues/12074
}
md := grpcMetadata.MD{
versionMetadataKey: []string{version.Info.Version},
}
if err := parseSinkMeta(p.args.SinkMeta, md); err != nil {
return err
}
...
serverOptions := &source.ServerOptions{
AuthChecker: checker,
RateLimiter: rate.NewLimiter(rate.Every(time.Second), 100), // TODO(Nino-K): https://github.com/istio/istio/issues/12074
Metadata: md,
}
p.mcpSource = source.NewServer(options, serverOptions)
...
}
Note these points:
1. The Watcher in options is p.distributor.
2. p.mcpSource = source.NewServer(options, serverOptions) creates the mcp server.
// pkg/mcp/source/server_source.go
func NewServer(srcOptions *Options, serverOptions *ServerOptions) *Server {
s := &Server{
src: New(srcOptions),
authCheck: serverOptions.AuthChecker,
rateLimiter: serverOptions.RateLimiter,
metadata: serverOptions.Metadata,
}
return s
}
// pkg/mcp/source/source.go
func New(options *Options) *Source {
s := &Source{
watcher: options.Watcher,
collections: options.CollectionsOptions,
reporter: options.Reporter,
requestLimiter: options.ConnRateLimiter,
}
return s
}
As you can see, the fields of the server-side src object all come from the options built in Processing.
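In particular, src.watcher is the snapshot cache. Judging from the Cache.Watch signature quoted in section 2.3 below, the contract that p.distributor has to satisfy looks roughly like the following inferred outline (Request and WatchResponse are stubbed here only to keep the sketch self-contained; this is not code copied from the source):
// inferred outline of the Watcher contract in pkg/mcp/source
package source

// Request and WatchResponse stand in for the structs of the same name in
// pkg/mcp/source; only the shape of the Watcher contract matters here.
type Request struct{}
type WatchResponse struct{}

type PushResponseFunc func(response *WatchResponse)

type CancelWatchFunc func()

type Watcher interface {
    // Watch registers interest in a collection; when a response is ready, the
    // implementation invokes pushResponse, which enqueues it into con.queue.
    Watch(request *Request, pushResponse PushResponseFunc, peerAddr string) CancelWatchFunc
}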
2.1 EstablishResourceStream
// pkg/mcp/source/server_source.go
func (s *Server) EstablishResourceStream(stream mcp.ResourceSource_EstablishResourceStreamServer) error {
...
err := s.src.ProcessStream(stream)
code := status.Code(err)
if code == codes.OK || code == codes.Canceled || err == io.EOF {
return nil
}
return err
}
The main thing to look at is the ProcessStream method.
2.2 ProcessStream
ProcessStream does three things:
1. newConnection establishes a connection for the stream.
2. Requests are received asynchronously and handed over for processing through the channel con.requestC.
3. Requests are processed in a loop: a request is taken from con.requestC and handled by the processClientRequest method, and responses destined for the client are read from con.queue. So, as you might expect, processClientRequest assembles the response and puts it into con.queue.
func (s *Source) ProcessStream(stream Stream) error {
// establish a connection for this client
con := s.newConnection(stream)
defer s.closeConnection(con)
// receive requests asynchronously
go con.receive()
for {
select {
case <-con.queue.Ready():
collection, item, ok := con.queue.Dequeue()
if !ok {
break
}
resp := item.(*WatchResponse)
w, ok := con.watches[collection]
if !ok {
scope.Errorf("unknown collection in dequeued watch response: %v", collection)
break // bug?
}
// the response may have been cleared before we got to it
if resp != nil {
if err := con.pushServerResponse(w, resp); err != nil {
return err
}
}
case req, more := <-con.requestC:
// a request arrived: the receive method (shown below)
// puts incoming requests into con.requestC
if !more {
return con.reqError
}
if con.limiter != nil {
if err := con.limiter.Wait(stream.Context()); err != nil {
return err
}
}
// handle the request
if err := con.processClientRequest(req); err != nil {
return err
}
case <-con.queue.Done():
// the queue was closed
scope.Debugf("MCP: connection %v: stream done", con)
return status.Error(codes.Unavailable, "server canceled watch")
}
}
}
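A note on con.queue: it is created with internal.NewUniqueScheduledQueue(len(s.collections)), and the name together with the comment "the response may have been cleared before we got to it" suggests that at most one pending response is kept per collection, a newer one replacing an older one that has not been sent yet. A minimal sketch of that assumed behavior (toy code, not istio's internal package):
package main

import "fmt"

// uniqueQueue keeps at most one pending item per key, so a newer response for
// a collection replaces an older one that has not been dequeued yet.
// This only illustrates the assumed semantics of the real unique queue.
type uniqueQueue struct {
    items map[string]interface{}
    order []string
}

func newUniqueQueue() *uniqueQueue {
    return &uniqueQueue{items: map[string]interface{}{}}
}

func (q *uniqueQueue) Enqueue(key string, item interface{}) {
    if _, exists := q.items[key]; !exists {
        q.order = append(q.order, key)
    }
    q.items[key] = item // overwrite any pending item for the same key
}

func (q *uniqueQueue) Dequeue() (string, interface{}, bool) {
    if len(q.order) == 0 {
        return "", nil, false
    }
    key := q.order[0]
    q.order = q.order[1:]
    item := q.items[key]
    delete(q.items, key)
    return key, item, true
}

func main() {
    q := newUniqueQueue()
    q.Enqueue("istio/networking/v1alpha3/virtualservices", "response @ v1")
    q.Enqueue("istio/networking/v1alpha3/virtualservices", "response @ v2") // replaces v1
    collection, item, _ := q.Dequeue()
    fmt.Println(collection, item) // only "response @ v2" remains for this collection
}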
Let's first look at how the connection is established.
// pkg/mcp/source/source.go
func (s *Source) newConnection(stream Stream) *connection {
peerAddr := "0.0.0.0"
peerInfo, ok := peer.FromContext(stream.Context())
if ok {
peerAddr = peerInfo.Addr.String()
} else {
scope.Warnf("No peer info found on the incoming stream.")
peerInfo = nil
}
con := &connection{
stream: stream,
peerAddr: peerAddr,
requestC: make(chan *mcp.RequestResources),
watches: make(map[string]*watch),
watcher: s.watcher,
id: atomic.AddInt64(&s.nextStreamID, 1),
reporter: s.reporter,
limiter: s.requestLimiter.Create(),
queue: internal.NewUniqueScheduledQueue(len(s.collections)),
}
// create a watch for each collection
collections := make([]string, 0, len(s.collections))
for i := range s.collections {
collection := s.collections[i]
w := &watch{
ackedVersionMap: make(map[string]string),
incremental: collection.Incremental,
}
con.watches[collection.Name] = w
collections = append(collections, collection.Name)
}
...
return con
}
As you can see, the main job here is to create a watch object for each collection. Now let's see how the receive function is implemented.
func (con *connection) receive() {
defer close(con.requestC)
for {
// receive a message from the stream
req, err := con.stream.Recv()
if err != nil {
if err == io.EOF {
scope.Infof("MCP: connection %v: TERMINATED %q", con, err)
return
}
con.reporter.RecordRecvError(err, status.Code(err))
scope.Errorf("MCP: connection %v: TERMINATED with errors: %v", con, err)
con.reqError = err
return
}
select {
// write it into the channel con.requestC
case con.requestC <- req:
case <-con.queue.Done():
scope.Debugf("MCP: connection %v: stream done", con)
return
case <-con.stream.Context().Done():
scope.Debugf("MCP: connection %v: stream done, err=%v", con, con.stream.Context().Err())
return
}
}
}
As you can see, a request received from the client is put into the channel con.requestC. Now go back to the ProcessStream method and see what happens when a request arrives on con.requestC: processClientRequest is called to handle it.
2.3 processClientRequest
func (con *connection) processClientRequest(req *mcp.RequestResources) error {
if isTriggerResponse(req) {
return nil
}
collection := req.Collection
con.reporter.RecordRequestSize(collection, con.id, internal.ProtoSize(req))
// look up the watch for this collection
w, ok := con.watches[collection]
if !ok {
return status.Errorf(codes.InvalidArgument, "unsupported collection %q", collection)
}
if req.ResponseNonce == "" || w.pending.GetNonce() == req.ResponseNonce {
versionInfo := ""
if w.pending == nil {
// initial request (the first message from the client)
scope.Infof("MCP: connection %v: inc=%v WATCH for %v", con, req.Incremental, collection)
} else {
// an ACK or NACK (a follow-up message)
versionInfo = w.pending.SystemVersionInfo
if req.ErrorDetail != nil {
scope.Warnf("MCP: connection %v: NACK collection=%v version=%q with nonce=%q error=%#v inc=%v", // nolint: lll
con, collection, req.ResponseNonce, versionInfo, req.ErrorDetail, req.Incremental)
con.reporter.RecordRequestNack(collection, con.id, codes.Code(req.ErrorDetail.Code))
} else {
scope.Infof("MCP: connection %v ACK collection=%v with version=%q nonce=%q inc=%v",
con, collection, versionInfo, req.ResponseNonce, req.Incremental)
con.reporter.RecordRequestAck(collection, con.id)
internal.UpdateResourceVersionTracking(w.ackedVersionMap, w.pending)
}
// clear the pending request after we finished processing the corresponding response.
w.pending = nil
}
if w.cancel != nil {
w.cancel()
}
sr := &Request{
SinkNode: req.SinkNode,
Collection: collection,
VersionInfo: versionInfo,
incremental: req.Incremental,
}
// con.watcher is the snapshot cache
// its Watch method assembles the response and calls queueResponse to enqueue it
w.cancel = con.watcher.Watch(sr, con.queueResponse, con.peerAddr)
} else {
...
}
return nil
}
func (con *connection) queueResponse(resp *WatchResponse) {
if resp == nil {
con.queue.Close()
} else {
con.queue.Enqueue(resp.Collection, resp)
}
}
For background on mcp, see https://github.com/istio/api/tree/master/mcp; a diagram of the exchange helps build intuition. From that diagram and from processClientRequest we can conclude:
1. The very first request is sent by the client; everything after that is pushed from the server to the client. We first walk through the flow in which the client sends a request, the server returns a response, and the client finally sends an ACK (see the sketch after this list). Then we look at how the server proactively pushes information to the client and the client replies with an ACK.
2. The cancel function on the watch object for this collection is updated.
3. Pay attention to the role of the w.pending variable.
4. The response is assembled inside snapshot and then placed into con.queue.
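To make the request/response/ACK handshake concrete, here is a minimal sketch of the nonce bookkeeping described above. The types are simplified stand-ins for the mcp structs, not the real ones: the server remembers the last pushed message in pending, and a follow-up request counts as an ACK/NACK only when its ResponseNonce matches the pending nonce.
package main

import (
    "fmt"
    "strconv"
)

// Simplified stand-ins for mcp.Resources and mcp.RequestResources.
type resources struct {
    SystemVersionInfo string
    Nonce             string
}

type requestResources struct {
    ResponseNonce string
    ErrorDetail   error // non-nil means NACK
}

// watchState mirrors the per-collection watch: pending is the last message
// pushed to the client that has not been acknowledged yet.
type watchState struct {
    pending     *resources
    acked       string // last version the client is known to hold
    streamNonce int64
}

// push sends a new version to the client and records it as pending.
func (w *watchState) push(version string) *resources {
    w.streamNonce++
    msg := &resources{SystemVersionInfo: version, Nonce: strconv.FormatInt(w.streamNonce, 10)}
    w.pending = msg
    return msg
}

// onRequest mirrors the branch in processClientRequest: an empty nonce is the
// initial request; a nonce matching pending is the ACK/NACK for the last push.
func (w *watchState) onRequest(req *requestResources) string {
    switch {
    case req.ResponseNonce == "":
        return "initial request: register a watch"
    case w.pending != nil && req.ResponseNonce == w.pending.Nonce:
        defer func() { w.pending = nil }()
        if req.ErrorDetail != nil {
            return "NACK: client keeps previously acked version " + w.acked
        }
        w.acked = w.pending.SystemVersionInfo
        return "ACK: client now holds version " + w.acked
    default:
        return "stale nonce: handled by the elided else branch"
    }
}

func main() {
    w := &watchState{}
    fmt.Println(w.onRequest(&requestResources{}))                         // client opens the watch
    msg := w.push("v1")                                                   // server pushes v1
    fmt.Println(w.onRequest(&requestResources{ResponseNonce: msg.Nonce})) // client ACKs v1
}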
Now let's look at snapshot's Watch method.
func (c *Cache) Watch(request *source.Request, pushResponse source.PushResponseFunc, peerAddr string) source.CancelWatchFunc { // nolint: lll
group := c.groupIndex(request.Collection, request.SinkNode)
c.mu.Lock()
defer c.mu.Unlock()
// update the status for this group
info := c.fillStatus(group, request, peerAddr)
collection := request.Collection
// return an immediate response if a snapshot is available and the
// requested version doesn't match.
// c.snapshots is updated in the SetSnapshot method
if snapshot, ok := c.snapshots[group]; ok {
version := snapshot.Version(request.Collection)
scope.Debugf("Found snapshot for group: %q for %v @ version: %q",
group, request.Collection, version)
if version != request.VersionInfo {
scope.Debugf("Responding to group %q snapshot:\n%v\n", group, snapshot)
response := &source.WatchResponse{
Collection: request.Collection,
Version: version,
Resources: snapshot.Resources(request.Collection),
Request: request,
}
// enqueue into con.queue
pushResponse(response)
return nil
}
info.synced[request.Collection][peerAddr] = true
}
c.watchCount++
watchID := c.watchCount
...
info.mu.Lock()
// register the watch
info.watches[watchID] = &responseWatch{request: request, pushResponse: pushResponse}
info.mu.Unlock()
...
return cancel
}
1. If the versions differ, the response is handed to pushResponse, which puts it into con.queue so that it is sent to the client.
2. If there is no snapshot for the group, or the version has not changed, info.watches is updated instead; the server will push to the client later, from the SetSnapshot method.
2.4 pushServerResponse
func (con *connection) pushServerResponse(w *watch, resp *WatchResponse) error {
...
if incremental {
added, removed = calculateDelta(resp.Resources, w.ackedVersionMap)
} else {
// resp.Resources is the content of the snapshot
for _, resource := range resp.Resources {
added = append(added, *resource)
}
}
msg := &mcp.Resources{
SystemVersionInfo: resp.Version,
Collection: resp.Collection,
Resources: added,
RemovedResources: removed,
Incremental: incremental,
}
// increment nonce
con.streamNonce++
msg.Nonce = strconv.FormatInt(con.streamNonce, 10)
if err := con.stream.Send(msg); err != nil {
con.reporter.RecordSendError(err, status.Code(err))
return err
}
scope.Debugf("MCP: connection %v: SEND collection=%v version=%v nonce=%v inc=%v",
con, resp.Collection, resp.Version, msg.Nonce, msg.Incremental)
// after the message is sent to the client successfully, set w.pending;
// it is used for validation when the client sends an ACK/NACK
w.pending = msg
return nil
}
1. The response is assembled into an mcp.Resources message and sent to the client.
2. After the message is sent to the client successfully, w.pending is set; when the client replies with an ACK/NACK, the server checks it in processClientRequest. On an ACK, UpdateResourceVersionTracking(w.ackedVersionMap, w.pending) is called to update w.ackedVersionMap, which records what the client currently holds (a sketch of this version-tracking idea follows below).
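pushServerResponse also shows the incremental path: calculateDelta compares the resources in the new response with w.ackedVersionMap to decide what must be sent and what must be removed. A rough, self-contained sketch of that idea (names and types are illustrative, not the actual calculateDelta implementation):
package main

import "fmt"

// delta computes which resources must be sent and which must be removed, given
// the resources in the current snapshot (name -> version) and the versions the
// client has acknowledged so far (name -> version). Illustrative only; the real
// calculateDelta works on mcp.Resource metadata.
func delta(current, acked map[string]string) (send []string, removed []string) {
    for name, version := range current {
        if acked[name] != version { // new resource or changed version
            send = append(send, name)
        }
    }
    for name := range acked {
        if _, ok := current[name]; !ok { // present before, gone now
            removed = append(removed, name)
        }
    }
    return send, removed
}

func main() {
    acked := map[string]string{"default/vs-a": "1", "default/vs-b": "1"}
    current := map[string]string{"default/vs-a": "2", "default/vs-c": "1"}
    send, removed := delta(current, acked)
    fmt.Println("send:", send)       // vs-a (changed version) and vs-c (new)
    fmt.Println("removed:", removed) // vs-b (no longer present)
}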
2.5 Summary
Let's recap the whole flow.
1. The first request is sent by the client.
2. The server then sends data to the client.
3. The client then sends an ACK/NACK to the server, and the server acts on it; for example, on an ACK it updates w.ackedVersionMap.
After that, the server proactively sends data to the client. So when does it send? This ties back to [istio source analysis][galley] galley runtime: there we saw that the upstream source delivers data as events, runtime processes them, and the result is handed to p.distributor; and from the Start() method we know that p.distributor is the snapshot cache.
func NewProcessing(a *settings.Args) *Processing {
d := snapshot.New(groups.IndexFunction)
return &Processing{
args: a,
distributor: d,
configzTopic: configz.CreateTopic(d),
}
}
func (p *Processing) Start() (err error) {
...
p.processor = runtime.NewProcessor(src, p.distributor, &processorCfg)
...
return nil
}
p.distributor distributes the data through SetSnapshot:
// galley/pkg/runtime/processor.go (excerpt)
func (p *Processor) Start() error {
    ...
    select {
    ...
    case <-p.stateStrategy.Publish:
        scope.Debug("Processor.process: publish")
        // build a snapshot from the objects currently held in state
        s := p.state.buildSnapshot()
        // hand the snapshot over to the distributor
        p.distributor.SetSnapshot(groups.Default, s)
    }
    ...
}
// pkg/mcp/snapshot/snapshot.go
func (c *Cache) SetSnapshot(group string, snapshot Snapshot) {
c.mu.Lock()
defer c.mu.Unlock()
// update the existing entry
c.snapshots[group] = snapshot
// trigger existing watches for which version changed
if info, ok := c.status[group]; ok {
info.mu.Lock()
defer info.mu.Unlock()
// iterate over all registered watches
for id, watch := range info.watches {
version := snapshot.Version(watch.request.Collection)
if version != watch.request.VersionInfo {
scope.Infof("SetSnapshot(): respond to watch %d for %v @ version %q",
id, watch.request.Collection, version)
response := &source.WatchResponse{
Collection: watch.request.Collection,
Version: version,
Resources: snapshot.Resources(watch.request.Collection),
Request: watch.request,
}
// invoke the push callback:
// the response goes into con.queue and is sent to the client
watch.pushResponse(response)
// discard the responseWatch
delete(info.watches, id)
scope.Debugf("SetSnapshot(): watch %d for %v @ version %q complete",
id, watch.request.Collection, version)
}
}
}
}
So how does info.watches get populated? It is filled in by snapshot's Watch method, which runs every time the client sends an ACK/NACK. As a result, whenever an upstream event occurs, SetSnapshot fires and the server pushes the new information to the client.
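Putting the two halves together: Watch parks a callback when it cannot answer immediately, and SetSnapshot fires every parked callback whose collection version changed. A condensed, self-contained sketch of that pattern (toy types, not the actual snapshot.Cache):
package main

import "fmt"

type pushFunc func(collection, version string)

// responseWatch is the analogue of an info.watches entry: a parked request plus
// the callback that puts the response into the connection's queue.
type responseWatch struct {
    collection   string
    knownVersion string
    push         pushFunc
}

// toyCache keeps the latest version per collection plus parked watches.
type toyCache struct {
    versions map[string]string
    watches  map[int]*responseWatch
    nextID   int
}

func newToyCache() *toyCache {
    return &toyCache{versions: map[string]string{}, watches: map[int]*responseWatch{}}
}

// Watch answers immediately when a different version is already available,
// otherwise it parks the callback until the next SetSnapshot.
func (c *toyCache) Watch(collection, knownVersion string, push pushFunc) {
    if v, ok := c.versions[collection]; ok && v != knownVersion {
        push(collection, v)
        return
    }
    c.nextID++
    c.watches[c.nextID] = &responseWatch{collection: collection, knownVersion: knownVersion, push: push}
}

// SetSnapshot stores the new versions and fires every parked watch whose
// collection version changed, then discards it (a new watch is registered on
// the client's next ACK).
func (c *toyCache) SetSnapshot(versions map[string]string) {
    c.versions = versions
    for id, w := range c.watches {
        if v := versions[w.collection]; v != w.knownVersion {
            w.push(w.collection, v)
            delete(c.watches, id)
        }
    }
}

func main() {
    c := newToyCache()
    push := func(collection, version string) { fmt.Printf("push %s @ %s\n", collection, version) }
    c.Watch("istio/networking/v1alpha3/virtualservices", "", push)                     // no snapshot yet: parked
    c.SetSnapshot(map[string]string{"istio/networking/v1alpha3/virtualservices": "v1"}) // fires the watch
}
In short, client-driven ACKs keep the watch table populated, and upstream events drain it through SetSnapshot.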
3. References
1. istio 1.3.6 source code
2. https://cloud.tencent.com/developer/article/1409159