1// Copyright 2019 Istio Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// Package sds implements secret discovery service in NodeAgent.
16package sds
17
18import (
19	"context"
20	"encoding/json"
21	"fmt"
22	"io"
23	"io/ioutil"
24	"strconv"
25	"sync"
26	"sync/atomic"
27	"time"
28
29	"istio.io/istio/security/pkg/nodeagent/util"
30
31	xdsapi "github.com/envoyproxy/go-control-plane/envoy/api/v2"
32	authapi "github.com/envoyproxy/go-control-plane/envoy/api/v2/auth"
33	core "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
34	sds "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
35	"github.com/golang/protobuf/ptypes"
36	"google.golang.org/grpc"
37	"google.golang.org/grpc/codes"
38	"google.golang.org/grpc/metadata"
39	"google.golang.org/grpc/status"
40
41	"istio.io/istio/security/pkg/nodeagent/cache"
42	"istio.io/istio/security/pkg/nodeagent/model"
43	"istio.io/pkg/log"
44)
45
46const (
47	// SecretType is used for secret discovery service to construct response.
48	SecretType = "type.googleapis.com/envoy.api.v2.auth.Secret"
49
50	// credentialTokenHeaderKey is the header key in gPRC header which is used to
51	// pass credential token from envoy's SDS request to SDS service.
52	credentialTokenHeaderKey = "authorization"
53
54	// K8sSAJwtTokenHeaderKey is the request header key for k8s jwt token.
55	// Binary header name must has suffix "-bin", according to https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md.
56	// Same value defined in pilot pkg(k8sSAJwtTokenHeaderKey)
57	k8sSAJwtTokenHeaderKey = "istio_sds_credentials_header-bin"
58)
59
60var (
61	sdsClients       = map[cache.ConnKey]*sdsConnection{}
62	staledClientKeys = map[cache.ConnKey]bool{}
63	sdsClientsMutex  sync.RWMutex
64
65	// Tracks connections, increment on each new connection.
66	connectionNumber = int64(0)
67	sdsServiceLog    = log.RegisterScope("sds", "SDS service debugging", 0)
68)
69
70type discoveryStream interface {
71	Send(*xdsapi.DiscoveryResponse) error
72	Recv() (*xdsapi.DiscoveryRequest, error)
73	grpc.ServerStream
74}
75
76// sdsEvent represents a secret event that results in a push.
77type sdsEvent struct{}
78
79type sdsConnection struct {
80	// Time of connection, for debugging.
81	Connect time.Time
82
83	// The ID of proxy from which the connection comes from.
84	proxyID string
85
86	// The ResourceName of the SDS request.
87	ResourceName string
88
89	// Sending on this channel results in  push.
90	pushChannel chan *sdsEvent
91
92	// SDS streams implement this interface.
93	stream discoveryStream
94
95	// The secret associated with the proxy.
96	secret *model.SecretItem
97
98	// Mutex to protect read/write to this connection
99	// TODO(JimmyCYJ): Move all read/write into member function with lock protection to avoid race condition.
100	mutex sync.RWMutex
101
102	// ConID is the connection identifier, used as a key in the connection table.
103	// Currently based on the node name and a counter.
104	conID string
105
106	// Time of the recent SDS push. Will be reset to zero when a new SDS request is received. A
107	// non-zero time indicates that the connection is waiting for SDS request.
108	sdsPushTime time.Time
109}
110
111type sdsservice struct {
112	st cache.SecretManager
113
114	ticker         *time.Ticker
115	tickerInterval time.Duration
116
117	// close channel.
118	closing chan bool
119
120	// skipToken indicates whether token is required.
121	skipToken bool
122
123	fileMountedCertsOnly bool
124
125	localJWT bool
126
127	jwtPath string
128
129	outputKeyCertToDir string
130}
131
132// ClientDebug represents a single SDS connection to the ndoe agent
133type ClientDebug struct {
134	ConnectionID string `json:"connection_id"`
135	ProxyID      string `json:"proxy"`
136	ResourceName string `json:"resource_name"`
137
138	// fields from secret item
139	CertificateChain string `json:"certificate_chain"`
140	RootCert         string `json:"root_cert"`
141	CreatedTime      string `json:"created_time"`
142	ExpireTime       string `json:"expire_time"`
143}
144
145// Debug represents all clients connected to this node agent endpoint and their supplied secrets
146type Debug struct {
147	Clients []ClientDebug `json:"clients"`
148}
149
150// newSDSService creates Secret Discovery Service which implements envoy v2 SDS API.
151func newSDSService(st cache.SecretManager, skipTokenVerification, localJWT, fileMountedCertsOnly bool,
152	recycleInterval time.Duration, jwtPath, outputKeyCertToDir string) *sdsservice {
153	if st == nil {
154		return nil
155	}
156
157	ret := &sdsservice{
158		st:                   st,
159		skipToken:            skipTokenVerification,
160		fileMountedCertsOnly: fileMountedCertsOnly,
161		tickerInterval:       recycleInterval,
162		closing:              make(chan bool),
163		localJWT:             localJWT,
164		jwtPath:              jwtPath,
165		outputKeyCertToDir:   outputKeyCertToDir,
166	}
167
168	go ret.clearStaledClientsJob()
169
170	return ret
171}
172
173// register adds the SDS handle to the grpc server
174func (s *sdsservice) register(rpcs *grpc.Server) {
175	sds.RegisterSecretDiscoveryServiceServer(rpcs, s)
176}
177
178// DebugInfo serializes the current sds client data into JSON for the debug endpoint
179func (s *sdsservice) DebugInfo() (string, error) {
180	sdsClientsMutex.RLock()
181	defer sdsClientsMutex.RUnlock()
182	clientDebug := make([]ClientDebug, 0)
183	for connKey, conn := range sdsClients {
184		// it's possible for the connection to be established without an instantiated secret
185		if conn.secret == nil {
186			continue
187		}
188
189		conn.mutex.RLock()
190		c := ClientDebug{
191			ConnectionID:     connKey.ConnectionID,
192			ProxyID:          conn.proxyID,
193			ResourceName:     conn.ResourceName,
194			CertificateChain: string(conn.secret.CertificateChain),
195			RootCert:         string(conn.secret.RootCert),
196			CreatedTime:      conn.secret.CreatedTime.Format(time.RFC3339),
197			ExpireTime:       conn.secret.ExpireTime.Format(time.RFC3339),
198		}
199		clientDebug = append(clientDebug, c)
200		conn.mutex.RUnlock()
201	}
202
203	debug := Debug{
204		Clients: clientDebug,
205	}
206	debugJSON, err := json.MarshalIndent(debug, " ", "	")
207	if err != nil {
208		return "", err
209	}
210
211	return string(debugJSON), nil
212}
213
214func (s *sdsservice) DeltaSecrets(stream sds.SecretDiscoveryService_DeltaSecretsServer) error {
215	return status.Error(codes.Unimplemented, "DeltaSecrets not implemented")
216}
217
218// StreamSecrets serves SDS discovery requests and SDS push requests
219func (s *sdsservice) StreamSecrets(stream sds.SecretDiscoveryService_StreamSecretsServer) error {
220	token := ""
221	ctx := context.Background()
222
223	var receiveError error
224	reqChannel := make(chan *xdsapi.DiscoveryRequest, 1)
225	con := newSDSConnection(stream)
226
227	go receiveThread(con, reqChannel, &receiveError)
228
229	var node *core.Node
230	for {
231		// Block until a request is received.
232		select {
233		case discReq, ok := <-reqChannel:
234			if !ok {
235				// Remote side closed connection.
236				sdsServiceLog.Errorf("Remote side closed connection")
237				return receiveError
238			}
239
240			if discReq.Node == nil {
241				discReq.Node = node
242			} else {
243				node = discReq.Node
244			}
245
246			resourceName, err := getResourceName(discReq)
247			if err != nil {
248				sdsServiceLog.Errorf("Close connection. Error: %v", err)
249				return err
250			}
251
252			if resourceName == "" {
253				sdsServiceLog.Infof("Received empty resource name from %q. No need to respond", discReq.Node.Id)
254				continue
255			}
256
257			var firstRequestFlag bool
258			con.mutex.Lock()
259			if con.conID == "" {
260				// first request
261				if discReq.Node == nil || len(discReq.Node.Id) == 0 {
262					sdsServiceLog.Errorf("%s close connection. Missing Node ID in the first request",
263						sdsLogPrefix(resourceName))
264					return fmt.Errorf("missing Node ID in the first request")
265				}
266				con.conID = constructConnectionID(discReq.Node.Id)
267				con.proxyID = discReq.Node.Id
268				con.ResourceName = resourceName
269				key := cache.ConnKey{
270					ResourceName: resourceName,
271					ConnectionID: con.conID,
272				}
273				addConn(key, con)
274				firstRequestFlag = true
275				sdsServiceLog.Infof("%s new connection", sdsLogPrefix(resourceName))
276			}
277			conID := con.conID
278
279			// Reset SDS push time for new SDS push.
280			con.sdsPushTime = time.Time{}
281			con.mutex.Unlock()
282			defer recycleConnection(conID, resourceName)
283
284			conIDresourceNamePrefix := sdsLogPrefix(resourceName)
285			if s.localJWT {
286				// Running in-process, no need to pass the token from envoy to agent as in-context - use the file
287				tok, err := ioutil.ReadFile(s.jwtPath)
288				if err != nil {
289					sdsServiceLog.Errorf("Failed to get credential token: %v", err)
290					return err
291				}
292				token = string(tok)
293			} else if s.outputKeyCertToDir != "" {
294				// Using existing certs and the new SDS - skipToken case is for the old node agent.
295			} else if !s.skipToken {
296				ctx = stream.Context()
297				t, err := getCredentialToken(ctx)
298				if err != nil {
299					sdsServiceLog.Errorf("%s Close connection. Failed to get credential token from "+
300						"incoming request: %v", conIDresourceNamePrefix, err)
301					return err
302				}
303				token = t
304			}
305
306			// Update metrics.
307			totalActiveConnCounts.Increment()
308			if discReq.ErrorDetail != nil {
309				totalSecretUpdateFailureCounts.Increment()
310				sdsServiceLog.Errorf("%s received error: %v. Will not respond until next secret update",
311					conIDresourceNamePrefix, discReq.ErrorDetail)
312				continue
313			}
314			// When nodeagent receives StreamSecrets request, if there is cached secret which matches
315			// request's <token, resourceName, Version>, then this request is a confirmation request.
316			// nodeagent stops sending response to envoy in this case.
317			if discReq.VersionInfo != "" && s.st.SecretExist(conID, resourceName, token, discReq.VersionInfo) {
318				sdsServiceLog.Debugf("%s received SDS ACK from proxy %q, version info %q, "+
319					"error details %s\n", conIDresourceNamePrefix, discReq.Node.Id, discReq.VersionInfo,
320					discReq.ErrorDetail)
321				continue
322			}
323
324			sdsServiceLog.Debugf("%s received SDS request from proxy %q, first request: %v, version info %q, "+
325				"error details %s\n", conIDresourceNamePrefix, discReq.Node.Id, firstRequestFlag, discReq.VersionInfo,
326				discReq.ErrorDetail)
327
328			// In ingress gateway agent mode, if the first SDS request is received but Ingress gateway secret which is
329			// provisioned as kubernetes secret is not ready, wait for secret before sending SDS response.
330			// If a kubernetes secret was deleted by operator, wait for a new kubernetes secret before sending SDS response.
331			// If workload uses file mounted certs i.e. "FILE_MOUNTED_CERTS" is set to true, workdload loads certificates from
332			// mounted certificate paths and it does not depend on the presence of ingress gateway secret so
333			// we should skip waiting for it in that mode.
334			// File mounted certs for gateways is used in scenarios where an existing PKI infrastuctures delivers certificates
335			// to pods/VMs via files.
336			if s.st.ShouldWaitForIngressGatewaySecret(conID, resourceName, token, s.fileMountedCertsOnly) {
337				sdsServiceLog.Warnf("%s waiting for ingress gateway secret for proxy %q\n", conIDresourceNamePrefix, discReq.Node.Id)
338				continue
339			} else {
340				sdsServiceLog.Infof("Skipping waiting for ingress gateway secret")
341			}
342
343			secret, err := s.st.GenerateSecret(ctx, conID, resourceName, token)
344			if err != nil {
345				sdsServiceLog.Errorf("%s Close connection. Failed to get secret for proxy %q from "+
346					"secret cache: %v", conIDresourceNamePrefix, discReq.Node.Id, err)
347				return err
348			}
349
350			// Output the key and cert to a directory, if some applications need to read them from local file system.
351			if err = util.OutputKeyCertToDir(s.outputKeyCertToDir, secret.PrivateKey,
352				secret.CertificateChain, secret.RootCert); err != nil {
353				sdsServiceLog.Errorf("(%v, %v) error when output the key and cert: %v",
354					conIDresourceNamePrefix, discReq.Node.Id, err)
355				return err
356			}
357
358			// Remove the secret from cache, otherwise refresh job will process this item(if envoy fails to reconnect)
359			// and cause some confusing logs like 'fails to notify because connection isn't found'.
360			defer s.st.DeleteSecret(conID, resourceName)
361
362			con.mutex.Lock()
363			con.secret = secret
364			con.mutex.Unlock()
365
366			if err := pushSDS(con); err != nil {
367				sdsServiceLog.Errorf("%s Close connection. Failed to push key/cert to proxy %q: %v",
368					conIDresourceNamePrefix, discReq.Node.Id, err)
369				return err
370			}
371		case <-con.pushChannel:
372			con.mutex.RLock()
373			proxyID := con.proxyID
374			conID := con.conID
375			resourceName := con.ResourceName
376			secret := con.secret
377			con.mutex.RUnlock()
378			conIDresourceNamePrefix := sdsLogPrefix(resourceName)
379			sdsServiceLog.Debugf("%s received push channel request for proxy %q", conIDresourceNamePrefix, proxyID)
380
381			if secret == nil {
382				defer func() {
383					recycleConnection(conID, resourceName)
384					s.st.DeleteSecret(conID, resourceName)
385				}()
386
387				// Secret is nil indicates close streaming to proxy, so that proxy
388				// could connect again with updated token.
389				// When nodeagent stops stream by sending envoy error response, it's Ok not to remove secret
390				// from secret cache because cache has auto-evication.
391				sdsServiceLog.Debugf("%s close connection for proxy %q", conIDresourceNamePrefix, proxyID)
392				return fmt.Errorf("%s Close connection to proxy %q", conIDresourceNamePrefix, conID)
393			}
394
395			if err := pushSDS(con); err != nil {
396				sdsServiceLog.Errorf("%s Close connection. Failed to push key/cert to proxy %q: %v",
397					conIDresourceNamePrefix, proxyID, err)
398				return err
399			}
400			sdsServiceLog.Infoa("Dynamic push for secret ", resourceName)
401		}
402	}
403}
404
405// FetchSecrets generates and returns secret from SecretManager in response to DiscoveryRequest
406func (s *sdsservice) FetchSecrets(ctx context.Context, discReq *xdsapi.DiscoveryRequest) (*xdsapi.DiscoveryResponse, error) {
407	token := ""
408	if s.localJWT {
409		// Running in-process, no need to pass the token from envoy to agent as in-context - use the file
410		tok, err := ioutil.ReadFile(s.jwtPath)
411		if err != nil {
412			sdsServiceLog.Errorf("Failed to get credential token: %v", err)
413			return nil, err
414		}
415		token = string(tok)
416	} else if !s.skipToken {
417		t, err := getCredentialToken(ctx)
418		if err != nil {
419			sdsServiceLog.Errorf("Failed to get credential token: %v", err)
420			return nil, err
421		}
422		token = t
423	}
424
425	resourceName, err := getResourceName(discReq)
426	if err != nil {
427		sdsServiceLog.Errorf("Close connection. Error: %v", err)
428		return nil, err
429	}
430
431	connID := constructConnectionID(discReq.Node.Id)
432	secret, err := s.st.GenerateSecret(ctx, connID, resourceName, token)
433	if err != nil {
434		sdsServiceLog.Errorf("Failed to get secret for proxy %q from secret cache: %v", connID, err)
435		return nil, err
436	}
437
438	// Output the key and cert to a directory, if some applications need to read them from local file system.
439	if err = util.OutputKeyCertToDir(s.outputKeyCertToDir, secret.PrivateKey,
440		secret.CertificateChain, secret.RootCert); err != nil {
441		sdsServiceLog.Errorf("(%v) error when output the key and cert: %v",
442			connID, err)
443		return nil, err
444	}
445	return sdsDiscoveryResponse(secret, resourceName)
446}
447
448func (s *sdsservice) Stop() {
449	s.closing <- true
450}
451
452func (s *sdsservice) clearStaledClientsJob() {
453	s.ticker = time.NewTicker(s.tickerInterval)
454	for {
455		select {
456		case <-s.ticker.C:
457			clearStaledClients()
458		case <-s.closing:
459			if s.ticker != nil {
460				s.ticker.Stop()
461			}
462		}
463	}
464}
465
466func clearStaledClients() {
467	sdsServiceLog.Debug("start staled connection cleanup job")
468	sdsClientsMutex.Lock()
469	defer sdsClientsMutex.Unlock()
470
471	for connKey := range staledClientKeys {
472		sdsServiceLog.Debugf("remove staled clients %+v", connKey)
473		delete(sdsClients, connKey)
474		delete(staledClientKeys, connKey)
475		// totalStaleConnCounts should be 0 when the for loop finishes.
476		totalStaleConnCounts.Decrement()
477	}
478}
479
480// NotifyProxy sends notification to proxy about secret update,
481// SDS will close streaming connection if secret is nil.
482func NotifyProxy(connKey cache.ConnKey, secret *model.SecretItem) error {
483	conIDresourceNamePrefix := sdsLogPrefix(connKey.ResourceName)
484	sdsClientsMutex.Lock()
485	conn := sdsClients[connKey]
486	if conn == nil {
487		sdsClientsMutex.Unlock()
488		sdsServiceLog.Errorf("%s NotifyProxy failed. No connection with id %q can be found",
489			conIDresourceNamePrefix, connKey.ConnectionID)
490		return fmt.Errorf("no connection with id %q can be found", connKey.ConnectionID)
491	}
492	conn.mutex.Lock()
493	conn.secret = secret
494	conn.mutex.Unlock()
495	sdsClientsMutex.Unlock()
496
497	conn.pushChannel <- &sdsEvent{}
498	return nil
499}
500
501func recycleConnection(conID, resourceName string) {
502	key := cache.ConnKey{
503		ConnectionID: conID,
504		ResourceName: resourceName,
505	}
506
507	sdsClientsMutex.Lock()
508	defer sdsClientsMutex.Unlock()
509
510	// Only add connection key to staledClientKeys if it's not there already.
511	// The recycleConnection function may be triggered more than once for each connection key.
512	// https://github.com/istio/istio/issues/15306#issuecomment-509783105
513	if _, found := staledClientKeys[key]; found {
514		return
515	}
516
517	staledClientKeys[key] = true
518
519	totalStaleConnCounts.Increment()
520	totalActiveConnCounts.Decrement()
521}
522
523func getResourceName(discReq *xdsapi.DiscoveryRequest) (string /*resourceName*/, error) {
524	if len(discReq.ResourceNames) == 0 {
525		return "", nil
526	}
527
528	if len(discReq.ResourceNames) == 1 {
529		return discReq.ResourceNames[0], nil
530	}
531
532	return "", fmt.Errorf("discovery request %+v has more than one resourceNames %+v", discReq, discReq.ResourceNames)
533}
534
535func getCredentialToken(ctx context.Context) (string, error) {
536	metadata, ok := metadata.FromIncomingContext(ctx)
537	if !ok {
538		return "", fmt.Errorf("unable to get metadata from incoming context")
539	}
540
541	// Get credential token from request k8sSAJwtTokenHeader(`istio_sds_credentail_header`) if it exists;
542	// otherwise fallback to credentialTokenHeader('authorization').
543	if h, ok := metadata[k8sSAJwtTokenHeaderKey]; ok {
544		if len(h) != 1 {
545			return "", fmt.Errorf("credential token from %q must have 1 value in gRPC metadata but got %d", k8sSAJwtTokenHeaderKey, len(h))
546		}
547		if len(h[0]) == 0 {
548			return "", fmt.Errorf("received empty credential for existing header: %s", k8sSAJwtTokenHeaderKey)
549		}
550		return h[0], nil
551	}
552
553	if h, ok := metadata[credentialTokenHeaderKey]; ok {
554		if len(h) != 1 {
555			return "", fmt.Errorf("credential token from %q must have 1 value in gRPC metadata but got %d", credentialTokenHeaderKey, len(h))
556		}
557		if len(h[0]) == 0 {
558			return "", fmt.Errorf("received empty credential for existing header: %s", credentialTokenHeaderKey)
559		}
560		return h[0], nil
561	}
562
563	return "", fmt.Errorf("no credential token is found")
564}
565
566func addConn(k cache.ConnKey, conn *sdsConnection) {
567	sdsClientsMutex.Lock()
568	defer sdsClientsMutex.Unlock()
569	conIDresourceNamePrefix := sdsLogPrefix(k.ResourceName)
570	sdsServiceLog.Debugf("%s add a new connection", conIDresourceNamePrefix)
571	sdsClients[k] = conn
572}
573
574func pushSDS(con *sdsConnection) error {
575	if con == nil {
576		return fmt.Errorf("sdsConnection passed into pushSDS() should not be nil")
577	}
578
579	con.mutex.Lock()
580	defer con.mutex.Unlock()
581	secret := con.secret
582	resourceName := con.ResourceName
583	sdsPushTime := con.sdsPushTime
584
585	conIDresourceNamePrefix := sdsLogPrefix(resourceName)
586	if !sdsPushTime.IsZero() {
587		sdsServiceLog.Errorf("%s skip multiple push, last push finishes at %s and is "+
588			"waiting for next SDS request", conIDresourceNamePrefix, sdsPushTime.String())
589		return nil
590	}
591
592	if secret == nil {
593		return fmt.Errorf("sdsConnection %v passed into pushSDS() contains nil secret", con)
594	}
595
596	response, err := sdsDiscoveryResponse(secret, resourceName)
597	if err != nil {
598		sdsServiceLog.Errorf("%s failed to construct response for SDS push: %v", conIDresourceNamePrefix, err)
599		return err
600	}
601
602	if err = con.stream.Send(response); err != nil {
603		sdsServiceLog.Errorf("%s failed to send response: %v", conIDresourceNamePrefix, err)
604		totalPushErrorCounts.Increment()
605		return err
606	}
607
608	con.sdsPushTime = time.Now()
609
610	// Update metrics after push to avoid adding latency to SDS push.
611	if secret.RootCert != nil {
612		sdsServiceLog.Infof("%s pushed root cert to proxy", conIDresourceNamePrefix)
613		sdsServiceLog.Debugf("%s pushed root cert %+v to proxy", conIDresourceNamePrefix,
614			string(secret.RootCert))
615	} else {
616		sdsServiceLog.Infof("%s pushed key/cert pair to proxy", conIDresourceNamePrefix)
617		sdsServiceLog.Debugf("%s pushed certificate chain %+v to proxy",
618			conIDresourceNamePrefix, string(secret.CertificateChain))
619	}
620	totalPushCounts.Increment()
621	return nil
622}
623
624func sdsDiscoveryResponse(s *model.SecretItem, resourceName string) (*xdsapi.DiscoveryResponse, error) {
625	resp := &xdsapi.DiscoveryResponse{
626		TypeUrl: SecretType,
627	}
628	conIDresourceNamePrefix := sdsLogPrefix(resourceName)
629	if s == nil {
630		sdsServiceLog.Warnf("%s got nil secret for proxy", conIDresourceNamePrefix)
631		return resp, nil
632	}
633
634	resp.VersionInfo = s.Version
635	resp.Nonce = s.Version
636	secret := &authapi.Secret{
637		Name: s.ResourceName,
638	}
639	if s.RootCert != nil {
640		secret.Type = &authapi.Secret_ValidationContext{
641			ValidationContext: &authapi.CertificateValidationContext{
642				TrustedCa: &core.DataSource{
643					Specifier: &core.DataSource_InlineBytes{
644						InlineBytes: s.RootCert,
645					},
646				},
647			},
648		}
649	} else {
650		secret.Type = &authapi.Secret_TlsCertificate{
651			TlsCertificate: &authapi.TlsCertificate{
652				CertificateChain: &core.DataSource{
653					Specifier: &core.DataSource_InlineBytes{
654						InlineBytes: s.CertificateChain,
655					},
656				},
657				PrivateKey: &core.DataSource{
658					Specifier: &core.DataSource_InlineBytes{
659						InlineBytes: s.PrivateKey,
660					},
661				},
662			},
663		}
664	}
665
666	ms, err := ptypes.MarshalAny(secret)
667	if err != nil {
668		sdsServiceLog.Errorf("%s failed to mashal secret for proxy: %v", conIDresourceNamePrefix, err)
669		return nil, err
670	}
671	resp.Resources = append(resp.Resources, ms)
672
673	return resp, nil
674}
675
676func newSDSConnection(stream discoveryStream) *sdsConnection {
677	return &sdsConnection{
678		pushChannel: make(chan *sdsEvent, 1),
679		Connect:     time.Now(),
680		stream:      stream,
681	}
682}
683
684func receiveThread(con *sdsConnection, reqChannel chan *xdsapi.DiscoveryRequest, errP *error) {
685	defer close(reqChannel) // indicates close of the remote side.
686	for {
687		req, err := con.stream.Recv()
688		if err != nil {
689			// Add read lock to avoid race condition with set con.conID in StreamSecrets.
690			con.mutex.RLock()
691			conIDresourceNamePrefix := sdsLogPrefix(con.ResourceName)
692			con.mutex.RUnlock()
693			if status.Code(err) == codes.Canceled || err == io.EOF {
694				sdsServiceLog.Infof("%s connection is terminated: %v", conIDresourceNamePrefix, err)
695				return
696			}
697			*errP = err
698			sdsServiceLog.Errorf("%s connection is terminated with errors %v", conIDresourceNamePrefix, err)
699			return
700		}
701		reqChannel <- req
702	}
703}
704
705func constructConnectionID(proxyID string) string {
706	id := atomic.AddInt64(&connectionNumber, 1)
707	return proxyID + "-" + strconv.FormatInt(id, 10)
708}
709
710// sdsLogPrefix returns a unified log prefix.
711func sdsLogPrefix(resourceName string) string {
712	lPrefix := fmt.Sprintf("resource:%s", resourceName)
713	return lPrefix
714}
715