1. 前言
转载请说明原文出处, 尊重他人劳动成果!
源码位置: https://github.com/nicktming/istio
分支: tming-v1.3.6 (基于1.3.6版本)
在前一篇文章 [istio源码分析][pilot] pilot之DiscoveryServer 中已经分析了
DiscoveryServer
拿到galley
和k8s
的数据后向每一个连接的client
端(其实是envoy
) 发送了一个XdsEvent
到client.pushChannel
.
本文将分析从
client
到discoveryServer
端之间的联系. 主要是分析整体的流程, 关于细节方面的东西可以在具体问题中进行调试即可.
2. adsc
adsc
是istio
模拟的一个client
端, 真正的客户端是sidecar
中的envoy
. 这里为了方便, 因此用adsc
进行分析.
// pkg/adsc/adsc.go
func (a *ADSC) Run() error {
var err error
if len(a.certDir) > 0 {
...
a.conn, err = grpc.Dial(a.url, opts...)
...
} else {
a.conn, err = grpc.Dial(a.url, grpc.WithInsecure())
...
}
// 建立连接
xds := ads.NewAggregatedDiscoveryServiceClient(a.conn)
edsstr, err := xds.StreamAggregatedResources(context.Background())
if err != nil {
return err
}
a.stream = edsstr
// 从discovery server接收信息
go a.handleRecv()
return nil
}
func (a *ADSC) handleRecv() {
for {
// 从server端获得信息
msg, err := a.stream.Recv()
...
listeners := []*xdsapi.Listener{}
clusters := []*xdsapi.Cluster{}
routes := []*xdsapi.RouteConfiguration{}
eds := []*xdsapi.ClusterLoadAssignment{}
// 为获得的数据分类
for _, rsc := range msg.Resources { // Any
a.VersionInfo[rsc.TypeUrl] = msg.VersionInfo
valBytes := rsc.Value
if rsc.TypeUrl == listenerType {
ll := &xdsapi.Listener{}
_ = proto.Unmarshal(valBytes, ll)
listeners = append(listeners, ll)
} ...
}
...
// 向server端发送ACK
a.ack(msg)
...
// 客户端处理listener, cluster, endpoint, route
...
}
}
func (a *ADSC) ack(msg *xdsapi.DiscoveryResponse) {
_ = a.stream.Send(&xdsapi.DiscoveryRequest{
ResponseNonce: msg.Nonce,
TypeUrl: msg.TypeUrl,
Node: a.node(),
VersionInfo: msg.VersionInfo,
})
}
1. 与
discoveryServer
建立连接.
2. 通过handleRecv
方法与server
端交流.
3. 将获得的数据分类(endpoint, listener, cluster, route
)
4. 往server
端发送ack
.
5. 客户端自己处理数据.
那
envoy
就是数据生成的配置文件了,xds
协议.
3. server端
// StreamAggregatedResources implements the ADS interface.
func (s *DiscoveryServer) StreamAggregatedResources(stream ads.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
peerInfo, ok := peer.FromContext(stream.Context())
peerAddr := "0.0.0.0"
if ok {
peerAddr = peerInfo.Addr.String()
}
t0 := time.Now()
err := s.globalPushContext().InitContext(s.Env)
...
// 创建一个XdsConnection
con := newXdsConnection(peerAddr, stream)
var receiveError error
reqChannel := make(chan *xdsapi.DiscoveryRequest, 1)
// 接收con对应的client的请求
go receiveThread(con, reqChannel, &receiveError)
node := &core.Node{}
for {
// Block until either a request is received or a push is triggered.
select {
case discReq, ok := <-reqChannel:
if !ok {
// Remote side closed connection.
return receiveError
}
// This should be only set for the first request. Guard with ID check regardless.
if discReq.Node != nil && discReq.Node.Id != "" {
node = discReq.Node
err = s.initConnectionNode(discReq.Node, con)
if err != nil {
return err
}
}
switch discReq.TypeUrl {
case ClusterType:
...
case ListenerType:
...
case RouteType:
...
case EndpointType:
...
default:
adsLog.Warnf("ADS: Unknown watched resources %s", discReq.String())
}
con.mu.Lock()
if !con.added {
con.added = true
con.mu.Unlock()
// 添加到xdsclient中
s.addCon(con.ConID, con)
defer s.removeCon(con.ConID, con)
} else {
con.mu.Unlock()
}
case pushEv := <-con.pushChannel:
err := s.pushConnection(con, pushEv)
pushEv.done()
if err != nil {
return nil
}
}
}
}
可以看到
select
中有两个case
.
1.discReq, ok := <-reqChannel:
是从client
来的,receiveThread
方法会看到.
2.pushEv := <-con.pushChannel:
是从k8s
和galley
中来的, 具体可以参考上文 [istio源码分析][pilot] pilot之DiscoveryServer .
3.1 第一个分支
这里先看一下第一个分支的操作.
receiveThread
func receiveThread(con *XdsConnection, reqChannel chan *xdsapi.DiscoveryRequest, errP *error) {
defer close(reqChannel) // indicates close of the remote side.
for {
// 从client端接收信息
req, err := con.stream.Recv()
...
select {
// 将req转到reqChannel中
case reqChannel <- req:
case <-con.stream.Context().Done():
adsLog.Errorf("ADS: %q %s terminated with stream closed", con.PeerAddr, con.ConID)
return
}
}
}
1. 从
client
端接收的req
直接放入到reqChannel
中, 所以会进入到StreamAggregatedResources
中的第一个分支.
2. 以ClusterType
为例:
case ClusterType:
if con.CDSWatch {
// Already received a cluster watch request, this is an ACK
if discReq.ErrorDetail != nil {
adsLog.Warnf("ADS:CDS: ACK ERROR %v %s (%s) %v", peerAddr, con.ConID, con.modelNode.ID, discReq.String())
errCode := codes.Code(discReq.ErrorDetail.Code)
incrementXDSRejects(cdsReject, node.Id, errCode.String())
} else if discReq.ResponseNonce != "" {
con.ClusterNonceAcked = discReq.ResponseNonce
}
adsLog.Debugf("ADS:CDS: ACK %s %s (%s) %s %s", peerAddr, con.ConID, con.modelNode.ID, discReq.VersionInfo, discReq.ResponseNonce)
continue
}
// CDS REQ is the first request an envoy makes. This shows up
// immediately after connect. It is followed by EDS REQ as
// soon as the CDS push is returned.
adsLog.Infof("ADS:CDS: REQ %v %s %v version:%s", peerAddr, con.ConID, time.Since(t0), discReq.VersionInfo)
con.CDSWatch = true
err := s.pushCds(con, s.globalPushContext(), versionInfo())
if err != nil {
return err
}
1. 可以看到除了是第一次会调用
pushCds
方法, 后面都是打印ACK/NACK
信息. 通过一个变量con.CDSWatch
进行控制.
2. 另外通过s.addCon(con.ConID, con)
将此连接加入到adsClients
, 这个在startPush
方法需要分发给所有的client
端的时候用到的. 具体可以参考上文 [istio源码分析][pilot] pilot之DiscoveryServer .
pushCds
// pilot/pkg/proxy/envoy/v2/cds.go
func (s *DiscoveryServer) pushCds(con *XdsConnection, push *model.PushContext, version string) error {
pushStart := time.Now()
// 发送给该envoy (con.modelNode)
// 内容在push中
// 根据client信息和内容生成要发送的clusters集合
rawClusters := s.generateRawClusters(con.modelNode, push)
...
// 构造response并发送给该client(envoy)
response := con.clusters(rawClusters)
err := con.send(response)
...
return nil
}
func (s *DiscoveryServer) generateRawClusters(node *model.Proxy, push *model.PushContext) []*xdsapi.Cluster {
rawClusters := s.ConfigGenerator.BuildClusters(s.Env, node, push)
...
return rawClusters
}
1.
client
端(envoy
)信息是con.modelNode
, 内容为push
, 根据此两个信息传入到generateRawClusters
中生成cluster
集合. 实际上是通过s.ConfigGenerator.BuildClusters
方法,
type ConfigGenerator interface {
BuildListeners(env *model.Environment, node *model.Proxy, push *model.PushContext) []*v2.Listener
BuildClusters(env *model.Environment, node *model.Proxy, push *model.PushContext) []*v2.Cluster
BuildHTTPRoutes(env *model.Environment, node *model.Proxy, push *model.PushContext, routeNames []string) []*v2.RouteConfiguration
}
这个部分的内容就是根据当前的内容生成
envoy
所接受的cluster
,endpoint
和route
.
2. 向
client
端(envoy
)发送数据.
3.2 第二个分支
这里分析
pushEv := <-con.pushChannel:
, 这个分支是从configController
和ServiceController
过来的, 具体可以参考上文 [istio源码分析][pilot] pilot之DiscoveryServer .
func (s *DiscoveryServer) pushConnection(con *XdsConnection, pushEv *XdsEvent) error {
...
if con.CDSWatch {
err := s.pushCds(con, pushEv.push, currentVersion)
...
}
if len(con.Clusters) > 0 {
err := s.pushEds(pushEv.push, con, currentVersion, nil)
...
}
if con.LDSWatch {
err := s.pushLds(con, pushEv.push, currentVersion)
...
}
if len(con.Routes) > 0 {
err := s.pushRoute(con, pushEv.push, currentVersion)
...
}
...
return nil
}
很多细节部分省略了很多, 可以看到最终是往此
client
端发送cluster
,endpoint
,route
和listener
.
4. 总结
5. 参考
1.
istio 1.3.6源码