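1. Preface
When reposting, please credit the original source and respect the author's work.
Source code: https://github.com/nicktming/istio
Branch: tming-v1.3.6 (based on version 1.3.6)
The previous two articles, [istio source analysis][citadel] citadel: istio_ca and [istio source analysis][citadel] citadel: istio_ca (grpc server), covered istio_ca's serviceaccount controller and custom signing, as well as the grpc server that istio_ca provides.
This article analyzes one usage scenario of the node_agent_k8s component: ingressgateway sds.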
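2. Example
The example follows Secure Gateways (SDS) from the official documentation.
After creating mygateway and httpbin-credential, run the following script to dump the proxy configuration: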
podname=istio-ingressgateway-85bb5b4c57-l2pcz
ns=istio-system
rm -f lds.json rds.json cds.json eds.json sds.json
istioctl proxy-config listener $podname -n $ns -o json > lds.json
istioctl proxy-config route $podname -n $ns -o json > rds.json
istioctl proxy-config cluster $podname -n $ns -o json > cds.json
istioctl proxy-config endpoint $podname -n $ns -o json > eds.json
istioctl proxy-config secret $podname -n $ns -o json > sds.json
Inspect lds.json:
{
"name": "0.0.0.0_443",
"address": {
"socketAddress": {
"address": "0.0.0.0",
"portValue": 443
}
},
"filterChains": [
{
"filterChainMatch": {
"serverNames": [
"httpbin.example.com"
]
},
"tlsContext": {
"commonTlsContext": {
"tlsCertificateSdsSecretConfigs": [
{
"name": "httpbin-credential",
"sdsConfig": {
"apiConfigSource": {
"apiType": "GRPC",
"grpcServices": [
{
"googleGrpc": {
"targetUri": "unix:/var/run/ingress_gateway/sds",
"statPrefix": "sdsstat"
}
}
]
}
}
}
],
...
},
"requireClientCertificate": false
},
...
}
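To make the link between the listener and the SDS server explicit, here is a minimal Go sketch that pulls the secret name and the SDS socket address out of a listener like the one above. The `listener` struct, the `sdsRefs` helper and the `sample` constant are assumptions made for this illustration (the sample is a trimmed copy of the JSON above with the elided parts removed); they are not part of istio.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// listener models only the fields of lds.json that this example reads.
type listener struct {
	Name         string `json:"name"`
	FilterChains []struct {
		TLSContext struct {
			CommonTLSContext struct {
				SdsConfigs []struct {
					Name      string `json:"name"`
					SdsConfig struct {
						APIConfigSource struct {
							GrpcServices []struct {
								GoogleGrpc struct {
									TargetURI string `json:"targetUri"`
								} `json:"googleGrpc"`
							} `json:"grpcServices"`
						} `json:"apiConfigSource"`
					} `json:"sdsConfig"`
				} `json:"tlsCertificateSdsSecretConfigs"`
			} `json:"commonTlsContext"`
		} `json:"tlsContext"`
	} `json:"filterChains"`
}

// sample is a trimmed copy of the listener shown above.
const sample = `{
  "name": "0.0.0.0_443",
  "filterChains": [{
    "tlsContext": {
      "commonTlsContext": {
        "tlsCertificateSdsSecretConfigs": [{
          "name": "httpbin-credential",
          "sdsConfig": {"apiConfigSource": {"apiType": "GRPC", "grpcServices": [
            {"googleGrpc": {"targetUri": "unix:/var/run/ingress_gateway/sds", "statPrefix": "sdsstat"}}
          ]}}
        }]
      }
    }
  }]
}`

// sdsRefs returns one "secretName@targetUri" entry per SDS reference in the listener.
func sdsRefs(raw []byte) ([]string, error) {
	var l listener
	if err := json.Unmarshal(raw, &l); err != nil {
		return nil, err
	}
	var refs []string
	for _, fc := range l.FilterChains {
		for _, c := range fc.TLSContext.CommonTLSContext.SdsConfigs {
			for _, g := range c.SdsConfig.APIConfigSource.GrpcServices {
				refs = append(refs, c.Name+"@"+g.GoogleGrpc.TargetURI)
			}
		}
	}
	return refs, nil
}

func main() {
	refs, err := sdsRefs([]byte(sample))
	if err != nil {
		panic(err)
	}
	fmt.Println(refs)
}
```

The result shows that the listener does not carry the certificate itself: it only names the secret (httpbin-credential) and points envoy at the unix domain socket on which the SDS server is expected to listen.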
Then inspect sds.json:
{
"dynamicActiveSecrets": [
{
"name": "httpbin-credential",
"versionInfo": "2020-02-07 06:35:36.286120317 +0000 UTC m=+69968.244929345",
"lastUpdated": "2020-02-07T06:35:36.287Z",
"secret": {
"name": "httpbin-credential",
"tlsCertificate": {
"certificateChain": {
"inlineBytes": "LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUZVekNDQXp1Z0F3SUJBZ0lERUFJU01BMEdDU3FHU0liM0RRRUJDd1VBTUVveEN6QUpCZ05WQkFZVEFsVlQKTVE4d0RRWURWUVFJREFaRVpXNXBZV3d4RERBS0JnTlZCQW9NQTBScGN6RWNNQm9HQTFVRUF3d1RhSFIwY0dKcApiaTVsZUdGdGNHeGxMbU52YlRBZUZ3MHlNREF5TURjd05UUTRNRFZhRncweU1UQXlNVFl3TlRRNE1EVmFNR0F4CkN6QUpCZ05WQkFZVEFsVlRNUTh3RFFZRFZRUUlEQVpFWlc1cFlXd3hGREFTQmdOVkJBY01DMU53Y21sdVoyWnAKWld4a01Rd3dDZ1lEVlFRS0RBTkVhWE14SERBYUJnTlZCQU1NRTJoMGRIQmlhVzR1WlhoaGJYQnNaUzVqYjIwdwpnZ0VpTUEwR0NTcUdTSWIzRFFFQkFRVUFBNElCRHdBd2dnRUtBb0lCQVFDcTVySC9CYnZwZXlacFNmdUU2d09TCi83ZDFIYU1Dek1mQ3E2NmxyYmZVSUhueG5xd0kzemh0OExMVzU1OTVKNk1wZ1FGdEZoNFA4KzhkQVF1TkxQa2IKNkpTZjdIOFpCWnY1NlRKS2pvNEYrVG1aTTFHTExmMzdBZXBsSnQwandFMUxXK3BmODliWHhvYVVSMHg5K2o5ZQpObncyK3RjUEdHSFZNdndEWVVzbGYyM3Z1RnpKckpFZWpudWRTK0FSTTRTL1krY1IreXF3aDVueTJRcjFZS3E4CkVNeWNyS0NGT3JpaElCT3Y1bERRSmRya05ZMFdMRG5QZWNIYTlvd0Y1Nk5BSnJhM2dGQWJuTHpiZ2xOa2NWWjEKTC9rUjF0NGMzV3FURHJhRXRwRHpBTXlMT0RHWkN1N1JBaGdCM2g2b1dkVW9KSGZ3N0hUWXltWWY1VHVtU3F3dApBZ01CQUFHamdnRXFNSUlCSmpBSkJnTlZIUk1FQWpBQU1CRUdDV0NHU0FHRytFSUJBUVFFQXdJR1FEQXpCZ2xnCmhrZ0JodmhDQVEwRUpoWWtUM0JsYmxOVFRDQkhaVzVsY21GMFpXUWdVMlZ5ZG1WeUlFTmxjblJwWm1sallYUmwKTUIwR0ExVWREZ1FXQkJRYmxqaDJaRlZwbWMxRTkzQnpLZS9NdjFMSnNEQ0JqQVlEVlIwakJJR0VNSUdCZ0JRVQpoYzdkdVdrbkdsRUYrdEN4RXFqWU1VNVZBYUZrcEdJd1lERUxNQWtHQTFVRUJoTUNWVk14RHpBTkJnTlZCQWdNCkJrUmxibWxoYkRFVU1CSUdBMVVFQnd3TFUzQnlhVzVuWm1sbGJHUXhEREFLQmdOVkJBb01BMFJwY3pFY01Cb0cKQTFVRUF3d1RhSFIwY0dKcGJpNWxlR0Z0Y0d4bExtTnZiWUlERUFJU01BNEdBMVVkRHdFQi93UUVBd0lGb0RBVApCZ05WSFNVRUREQUtCZ2dyQmdFRkJRY0RBVEFOQmdrcWhraUc5dzBCQVFzRkFBT0NBZ0VBZlVQSjJGYk9vY3h1CmFFQ0tBbVNoeTB4eWJ1RXBCeXV4UGdVWkhnaG1wRmluNVJwUmJoOGMxcTlwd3duVGthcGEyb2lscTBqSmMrZmcKL3Q0TnFVYTZER2RHOHAxZnNlcnB5eXNaQlVXU1JNMktJOWJWTVpzNmNValVIekJ2MVZFekxKdmdUT0k4QjRjUwpyak5jcUc3TXcyVkNRMDl4TE00MVQrOC91R0FkT1IvSEpQOGNKdUp1L0huTjFyOFU5N1JqUnRjMUlHMXlOMlowClBMdGpCd1pGdXpBVGlwN0lnWDFqKzZkaXdiV3VFZjQ4bWNuQ3BNbExjQ2Y1S1o3Z0gzejEwWkcvcFoxRnNURHgKeWlBZndHZDUyMzNmMk5xL1Q3dXIxbWxWY1prSy8z
c2QrMjgxRUU1cWpqY05GVDdWY2hDL21FYVNHK2F5UGpJdApLNzdBenlzcnNnOGlyMHhLU2VuSVF2NHRHbytHL09aNzhtSU4vWmQreEVOVnNDazVRN3NQNFloYkZKKzc4WFIvCkJNWTNFaUNEbXlZMm1sbytlL2hjN0ZIM3Uxb3VJc0s0UnJrLzJSRWtYSCtsUk1RYjNCbzJhYkFrY09RaU93ZHkKb2lvVnFOOVZUdkEwMDh6eEp6Mm11Qmp5MzJobnYyRUV4eDRLMUplKzVtZ3lwYkp4MnNOQ2xoMCtTTDE0OWVkcApPMUt3bXNsdGhLTEZhT2RrMEF4b2dxVEpnbjZpYUsrRFRFRk9IMjIxSzUwQld5ZTUvbDJsZHozL0hXd3VMZlMzCndlN0h6dWxIRGV5aXdJdCtqVkhUTkF0Z0VLOGMyTFJkTS9xR3lwTEwrN3AxdjhZYUdJeG1Hc3pYRnFxOGdhREwKZG5IWEY4U3czT2NSRUhxVlA4ai9sbTlXWFY1QVRKUT0KLS0tLS1FTkQgQ0VSVElGSUNBVEUtLS0tLQo="
},
"privateKey": {
"inlineString": "[redacted]"
}
}
}
}
]
}
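The inlineBytes field is base64-encoded PEM. As a quick check, the following Go sketch decodes just the first 36 characters of the value above; the `prefix` constant and the `decode` helper are names introduced for this illustration only.

```go
package main

import (
	"encoding/base64"
	"fmt"
)

// prefix is the first 36 characters of the inlineBytes value shown above.
const prefix = "LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0t"

// decode turns a standard base64 string into its text form.
func decode(s string) (string, error) {
	b, err := base64.StdEncoding.DecodeString(s)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	out, err := decode(prefix)
	if err != nil {
		panic(err)
	}
	fmt.Println(out) // prints "-----BEGIN CERTIFICATE-----"
}
```

Decoding the whole value the same way yields the PEM certificate chain that the gateway serves for httpbin.example.com.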
3. Analysis
Looking at the pod details of istio-system/ingressgateway, we can see that it runs with the following parameters:
apiVersion: apps/v1
kind: Deployment
...
  name: istio-ingressgateway
...
spec:
  ...
      containers:
      - env:
        - name: ENABLE_WORKLOAD_SDS
          value: "false"
        - name: ENABLE_INGRESS_GATEWAY_SDS
          value: "true"
        - name: INGRESS_GATEWAY_NAMESPACE
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: metadata.namespace
        image: docker.io/istio/node-agent-k8s:1.4.3
        name: ingress-sds
        ...
        volumeMounts:
        - mountPath: /var/run/ingress_gateway
          name: ingressgatewaysdsudspath
      ...
      volumes:
      - emptyDir: {}
        name: ingressgatewaysdsudspath
Let's look at what the first two parameters mean:
// The workload SDS mode allows node agent to provision credentials to workload proxy by sending
// CSR to CA.
enableWorkloadSDS = "ENABLE_WORKLOAD_SDS"
enableWorkloadSDSFlag = "enableWorkloadSDS"
// The ingress gateway SDS mode allows node agent to provision credentials to ingress gateway
// proxy by watching kubernetes secrets.
enableIngressGatewaySDS = "ENABLE_INGRESS_GATEWAY_SDS"
enableIngressGatewaySDSFlag = "enableIngressGatewaySDS"
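1. enableWorkloadSDS means the agent can send a CSR to the CA to obtain a signed certificate and thereby give the workload its identity (key and cert). In short, the agent generates the key itself and then sends a signing request to citadel to obtain a signed certificate.
2. enableIngressGatewaySDS means the agent can watch Kubernetes secrets and thereby give the ingress gateway its identity (key and cert). In short, the key and cert have to be obtained from k8s.
This article analyzes the enableIngressGatewaySDS case.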
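The two switches can be summarized with a small Go sketch. It reuses the two environment variable names from the constants above; the helpers `boolFromEnv`, `credentialSources` and `fromMap` are hypothetical, and treating an unset variable as false is an assumption of this sketch (the agent's real defaults are not shown here).

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

const (
	enableWorkloadSDS       = "ENABLE_WORKLOAD_SDS"
	enableIngressGatewaySDS = "ENABLE_INGRESS_GATEWAY_SDS"
)

// boolFromEnv is a hypothetical helper: unset or unparsable values count as false.
func boolFromEnv(getenv func(string) string, key string) bool {
	v, err := strconv.ParseBool(getenv(key))
	return err == nil && v
}

// credentialSources lists where key and cert would come from for the enabled modes.
func credentialSources(getenv func(string) string) []string {
	var sources []string
	if boolFromEnv(getenv, enableWorkloadSDS) {
		sources = append(sources, "workload: generate key, send CSR to CA")
	}
	if boolFromEnv(getenv, enableIngressGatewaySDS) {
		sources = append(sources, "ingress gateway: watch kubernetes secrets")
	}
	return sources
}

// fromMap adapts a map to the getenv signature, so the logic can be tried
// without touching the real process environment.
func fromMap(m map[string]string) func(string) string {
	return func(k string) string { return m[k] }
}

func main() {
	// Values taken from the ingress-sds container spec above.
	env := map[string]string{
		enableWorkloadSDS:       "false",
		enableIngressGatewaySDS: "true",
	}
	fmt.Println(credentialSources(fromMap(env)))
	// The same function also works against the real environment.
	fmt.Println(credentialSources(os.Getenv))
}
```

With the values from the ingress-sds container, only the ingress gateway source is reported, which is the path followed in the rest of this article.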
3.1 sds service
This part is similar to [istio source analysis][pilot] pilot: ads.
There is a test client in security/pkg/testing/sdsc/sdsclient.go, so we use the client as the entry point.
// security/pkg/testing/sdsc/sdsclient.go
func constructSDSRequestContext() (context.Context, error) {
	// Read from the designated location for Kubernetes JWT.
	content, err := ioutil.ReadFile(authn_model.K8sSATrustworthyJwtFileName)
	...
	// Note: this is the content of /var/run/secrets/tokens/istio-token.
	md := metadata.New(map[string]string{
		authn_model.K8sSAJwtTokenHeaderKey: string(content),
	})
	return metadata.NewOutgoingContext(context.Background(), md), nil
}
func NewClient(opt ClientOptions) (*Client, error) {
	conn, err := grpc.Dial(opt.ServerAddress, grpc.WithInsecure())
	...
	client := sds.NewSecretDiscoveryServiceClient(conn)
	ctx, err := constructSDSRequestContext()
	...
	stream, err := client.StreamSecrets(ctx)
	...
	return &Client{
		stream:        stream,
		conn:          conn,
		updateChan:    make(chan xdsapi.DiscoveryResponse, 1),
		serverAddress: opt.ServerAddress,
	}, nil
}
Now look at the server side:
func (s *sdsservice) StreamSecrets(stream sds.SecretDiscoveryService_StreamSecretsServer) error {
	...
	reqChannel := make(chan *xdsapi.DiscoveryRequest, 1)
	con := newSDSConnection(stream)
	// Receive messages from the client.
	go receiveThread(con, reqChannel, &receiveError)
	var node *core.Node
	for {
		// Block until a request is received.
		select {
		case discReq, ok := <-reqChannel:
			...
			if con.conID == "" {
				// first request
				...
				// Register the connection in sdsClients.
				addConn(key, con)
			}
			...
			// If the node agent's cached secret matches <token, resourceName, Version>
			// from the request, this is an ACK request.
			if discReq.VersionInfo != "" && s.st.SecretExist(conID, resourceName, token, discReq.VersionInfo) {
				sdsServiceLog.Debugf("%s received SDS ACK from proxy %q, version info %q, "+
					"error details %s\n", conIDresourceNamePrefix, discReq.Node.Id, discReq.VersionInfo,
					discReq.ErrorDetail.GoString())
				continue
			}
			...
			// In ingress gateway agent mode, if the first SDS request has arrived but the
			// corresponding secret does not exist in k8s yet, wait for it before responding.
			if s.st.ShouldWaitForIngressGatewaySecret(conID, resourceName, token) {
				...
				continue
			}
			// Get the secret. In ingress gateway agent mode it is read from k8s;
			// otherwise the agent generates a key itself and requests a signature.
			secret, err := s.st.GenerateSecret(ctx, conID, resourceName, token)
			...
			con.mutex.Lock()
			con.secret = secret
			con.mutex.Unlock()
			// Send the response to the client.
			if err := pushSDS(con); err != nil {
				...
				return err
			}
		case <-con.pushChannel:
			// Signal that the server wants to push to the client on its own initiative.
			...
			// Send the response to the client.
			if err := pushSDS(con); err != nil {
				...
				return err
			}
		}
	}
}
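There are two main branches here:
1. <-reqChannel: requests arrive here because receiveThread reads them from the client and puts them into reqChannel.
1.1 If this is the first request, the connection is registered in sdsClients via addConn.
1.2 If the node agent's cached secret matches the <token, resourceName, Version> in the request, the request is an ACK.
1.3 In ingress gateway agent mode, if the SDS request has been received but the corresponding secret does not exist in k8s yet (or has been deleted), wait for the secret to be created before sending a response to the client.
1.4 Obtain the secret via s.st.GenerateSecret. In ingress gateway agent mode the secret is read from k8s; otherwise the agent generates a key itself and requests a signature.
1.5 Call pushSDS to send the response to the client.
2. <-con.pushChannel: a signal that the server wants to push to the client on its own initiative; pushSDS then sends whatever is currently stored in con.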
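The two branches can be reproduced with a small, self-contained Go sketch. Everything in it is a simplified stand-in invented for illustration: `request` replaces xdsapi.DiscoveryRequest, `secretStore` replaces the secret cache, the push channel carries a version string instead of an event, and the token check is left out entirely.

```go
package main

import "fmt"

// request is a simplified, hypothetical stand-in for xdsapi.DiscoveryRequest.
type request struct {
	ResourceName string
	VersionInfo  string
}

// secretStore maps a resource name to the version currently cached for it.
type secretStore map[string]string

// handle mirrors steps 1.2 to 1.5 for a single request.
func handle(s secretStore, req request) string {
	version, ok := s[req.ResourceName]
	if req.VersionInfo != "" && ok && version == req.VersionInfo {
		return "ack " + version // 1.2: the proxy confirms what it already has
	}
	if !ok {
		return "wait for secret " + req.ResourceName // 1.3: nothing to send yet
	}
	return "push " + version // 1.4 and 1.5: fetch the secret and respond
}

// serve multiplexes client requests and server-initiated pushes, like the
// select statement in StreamSecrets. It returns when reqCh is closed.
func serve(s secretStore, reqCh <-chan request, pushCh <-chan string) []string {
	var log []string
	for {
		select {
		case req, ok := <-reqCh:
			if !ok {
				return log
			}
			log = append(log, handle(s, req))
		case version := <-pushCh:
			log = append(log, "server push "+version)
		}
	}
}

// runDemo feeds a fixed event sequence through serve. Both channels are
// unbuffered and there is a single sender, so the order is deterministic.
func runDemo() []string {
	s := secretStore{"httpbin-credential": "v1"}
	reqCh := make(chan request)
	pushCh := make(chan string)
	go func() {
		reqCh <- request{ResourceName: "httpbin-credential"}
		reqCh <- request{ResourceName: "httpbin-credential", VersionInfo: "v1"}
		pushCh <- "v2"
		reqCh <- request{ResourceName: "missing-credential"}
		close(reqCh)
	}()
	return serve(s, reqCh, pushCh)
}

func main() {
	for _, line := range runDemo() {
		fmt.Println(line)
	}
}
```

The demo logs "push v1", "ack v1", "server push v2" and "wait for secret missing-credential", in that order: a first request is answered with the secret, the follow-up carrying the same version is recognized as an ACK, a change signalled on the push channel is sent without any client request, and a request for an unknown secret is deferred.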
func NotifyProxy(connKey cache.ConnKey, secret *model.SecretItem) error {
	conIDresourceNamePrefix := sdsLogPrefix(connKey.ConnectionID, connKey.ResourceName)
	sdsClientsMutex.Lock()
	conn := sdsClients[connKey]
	...
	conn.secret = secret
	conn.pushChannel <- &sdsEvent{}
	return nil
}
As shown, NotifyProxy takes the secret as a parameter, so upstream code can call NotifyProxy whenever the secret changes in order to push the new content to the client proactively.
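The pattern behind NotifyProxy (look up the connection under a lock, store the new secret on it, then wake the stream loop through a channel) can be sketched as follows. The `registry` and `connection` types are hypothetical simplifications, the secret is reduced to a string, and the buffer size of 1 on pushChannel is an assumption of this sketch rather than something shown in the excerpt above.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// connKey identifies one SDS stream, like cache.ConnKey in the excerpt above.
type connKey struct {
	ConnectionID string
	ResourceName string
}

// connection is a hypothetical, reduced version of the per-stream state.
type connection struct {
	mu          sync.Mutex
	secret      string
	pushChannel chan struct{}
}

// registry plays the role of the sdsClients map and its mutex.
type registry struct {
	mu    sync.Mutex
	conns map[connKey]*connection
}

// notify stores the new secret on the connection and signals its stream loop.
func (r *registry) notify(key connKey, secret string) error {
	r.mu.Lock()
	conn, ok := r.conns[key]
	r.mu.Unlock()
	if !ok {
		return errors.New("no connection for " + key.ConnectionID)
	}
	conn.mu.Lock()
	conn.secret = secret
	conn.mu.Unlock()
	conn.pushChannel <- struct{}{}
	return nil
}

func main() {
	key := connKey{ConnectionID: "conn-1", ResourceName: "httpbin-credential"}
	conn := &connection{pushChannel: make(chan struct{}, 1)}
	reg := &registry{conns: map[connKey]*connection{key: conn}}

	if err := reg.notify(key, "new-cert"); err != nil {
		panic(err)
	}
	<-conn.pushChannel // the stream loop would wake up here
	fmt.Println("stream loop would now push:", conn.secret)
}
```

Releasing the registry lock before sending on the channel keeps a slow stream from blocking lookups for other connections; whether the original code does the same is hidden behind the elided lines.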
4. secretFetcher and secretCache
4.1 secretFetcher
func NewSecretFetcher(ingressGatewayAgent bool, endpoint, caProviderName string, tlsFlag bool,
	tlsRootCert []byte, vaultAddr, vaultRole, vaultAuthPath, vaultSignCsrPath string) (*SecretFetcher, error) {
	ret := &SecretFetcher{}
	if ingressGatewayAgent {
		// Ingress gateway mode: watch secrets in k8s and read key and cert from them.
		ret.UseCaClient = false
		cs, err := kube.CreateClientset("", "")
		...
		ret.FallbackSecretName = ingressFallbackSecret
		secretFetcherLog.Debugf("SecretFetcher set fallback secret name %s", ret.FallbackSecretName)
		ret.InitWithKubeClient(cs.CoreV1())
	} else {
		// Workload agent mode: create a CA client and obtain signed certificates from citadel.
		caClient, err := ca.NewCAClient(endpoint, caProviderName, tlsFlag, tlsRootCert,
			vaultAddr, vaultRole, vaultAuthPath, vaultSignCsrPath)
		...
		ret.UseCaClient = true
		ret.CaClient = caClient
	}
	return ret, nil
}
func (sf *SecretFetcher) InitWithKubeClient(core corev1.CoreV1Interface) { // nolint:interfacer
	...
	sf.scrtStore, sf.scrtController =
		cache.NewInformer(scrtLW, &v1.Secret{}, resyncPeriod, cache.ResourceEventHandlerFuncs{
			AddFunc:    sf.scrtAdded,
			DeleteFunc: sf.scrtDeleted,
			UpdateFunc: sf.scrtUpdated,
		})
	...
}
After extracting the key and cert from the secret, scrtAdded, scrtDeleted and scrtUpdated store the result in the cache through sf.AddCache, sf.DeleteCache and sf.UpdateCache respectively.
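The fetcher does not know the cache type; it only calls function fields that someone else has filled in. A minimal Go sketch of that decoupling is shown here. The `fetcher` and `secretCache` types and their string-based signatures are hypothetical and far simpler than the real SecretFetcher and SecretCache.

```go
package main

import "fmt"

// fetcher is a hypothetical stand-in for SecretFetcher: its event handlers
// only forward to whatever callbacks have been assigned.
type fetcher struct {
	AddCache    func(name, cert string)
	DeleteCache func(name string)
	UpdateCache func(name, cert string)
}

func (f *fetcher) scrtAdded(name, cert string) {
	if f.AddCache != nil {
		f.AddCache(name, cert)
	}
}

func (f *fetcher) scrtUpdated(name, cert string) {
	if f.UpdateCache != nil {
		f.UpdateCache(name, cert)
	}
}

func (f *fetcher) scrtDeleted(name string) {
	if f.DeleteCache != nil {
		f.DeleteCache(name)
	}
}

// secretCache is a hypothetical stand-in for SecretCache.
type secretCache struct {
	secrets map[string]string
}

func (c *secretCache) update(name, cert string) { c.secrets[name] = cert }
func (c *secretCache) remove(name string)       { delete(c.secrets, name) }

// newSecretCache wires the cache into the fetcher. In this sketch, add and
// update share one handler because both simply overwrite the stored entry.
func newSecretCache(f *fetcher) *secretCache {
	c := &secretCache{secrets: map[string]string{}}
	f.AddCache = c.update
	f.DeleteCache = c.remove
	f.UpdateCache = c.update
	return c
}

func main() {
	f := &fetcher{}
	c := newSecretCache(f)

	f.scrtAdded("httpbin-credential", "cert-v1")
	f.scrtUpdated("httpbin-credential", "cert-v2")
	fmt.Println(c.secrets["httpbin-credential"])

	f.scrtDeleted("httpbin-credential")
	fmt.Println(len(c.secrets))
}
```

Using function fields instead of an interface keeps the fetcher package free of any import of the cache package, at the cost of nil checks before each call.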
var (
	...
	rootCmd = &cobra.Command{
		Use:   "nodeagent",
		Short: "Citadel agent",
		RunE: func(c *cobra.Command, args []string) error {
			...
			workloadSecretCache, gatewaySecretCache := newSecretCache(serverOptions)
			...
			server, err := sds.NewServer(serverOptions, workloadSecretCache, gatewaySecretCache)
			defer server.Stop()
			...
		},
	}
)
func newSecretCache(serverOptions sds.Options) (workloadSecretCache, gatewaySecretCache *cache.SecretCache) {
	...
	if serverOptions.EnableIngressGatewaySDS {
		gSecretFetcher, err := secretfetcher.NewSecretFetcher(true, "", "", false, nil, "", "", "", "")
		...
		gatewaySecretChan = make(chan struct{})
		gSecretFetcher.Run(gatewaySecretChan)
		gatewaySecretCache = cache.NewSecretCache(gSecretFetcher, sds.NotifyProxy, gatewaySdsCacheOptions)
	} else {
		gatewaySecretCache = nil
	}
	return workloadSecretCache, gatewaySecretCache
}
func NewSecretCache(fetcher *secretfetcher.SecretFetcher, notifyCb func(ConnKey, *model.SecretItem) error, options Options) *SecretCache {
	...
	fetcher.AddCache = ret.UpdateK8sSecret
	fetcher.DeleteCache = ret.DeleteK8sSecret
	fetcher.UpdateCache = ret.UpdateK8sSecret
	...
	return ret
}
func (sc *SecretCache) UpdateK8sSecret(secretName string, ns model.SecretItem) {
	...
	sc.secrets.Range(func(k interface{}, v interface{}) bool {
		...
		if connKey.ResourceName == secretName {
			...
			go func() {
				...
				sc.callbackWithTimeout(connKey, newSecret)
			}()
			return false
		}
		return true
	})
	...
}
func (sc *SecretCache) callbackWithTimeout(connKey ConnKey, secret *model.SecretItem) {
	...
	go func() {
		...
		if sc.notifyCallback != nil {
			if err := sc.notifyCallback(connKey, secret); err != nil {
				...
			}
		} ...
	}()
	select {
	...
	}
}
Putting it together, the call chain for a secret added in k8s is:
k8s-apiserver -> SecretFetcher.scrtAdded -> SecretCache.UpdateK8sSecret (SecretFetcher.AddCache) -> sc.callbackWithTimeout -> sc.notifyCallback (NotifyProxy).
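The last hop, callbackWithTimeout, runs the notification in its own goroutine and waits for it inside a select. Since the body of that select is elided above, the following is only a sketch of the usual Go shape for this pattern, not the actual istio code: it returns an error (the original returns nothing), and `errTimeout`, `fastNotify`, `slowNotify` and the two timeout constants are names invented for the example.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTimeout is returned when the callback does not finish in time.
var errTimeout = errors.New("notify callback timed out")

const (
	shortTimeout = 10 * time.Millisecond
	longTimeout  = time.Second
)

// callbackWithTimeout runs notify in its own goroutine and stops waiting
// once timeout has elapsed.
func callbackWithTimeout(notify func() error, timeout time.Duration) error {
	// Buffered so the goroutine can still deliver its result and exit
	// even if nobody is waiting anymore.
	done := make(chan error, 1)
	go func() {
		done <- notify()
	}()
	select {
	case err := <-done:
		return err
	case <-time.After(timeout):
		return errTimeout
	}
}

// fastNotify stands in for a NotifyProxy call that returns immediately.
func fastNotify() error { return nil }

// slowNotify stands in for a NotifyProxy call that is stuck, for example
// because the stream loop is not draining its push channel.
func slowNotify() error {
	time.Sleep(200 * time.Millisecond)
	return nil
}

func main() {
	fmt.Println("fast:", callbackWithTimeout(fastNotify, longTimeout))
	fmt.Println("slow:", callbackWithTimeout(slowNotify, shortTimeout))
}
```

The timeout matters because the notification ends in a channel send to the stream loop; without it, one stalled connection could hold up the goroutine that reacts to secret changes.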