1// Copyright 2019 Istio Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15// Package sds implements secret discovery service in NodeAgent. 16package sds 17 18import ( 19 "context" 20 "encoding/json" 21 "fmt" 22 "io" 23 "io/ioutil" 24 "strconv" 25 "sync" 26 "sync/atomic" 27 "time" 28 29 "istio.io/istio/security/pkg/nodeagent/util" 30 31 xdsapi "github.com/envoyproxy/go-control-plane/envoy/api/v2" 32 authapi "github.com/envoyproxy/go-control-plane/envoy/api/v2/auth" 33 core "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" 34 sds "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2" 35 "github.com/golang/protobuf/ptypes" 36 "google.golang.org/grpc" 37 "google.golang.org/grpc/codes" 38 "google.golang.org/grpc/metadata" 39 "google.golang.org/grpc/status" 40 41 "istio.io/istio/security/pkg/nodeagent/cache" 42 "istio.io/istio/security/pkg/nodeagent/model" 43 "istio.io/pkg/log" 44) 45 46const ( 47 // SecretType is used for secret discovery service to construct response. 48 SecretType = "type.googleapis.com/envoy.api.v2.auth.Secret" 49 50 // credentialTokenHeaderKey is the header key in gPRC header which is used to 51 // pass credential token from envoy's SDS request to SDS service. 52 credentialTokenHeaderKey = "authorization" 53 54 // K8sSAJwtTokenHeaderKey is the request header key for k8s jwt token. 55 // Binary header name must has suffix "-bin", according to https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md. 56 // Same value defined in pilot pkg(k8sSAJwtTokenHeaderKey) 57 k8sSAJwtTokenHeaderKey = "istio_sds_credentials_header-bin" 58) 59 60var ( 61 sdsClients = map[cache.ConnKey]*sdsConnection{} 62 staledClientKeys = map[cache.ConnKey]bool{} 63 sdsClientsMutex sync.RWMutex 64 65 // Tracks connections, increment on each new connection. 66 connectionNumber = int64(0) 67 sdsServiceLog = log.RegisterScope("sds", "SDS service debugging", 0) 68) 69 70type discoveryStream interface { 71 Send(*xdsapi.DiscoveryResponse) error 72 Recv() (*xdsapi.DiscoveryRequest, error) 73 grpc.ServerStream 74} 75 76// sdsEvent represents a secret event that results in a push. 77type sdsEvent struct{} 78 79type sdsConnection struct { 80 // Time of connection, for debugging. 81 Connect time.Time 82 83 // The ID of proxy from which the connection comes from. 84 proxyID string 85 86 // The ResourceName of the SDS request. 87 ResourceName string 88 89 // Sending on this channel results in push. 90 pushChannel chan *sdsEvent 91 92 // SDS streams implement this interface. 93 stream discoveryStream 94 95 // The secret associated with the proxy. 96 secret *model.SecretItem 97 98 // Mutex to protect read/write to this connection 99 // TODO(JimmyCYJ): Move all read/write into member function with lock protection to avoid race condition. 100 mutex sync.RWMutex 101 102 // ConID is the connection identifier, used as a key in the connection table. 103 // Currently based on the node name and a counter. 104 conID string 105 106 // Time of the recent SDS push. Will be reset to zero when a new SDS request is received. A 107 // non-zero time indicates that the connection is waiting for SDS request. 108 sdsPushTime time.Time 109} 110 111type sdsservice struct { 112 st cache.SecretManager 113 114 ticker *time.Ticker 115 tickerInterval time.Duration 116 117 // close channel. 118 closing chan bool 119 120 // skipToken indicates whether token is required. 121 skipToken bool 122 123 fileMountedCertsOnly bool 124 125 localJWT bool 126 127 jwtPath string 128 129 outputKeyCertToDir string 130} 131 132// ClientDebug represents a single SDS connection to the ndoe agent 133type ClientDebug struct { 134 ConnectionID string `json:"connection_id"` 135 ProxyID string `json:"proxy"` 136 ResourceName string `json:"resource_name"` 137 138 // fields from secret item 139 CertificateChain string `json:"certificate_chain"` 140 RootCert string `json:"root_cert"` 141 CreatedTime string `json:"created_time"` 142 ExpireTime string `json:"expire_time"` 143} 144 145// Debug represents all clients connected to this node agent endpoint and their supplied secrets 146type Debug struct { 147 Clients []ClientDebug `json:"clients"` 148} 149 150// newSDSService creates Secret Discovery Service which implements envoy v2 SDS API. 151func newSDSService(st cache.SecretManager, skipTokenVerification, localJWT, fileMountedCertsOnly bool, 152 recycleInterval time.Duration, jwtPath, outputKeyCertToDir string) *sdsservice { 153 if st == nil { 154 return nil 155 } 156 157 ret := &sdsservice{ 158 st: st, 159 skipToken: skipTokenVerification, 160 fileMountedCertsOnly: fileMountedCertsOnly, 161 tickerInterval: recycleInterval, 162 closing: make(chan bool), 163 localJWT: localJWT, 164 jwtPath: jwtPath, 165 outputKeyCertToDir: outputKeyCertToDir, 166 } 167 168 go ret.clearStaledClientsJob() 169 170 return ret 171} 172 173// register adds the SDS handle to the grpc server 174func (s *sdsservice) register(rpcs *grpc.Server) { 175 sds.RegisterSecretDiscoveryServiceServer(rpcs, s) 176} 177 178// DebugInfo serializes the current sds client data into JSON for the debug endpoint 179func (s *sdsservice) DebugInfo() (string, error) { 180 sdsClientsMutex.RLock() 181 defer sdsClientsMutex.RUnlock() 182 clientDebug := make([]ClientDebug, 0) 183 for connKey, conn := range sdsClients { 184 // it's possible for the connection to be established without an instantiated secret 185 if conn.secret == nil { 186 continue 187 } 188 189 conn.mutex.RLock() 190 c := ClientDebug{ 191 ConnectionID: connKey.ConnectionID, 192 ProxyID: conn.proxyID, 193 ResourceName: conn.ResourceName, 194 CertificateChain: string(conn.secret.CertificateChain), 195 RootCert: string(conn.secret.RootCert), 196 CreatedTime: conn.secret.CreatedTime.Format(time.RFC3339), 197 ExpireTime: conn.secret.ExpireTime.Format(time.RFC3339), 198 } 199 clientDebug = append(clientDebug, c) 200 conn.mutex.RUnlock() 201 } 202 203 debug := Debug{ 204 Clients: clientDebug, 205 } 206 debugJSON, err := json.MarshalIndent(debug, " ", " ") 207 if err != nil { 208 return "", err 209 } 210 211 return string(debugJSON), nil 212} 213 214func (s *sdsservice) DeltaSecrets(stream sds.SecretDiscoveryService_DeltaSecretsServer) error { 215 return status.Error(codes.Unimplemented, "DeltaSecrets not implemented") 216} 217 218// StreamSecrets serves SDS discovery requests and SDS push requests 219func (s *sdsservice) StreamSecrets(stream sds.SecretDiscoveryService_StreamSecretsServer) error { 220 token := "" 221 ctx := context.Background() 222 223 var receiveError error 224 reqChannel := make(chan *xdsapi.DiscoveryRequest, 1) 225 con := newSDSConnection(stream) 226 227 go receiveThread(con, reqChannel, &receiveError) 228 229 var node *core.Node 230 for { 231 // Block until a request is received. 232 select { 233 case discReq, ok := <-reqChannel: 234 if !ok { 235 // Remote side closed connection. 236 sdsServiceLog.Errorf("Remote side closed connection") 237 return receiveError 238 } 239 240 if discReq.Node == nil { 241 discReq.Node = node 242 } else { 243 node = discReq.Node 244 } 245 246 resourceName, err := getResourceName(discReq) 247 if err != nil { 248 sdsServiceLog.Errorf("Close connection. Error: %v", err) 249 return err 250 } 251 252 if resourceName == "" { 253 sdsServiceLog.Infof("Received empty resource name from %q. No need to respond", discReq.Node.Id) 254 continue 255 } 256 257 var firstRequestFlag bool 258 con.mutex.Lock() 259 if con.conID == "" { 260 // first request 261 if discReq.Node == nil || len(discReq.Node.Id) == 0 { 262 sdsServiceLog.Errorf("%s close connection. Missing Node ID in the first request", 263 sdsLogPrefix(resourceName)) 264 return fmt.Errorf("missing Node ID in the first request") 265 } 266 con.conID = constructConnectionID(discReq.Node.Id) 267 con.proxyID = discReq.Node.Id 268 con.ResourceName = resourceName 269 key := cache.ConnKey{ 270 ResourceName: resourceName, 271 ConnectionID: con.conID, 272 } 273 addConn(key, con) 274 firstRequestFlag = true 275 sdsServiceLog.Infof("%s new connection", sdsLogPrefix(resourceName)) 276 } 277 conID := con.conID 278 279 // Reset SDS push time for new SDS push. 280 con.sdsPushTime = time.Time{} 281 con.mutex.Unlock() 282 defer recycleConnection(conID, resourceName) 283 284 conIDresourceNamePrefix := sdsLogPrefix(resourceName) 285 if s.localJWT { 286 // Running in-process, no need to pass the token from envoy to agent as in-context - use the file 287 tok, err := ioutil.ReadFile(s.jwtPath) 288 if err != nil { 289 sdsServiceLog.Errorf("Failed to get credential token: %v", err) 290 return err 291 } 292 token = string(tok) 293 } else if s.outputKeyCertToDir != "" { 294 // Using existing certs and the new SDS - skipToken case is for the old node agent. 295 } else if !s.skipToken { 296 ctx = stream.Context() 297 t, err := getCredentialToken(ctx) 298 if err != nil { 299 sdsServiceLog.Errorf("%s Close connection. Failed to get credential token from "+ 300 "incoming request: %v", conIDresourceNamePrefix, err) 301 return err 302 } 303 token = t 304 } 305 306 // Update metrics. 307 totalActiveConnCounts.Increment() 308 if discReq.ErrorDetail != nil { 309 totalSecretUpdateFailureCounts.Increment() 310 sdsServiceLog.Errorf("%s received error: %v. Will not respond until next secret update", 311 conIDresourceNamePrefix, discReq.ErrorDetail) 312 continue 313 } 314 // When nodeagent receives StreamSecrets request, if there is cached secret which matches 315 // request's <token, resourceName, Version>, then this request is a confirmation request. 316 // nodeagent stops sending response to envoy in this case. 317 if discReq.VersionInfo != "" && s.st.SecretExist(conID, resourceName, token, discReq.VersionInfo) { 318 sdsServiceLog.Debugf("%s received SDS ACK from proxy %q, version info %q, "+ 319 "error details %s\n", conIDresourceNamePrefix, discReq.Node.Id, discReq.VersionInfo, 320 discReq.ErrorDetail) 321 continue 322 } 323 324 sdsServiceLog.Debugf("%s received SDS request from proxy %q, first request: %v, version info %q, "+ 325 "error details %s\n", conIDresourceNamePrefix, discReq.Node.Id, firstRequestFlag, discReq.VersionInfo, 326 discReq.ErrorDetail) 327 328 // In ingress gateway agent mode, if the first SDS request is received but Ingress gateway secret which is 329 // provisioned as kubernetes secret is not ready, wait for secret before sending SDS response. 330 // If a kubernetes secret was deleted by operator, wait for a new kubernetes secret before sending SDS response. 331 // If workload uses file mounted certs i.e. "FILE_MOUNTED_CERTS" is set to true, workdload loads certificates from 332 // mounted certificate paths and it does not depend on the presence of ingress gateway secret so 333 // we should skip waiting for it in that mode. 334 // File mounted certs for gateways is used in scenarios where an existing PKI infrastuctures delivers certificates 335 // to pods/VMs via files. 336 if s.st.ShouldWaitForIngressGatewaySecret(conID, resourceName, token, s.fileMountedCertsOnly) { 337 sdsServiceLog.Warnf("%s waiting for ingress gateway secret for proxy %q\n", conIDresourceNamePrefix, discReq.Node.Id) 338 continue 339 } else { 340 sdsServiceLog.Infof("Skipping waiting for ingress gateway secret") 341 } 342 343 secret, err := s.st.GenerateSecret(ctx, conID, resourceName, token) 344 if err != nil { 345 sdsServiceLog.Errorf("%s Close connection. Failed to get secret for proxy %q from "+ 346 "secret cache: %v", conIDresourceNamePrefix, discReq.Node.Id, err) 347 return err 348 } 349 350 // Output the key and cert to a directory, if some applications need to read them from local file system. 351 if err = util.OutputKeyCertToDir(s.outputKeyCertToDir, secret.PrivateKey, 352 secret.CertificateChain, secret.RootCert); err != nil { 353 sdsServiceLog.Errorf("(%v, %v) error when output the key and cert: %v", 354 conIDresourceNamePrefix, discReq.Node.Id, err) 355 return err 356 } 357 358 // Remove the secret from cache, otherwise refresh job will process this item(if envoy fails to reconnect) 359 // and cause some confusing logs like 'fails to notify because connection isn't found'. 360 defer s.st.DeleteSecret(conID, resourceName) 361 362 con.mutex.Lock() 363 con.secret = secret 364 con.mutex.Unlock() 365 366 if err := pushSDS(con); err != nil { 367 sdsServiceLog.Errorf("%s Close connection. Failed to push key/cert to proxy %q: %v", 368 conIDresourceNamePrefix, discReq.Node.Id, err) 369 return err 370 } 371 case <-con.pushChannel: 372 con.mutex.RLock() 373 proxyID := con.proxyID 374 conID := con.conID 375 resourceName := con.ResourceName 376 secret := con.secret 377 con.mutex.RUnlock() 378 conIDresourceNamePrefix := sdsLogPrefix(resourceName) 379 sdsServiceLog.Debugf("%s received push channel request for proxy %q", conIDresourceNamePrefix, proxyID) 380 381 if secret == nil { 382 defer func() { 383 recycleConnection(conID, resourceName) 384 s.st.DeleteSecret(conID, resourceName) 385 }() 386 387 // Secret is nil indicates close streaming to proxy, so that proxy 388 // could connect again with updated token. 389 // When nodeagent stops stream by sending envoy error response, it's Ok not to remove secret 390 // from secret cache because cache has auto-evication. 391 sdsServiceLog.Debugf("%s close connection for proxy %q", conIDresourceNamePrefix, proxyID) 392 return fmt.Errorf("%s Close connection to proxy %q", conIDresourceNamePrefix, conID) 393 } 394 395 if err := pushSDS(con); err != nil { 396 sdsServiceLog.Errorf("%s Close connection. Failed to push key/cert to proxy %q: %v", 397 conIDresourceNamePrefix, proxyID, err) 398 return err 399 } 400 sdsServiceLog.Infoa("Dynamic push for secret ", resourceName) 401 } 402 } 403} 404 405// FetchSecrets generates and returns secret from SecretManager in response to DiscoveryRequest 406func (s *sdsservice) FetchSecrets(ctx context.Context, discReq *xdsapi.DiscoveryRequest) (*xdsapi.DiscoveryResponse, error) { 407 token := "" 408 if s.localJWT { 409 // Running in-process, no need to pass the token from envoy to agent as in-context - use the file 410 tok, err := ioutil.ReadFile(s.jwtPath) 411 if err != nil { 412 sdsServiceLog.Errorf("Failed to get credential token: %v", err) 413 return nil, err 414 } 415 token = string(tok) 416 } else if !s.skipToken { 417 t, err := getCredentialToken(ctx) 418 if err != nil { 419 sdsServiceLog.Errorf("Failed to get credential token: %v", err) 420 return nil, err 421 } 422 token = t 423 } 424 425 resourceName, err := getResourceName(discReq) 426 if err != nil { 427 sdsServiceLog.Errorf("Close connection. Error: %v", err) 428 return nil, err 429 } 430 431 connID := constructConnectionID(discReq.Node.Id) 432 secret, err := s.st.GenerateSecret(ctx, connID, resourceName, token) 433 if err != nil { 434 sdsServiceLog.Errorf("Failed to get secret for proxy %q from secret cache: %v", connID, err) 435 return nil, err 436 } 437 438 // Output the key and cert to a directory, if some applications need to read them from local file system. 439 if err = util.OutputKeyCertToDir(s.outputKeyCertToDir, secret.PrivateKey, 440 secret.CertificateChain, secret.RootCert); err != nil { 441 sdsServiceLog.Errorf("(%v) error when output the key and cert: %v", 442 connID, err) 443 return nil, err 444 } 445 return sdsDiscoveryResponse(secret, resourceName) 446} 447 448func (s *sdsservice) Stop() { 449 s.closing <- true 450} 451 452func (s *sdsservice) clearStaledClientsJob() { 453 s.ticker = time.NewTicker(s.tickerInterval) 454 for { 455 select { 456 case <-s.ticker.C: 457 clearStaledClients() 458 case <-s.closing: 459 if s.ticker != nil { 460 s.ticker.Stop() 461 } 462 } 463 } 464} 465 466func clearStaledClients() { 467 sdsServiceLog.Debug("start staled connection cleanup job") 468 sdsClientsMutex.Lock() 469 defer sdsClientsMutex.Unlock() 470 471 for connKey := range staledClientKeys { 472 sdsServiceLog.Debugf("remove staled clients %+v", connKey) 473 delete(sdsClients, connKey) 474 delete(staledClientKeys, connKey) 475 // totalStaleConnCounts should be 0 when the for loop finishes. 476 totalStaleConnCounts.Decrement() 477 } 478} 479 480// NotifyProxy sends notification to proxy about secret update, 481// SDS will close streaming connection if secret is nil. 482func NotifyProxy(connKey cache.ConnKey, secret *model.SecretItem) error { 483 conIDresourceNamePrefix := sdsLogPrefix(connKey.ResourceName) 484 sdsClientsMutex.Lock() 485 conn := sdsClients[connKey] 486 if conn == nil { 487 sdsClientsMutex.Unlock() 488 sdsServiceLog.Errorf("%s NotifyProxy failed. No connection with id %q can be found", 489 conIDresourceNamePrefix, connKey.ConnectionID) 490 return fmt.Errorf("no connection with id %q can be found", connKey.ConnectionID) 491 } 492 conn.mutex.Lock() 493 conn.secret = secret 494 conn.mutex.Unlock() 495 sdsClientsMutex.Unlock() 496 497 conn.pushChannel <- &sdsEvent{} 498 return nil 499} 500 501func recycleConnection(conID, resourceName string) { 502 key := cache.ConnKey{ 503 ConnectionID: conID, 504 ResourceName: resourceName, 505 } 506 507 sdsClientsMutex.Lock() 508 defer sdsClientsMutex.Unlock() 509 510 // Only add connection key to staledClientKeys if it's not there already. 511 // The recycleConnection function may be triggered more than once for each connection key. 512 // https://github.com/istio/istio/issues/15306#issuecomment-509783105 513 if _, found := staledClientKeys[key]; found { 514 return 515 } 516 517 staledClientKeys[key] = true 518 519 totalStaleConnCounts.Increment() 520 totalActiveConnCounts.Decrement() 521} 522 523func getResourceName(discReq *xdsapi.DiscoveryRequest) (string /*resourceName*/, error) { 524 if len(discReq.ResourceNames) == 0 { 525 return "", nil 526 } 527 528 if len(discReq.ResourceNames) == 1 { 529 return discReq.ResourceNames[0], nil 530 } 531 532 return "", fmt.Errorf("discovery request %+v has more than one resourceNames %+v", discReq, discReq.ResourceNames) 533} 534 535func getCredentialToken(ctx context.Context) (string, error) { 536 metadata, ok := metadata.FromIncomingContext(ctx) 537 if !ok { 538 return "", fmt.Errorf("unable to get metadata from incoming context") 539 } 540 541 // Get credential token from request k8sSAJwtTokenHeader(`istio_sds_credentail_header`) if it exists; 542 // otherwise fallback to credentialTokenHeader('authorization'). 543 if h, ok := metadata[k8sSAJwtTokenHeaderKey]; ok { 544 if len(h) != 1 { 545 return "", fmt.Errorf("credential token from %q must have 1 value in gRPC metadata but got %d", k8sSAJwtTokenHeaderKey, len(h)) 546 } 547 if len(h[0]) == 0 { 548 return "", fmt.Errorf("received empty credential for existing header: %s", k8sSAJwtTokenHeaderKey) 549 } 550 return h[0], nil 551 } 552 553 if h, ok := metadata[credentialTokenHeaderKey]; ok { 554 if len(h) != 1 { 555 return "", fmt.Errorf("credential token from %q must have 1 value in gRPC metadata but got %d", credentialTokenHeaderKey, len(h)) 556 } 557 if len(h[0]) == 0 { 558 return "", fmt.Errorf("received empty credential for existing header: %s", credentialTokenHeaderKey) 559 } 560 return h[0], nil 561 } 562 563 return "", fmt.Errorf("no credential token is found") 564} 565 566func addConn(k cache.ConnKey, conn *sdsConnection) { 567 sdsClientsMutex.Lock() 568 defer sdsClientsMutex.Unlock() 569 conIDresourceNamePrefix := sdsLogPrefix(k.ResourceName) 570 sdsServiceLog.Debugf("%s add a new connection", conIDresourceNamePrefix) 571 sdsClients[k] = conn 572} 573 574func pushSDS(con *sdsConnection) error { 575 if con == nil { 576 return fmt.Errorf("sdsConnection passed into pushSDS() should not be nil") 577 } 578 579 con.mutex.Lock() 580 defer con.mutex.Unlock() 581 secret := con.secret 582 resourceName := con.ResourceName 583 sdsPushTime := con.sdsPushTime 584 585 conIDresourceNamePrefix := sdsLogPrefix(resourceName) 586 if !sdsPushTime.IsZero() { 587 sdsServiceLog.Errorf("%s skip multiple push, last push finishes at %s and is "+ 588 "waiting for next SDS request", conIDresourceNamePrefix, sdsPushTime.String()) 589 return nil 590 } 591 592 if secret == nil { 593 return fmt.Errorf("sdsConnection %v passed into pushSDS() contains nil secret", con) 594 } 595 596 response, err := sdsDiscoveryResponse(secret, resourceName) 597 if err != nil { 598 sdsServiceLog.Errorf("%s failed to construct response for SDS push: %v", conIDresourceNamePrefix, err) 599 return err 600 } 601 602 if err = con.stream.Send(response); err != nil { 603 sdsServiceLog.Errorf("%s failed to send response: %v", conIDresourceNamePrefix, err) 604 totalPushErrorCounts.Increment() 605 return err 606 } 607 608 con.sdsPushTime = time.Now() 609 610 // Update metrics after push to avoid adding latency to SDS push. 611 if secret.RootCert != nil { 612 sdsServiceLog.Infof("%s pushed root cert to proxy", conIDresourceNamePrefix) 613 sdsServiceLog.Debugf("%s pushed root cert %+v to proxy", conIDresourceNamePrefix, 614 string(secret.RootCert)) 615 } else { 616 sdsServiceLog.Infof("%s pushed key/cert pair to proxy", conIDresourceNamePrefix) 617 sdsServiceLog.Debugf("%s pushed certificate chain %+v to proxy", 618 conIDresourceNamePrefix, string(secret.CertificateChain)) 619 } 620 totalPushCounts.Increment() 621 return nil 622} 623 624func sdsDiscoveryResponse(s *model.SecretItem, resourceName string) (*xdsapi.DiscoveryResponse, error) { 625 resp := &xdsapi.DiscoveryResponse{ 626 TypeUrl: SecretType, 627 } 628 conIDresourceNamePrefix := sdsLogPrefix(resourceName) 629 if s == nil { 630 sdsServiceLog.Warnf("%s got nil secret for proxy", conIDresourceNamePrefix) 631 return resp, nil 632 } 633 634 resp.VersionInfo = s.Version 635 resp.Nonce = s.Version 636 secret := &authapi.Secret{ 637 Name: s.ResourceName, 638 } 639 if s.RootCert != nil { 640 secret.Type = &authapi.Secret_ValidationContext{ 641 ValidationContext: &authapi.CertificateValidationContext{ 642 TrustedCa: &core.DataSource{ 643 Specifier: &core.DataSource_InlineBytes{ 644 InlineBytes: s.RootCert, 645 }, 646 }, 647 }, 648 } 649 } else { 650 secret.Type = &authapi.Secret_TlsCertificate{ 651 TlsCertificate: &authapi.TlsCertificate{ 652 CertificateChain: &core.DataSource{ 653 Specifier: &core.DataSource_InlineBytes{ 654 InlineBytes: s.CertificateChain, 655 }, 656 }, 657 PrivateKey: &core.DataSource{ 658 Specifier: &core.DataSource_InlineBytes{ 659 InlineBytes: s.PrivateKey, 660 }, 661 }, 662 }, 663 } 664 } 665 666 ms, err := ptypes.MarshalAny(secret) 667 if err != nil { 668 sdsServiceLog.Errorf("%s failed to mashal secret for proxy: %v", conIDresourceNamePrefix, err) 669 return nil, err 670 } 671 resp.Resources = append(resp.Resources, ms) 672 673 return resp, nil 674} 675 676func newSDSConnection(stream discoveryStream) *sdsConnection { 677 return &sdsConnection{ 678 pushChannel: make(chan *sdsEvent, 1), 679 Connect: time.Now(), 680 stream: stream, 681 } 682} 683 684func receiveThread(con *sdsConnection, reqChannel chan *xdsapi.DiscoveryRequest, errP *error) { 685 defer close(reqChannel) // indicates close of the remote side. 686 for { 687 req, err := con.stream.Recv() 688 if err != nil { 689 // Add read lock to avoid race condition with set con.conID in StreamSecrets. 690 con.mutex.RLock() 691 conIDresourceNamePrefix := sdsLogPrefix(con.ResourceName) 692 con.mutex.RUnlock() 693 if status.Code(err) == codes.Canceled || err == io.EOF { 694 sdsServiceLog.Infof("%s connection is terminated: %v", conIDresourceNamePrefix, err) 695 return 696 } 697 *errP = err 698 sdsServiceLog.Errorf("%s connection is terminated with errors %v", conIDresourceNamePrefix, err) 699 return 700 } 701 reqChannel <- req 702 } 703} 704 705func constructConnectionID(proxyID string) string { 706 id := atomic.AddInt64(&connectionNumber, 1) 707 return proxyID + "-" + strconv.FormatInt(id, 10) 708} 709 710// sdsLogPrefix returns a unified log prefix. 711func sdsLogPrefix(resourceName string) string { 712 lPrefix := fmt.Sprintf("resource:%s", resourceName) 713 return lPrefix 714} 715