1/*
2 *
3 * Copyright 2019 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package client
20
21import (
22	"context"
23	"fmt"
24	"sync"
25	"time"
26
27	"google.golang.org/grpc"
28	"google.golang.org/grpc/internal/buffer"
29	"google.golang.org/grpc/internal/grpclog"
30
31	xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
32	corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
33	adsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
34)
35
var (
	// defaultWatchExpiryTimeout is how long a newly started watch waits for a
	// response before its callback is invoked with a "not found" error. The
	// value is based on the default of the initial_fetch_timeout field in the
	// corepb.ConfigSource proto. It is a var rather than a const, presumably so
	// tests can shorten it — confirm before changing.
	defaultWatchExpiryTimeout = 15 * time.Second
)
39
40// v2Client performs the actual xDS RPCs using the xDS v2 API. It creates a
41// single ADS stream on which the different types of xDS requests and responses
42// are multiplexed.
43// The reason for splitting this out from the top level xdsClient object is
44// because there is already an xDS v3Aplha API in development. If and when we
45// want to switch to that, this separation will ease that process.
46type v2Client struct {
47	ctx       context.Context
48	cancelCtx context.CancelFunc
49
50	// ClientConn to the xDS gRPC server. Owned by the parent xdsClient.
51	cc        *grpc.ClientConn
52	nodeProto *corepb.Node
53	backoff   func(int) time.Duration
54
55	logger *grpclog.PrefixLogger
56
57	streamCh chan adsStream
58	// sendCh in the channel onto which watchInfo objects are pushed by the
59	// watch API, and it is read and acted upon by the send() goroutine.
60	sendCh *buffer.Unbounded
61
62	mu sync.Mutex
63	// Message specific watch infos, protected by the above mutex. These are
64	// written to, after successfully reading from the update channel, and are
65	// read from when recovering from a broken stream to resend the xDS
66	// messages. When the user of this client object cancels a watch call,
67	// these are set to nil. All accesses to the map protected and any value
68	// inside the map should be protected with the above mutex.
69	watchMap map[string]*watchInfo
70	// versionMap contains the version that was acked (the version in the ack
71	// request that was sent on wire). The key is typeURL, the value is the
72	// version string, becaues the versions for different resource types should
73	// be independent.
74	versionMap map[string]string
75	// nonceMap contains the nonce from the most recent received response.
76	nonceMap map[string]string
77	// rdsCache maintains a mapping of {routeConfigName --> clusterName} from
78	// validated route configurations received in RDS responses. We cache all
79	// valid route configurations, whether or not we are interested in them
80	// when we received them (because we could become interested in them in the
81	// future and the server wont send us those resources again).
82	// Protected by the above mutex.
83	//
84	// TODO: remove RDS cache. The updated spec says client can ignore
85	// unrequested resources.
86	// https://github.com/envoyproxy/envoy/blob/master/api/xds_protocol.rst#resource-hints
87	rdsCache map[string]string
88	// rdsCache maintains a mapping of {clusterName --> CDSUpdate} from
89	// validated cluster configurations received in CDS responses. We cache all
90	// valid cluster configurations, whether or not we are interested in them
91	// when we received them (because we could become interested in them in the
92	// future and the server wont send us those resources again). This is only
93	// to support legacy management servers that do not honor the
94	// resource_names field. As per the latest spec, the server should resend
95	// the response when the request changes, even if it had sent the same
96	// resource earlier (when not asked for). Protected by the above mutex.
97	cdsCache map[string]CDSUpdate
98}
99
100// newV2Client creates a new v2Client initialized with the passed arguments.
101func newV2Client(cc *grpc.ClientConn, nodeProto *corepb.Node, backoff func(int) time.Duration, logger *grpclog.PrefixLogger) *v2Client {
102	v2c := &v2Client{
103		cc:        cc,
104		nodeProto: nodeProto,
105		backoff:   backoff,
106
107		logger: logger,
108
109		streamCh: make(chan adsStream, 1),
110		sendCh:   buffer.NewUnbounded(),
111
112		watchMap:   make(map[string]*watchInfo),
113		versionMap: make(map[string]string),
114		nonceMap:   make(map[string]string),
115		rdsCache:   make(map[string]string),
116		cdsCache:   make(map[string]CDSUpdate),
117	}
118	v2c.ctx, v2c.cancelCtx = context.WithCancel(context.Background())
119
120	go v2c.run()
121	return v2c
122}
123
124// close cleans up resources and goroutines allocated by this client.
125func (v2c *v2Client) close() {
126	v2c.cancelCtx()
127}
128
129// run starts an ADS stream (and backs off exponentially, if the previous
130// stream failed without receiving a single reply) and runs the sender and
131// receiver routines to send and receive data from the stream respectively.
132func (v2c *v2Client) run() {
133	go v2c.send()
134	// TODO: start a goroutine monitoring ClientConn's connectivity state, and
135	// report error (and log) when stats is transient failure.
136
137	retries := 0
138	for {
139		select {
140		case <-v2c.ctx.Done():
141			return
142		default:
143		}
144
145		if retries != 0 {
146			t := time.NewTimer(v2c.backoff(retries))
147			select {
148			case <-t.C:
149			case <-v2c.ctx.Done():
150				if !t.Stop() {
151					<-t.C
152				}
153				return
154			}
155		}
156
157		retries++
158		cli := adsgrpc.NewAggregatedDiscoveryServiceClient(v2c.cc)
159		stream, err := cli.StreamAggregatedResources(v2c.ctx, grpc.WaitForReady(true))
160		if err != nil {
161			v2c.logger.Warningf("xds: ADS stream creation failed: %v", err)
162			continue
163		}
164		v2c.logger.Infof("ADS stream created")
165
166		select {
167		case <-v2c.streamCh:
168		default:
169		}
170		v2c.streamCh <- stream
171		if v2c.recv(stream) {
172			retries = 0
173		}
174	}
175}
176
177// sendRequest sends a request for provided typeURL and resource on the provided
178// stream.
179//
180// version is the ack version to be sent with the request
181// - If this is the new request (not an ack/nack), version will be an empty
182// string
183// - If this is an ack, version will be the version from the response
184// - If this is a nack, version will be the previous acked version (from
185// versionMap). If there was no ack before, it will be an empty string
186func (v2c *v2Client) sendRequest(stream adsStream, resourceNames []string, typeURL, version, nonce string) bool {
187	req := &xdspb.DiscoveryRequest{
188		Node:          v2c.nodeProto,
189		TypeUrl:       typeURL,
190		ResourceNames: resourceNames,
191		VersionInfo:   version,
192		ResponseNonce: nonce,
193		// TODO: populate ErrorDetails for nack.
194	}
195	if err := stream.Send(req); err != nil {
196		return false
197	}
198	v2c.logger.Debugf("ADS request sent: %v", req)
199	return true
200}
201
202// sendExisting sends out xDS requests for registered watchers when recovering
203// from a broken stream.
204//
205// We call stream.Send() here with the lock being held. It should be OK to do
206// that here because the stream has just started and Send() usually returns
207// quickly (once it pushes the message onto the transport layer) and is only
208// ever blocked if we don't have enough flow control quota.
209func (v2c *v2Client) sendExisting(stream adsStream) bool {
210	v2c.mu.Lock()
211	defer v2c.mu.Unlock()
212
213	// Reset the ack versions when the stream restarts.
214	v2c.versionMap = make(map[string]string)
215	v2c.nonceMap = make(map[string]string)
216
217	for typeURL, wi := range v2c.watchMap {
218		if !v2c.sendRequest(stream, wi.target, typeURL, "", "") {
219			return false
220		}
221	}
222
223	return true
224}
225
226// processWatchInfo pulls the fields needed by the request from a watchInfo.
227//
228// It also calls callback with cached response, and updates the watch map in
229// v2c.
230//
231// If the watch was already canceled, it returns false for send
232func (v2c *v2Client) processWatchInfo(t *watchInfo) (target []string, typeURL, version, nonce string, send bool) {
233	v2c.mu.Lock()
234	defer v2c.mu.Unlock()
235	if t.state == watchCancelled {
236		return // This returns all zero values, and false for send.
237	}
238	t.state = watchStarted
239	send = true
240
241	typeURL = t.typeURL
242	target = t.target
243	v2c.checkCacheAndUpdateWatchMap(t)
244	// TODO: if watch is called again with the same resource names,
245	// there's no need to send another request.
246
247	// We don't reset version or nonce when a new watch is started. The version
248	// and nonce from previous response are carried by the request unless the
249	// stream is recreated.
250	version = v2c.versionMap[typeURL]
251	nonce = v2c.nonceMap[typeURL]
252	return
253}
254
255// processAckInfo pulls the fields needed by the ack request from a ackInfo.
256//
257// If no active watch is found for this ack, it returns false for send.
258func (v2c *v2Client) processAckInfo(t *ackInfo) (target []string, typeURL, version, nonce string, send bool) {
259	typeURL = t.typeURL
260
261	v2c.mu.Lock()
262	defer v2c.mu.Unlock()
263	wi, ok := v2c.watchMap[typeURL]
264	if !ok {
265		// We don't send the request ack if there's no active watch (this can be
266		// either the server sends responses before any request, or the watch is
267		// canceled while the ackInfo is in queue), because there's no resource
268		// name. And if we send a request with empty resource name list, the
269		// server may treat it as a wild card and send us everything.
270		return // This returns all zero values, and false for send.
271	}
272	send = true
273
274	version = t.version
275	nonce = t.nonce
276	target = wi.target
277	if version == "" {
278		// This is a nack, get the previous acked version.
279		version = v2c.versionMap[typeURL]
280		// version will still be an empty string if typeURL isn't
281		// found in versionMap, this can happen if there wasn't any ack
282		// before.
283	} else {
284		v2c.versionMap[typeURL] = version
285	}
286	v2c.nonceMap[typeURL] = nonce
287	return
288}
289
290// send is a separate goroutine for sending watch requests on the xds stream.
291//
292// It watches the stream channel for new streams, and the request channel for
293// new requests to send on the stream.
294//
295// For each new request (watchInfo), it's
296//  - processed and added to the watch map
297//    - so resend will pick them up when there are new streams)
298//  - sent on the current stream if there's one
299//    - the current stream is cleared when any send on it fails
300//
301// For each new stream, all the existing requests will be resent.
302//
303// Note that this goroutine doesn't do anything to the old stream when there's a
304// new one. In fact, there should be only one stream in progress, and new one
305// should only be created when the old one fails (recv returns an error).
306func (v2c *v2Client) send() {
307	var stream adsStream
308	for {
309		select {
310		case <-v2c.ctx.Done():
311			return
312		case newStream := <-v2c.streamCh:
313			stream = newStream
314			if !v2c.sendExisting(stream) {
315				// send failed, clear the current stream.
316				stream = nil
317			}
318		case u := <-v2c.sendCh.Get():
319			v2c.sendCh.Load()
320
321			var (
322				target                  []string
323				typeURL, version, nonce string
324				send                    bool
325			)
326			switch t := u.(type) {
327			case *watchInfo:
328				target, typeURL, version, nonce, send = v2c.processWatchInfo(t)
329			case *ackInfo:
330				target, typeURL, version, nonce, send = v2c.processAckInfo(t)
331			}
332			if !send {
333				continue
334			}
335			if stream == nil {
336				// There's no stream yet. Skip the request. This request
337				// will be resent to the new streams. If no stream is
338				// created, the watcher will timeout (same as server not
339				// sending response back).
340				continue
341			}
342			if !v2c.sendRequest(stream, target, typeURL, version, nonce) {
343				// send failed, clear the current stream.
344				stream = nil
345			}
346		}
347	}
348}
349
350// recv receives xDS responses on the provided ADS stream and branches out to
351// message specific handlers.
352func (v2c *v2Client) recv(stream adsStream) bool {
353	success := false
354	for {
355		resp, err := stream.Recv()
356		// TODO: call watch callbacks with error when stream is broken.
357		if err != nil {
358			v2c.logger.Warningf("ADS stream is closed with error: %v", err)
359			return success
360		}
361		v2c.logger.Infof("ADS response received, type: %v", resp.GetTypeUrl())
362		v2c.logger.Debugf("ADS response received: %v", resp)
363		var respHandleErr error
364		switch resp.GetTypeUrl() {
365		case ldsURL:
366			respHandleErr = v2c.handleLDSResponse(resp)
367		case rdsURL:
368			respHandleErr = v2c.handleRDSResponse(resp)
369		case cdsURL:
370			respHandleErr = v2c.handleCDSResponse(resp)
371		case edsURL:
372			respHandleErr = v2c.handleEDSResponse(resp)
373		default:
374			v2c.logger.Warningf("Resource type %v unknown in response from server", resp.GetTypeUrl())
375			continue
376		}
377
378		typeURL := resp.GetTypeUrl()
379		if respHandleErr != nil {
380			v2c.sendCh.Put(&ackInfo{
381				typeURL: typeURL,
382				version: "",
383				nonce:   resp.GetNonce(),
384			})
385			v2c.logger.Warningf("Sending NACK for response type: %v, version: %v, nonce: %v, reason: %v", typeURL, resp.GetVersionInfo(), resp.GetNonce(), respHandleErr)
386			continue
387		}
388		v2c.sendCh.Put(&ackInfo{
389			typeURL: typeURL,
390			version: resp.GetVersionInfo(),
391			nonce:   resp.GetNonce(),
392		})
393		v2c.logger.Infof("Sending ACK for response type: %v, version: %v, nonce: %v", typeURL, resp.GetVersionInfo(), resp.GetNonce())
394		success = true
395	}
396}
397
398// watchLDS registers an LDS watcher for the provided target. Updates
399// corresponding to received LDS responses will be pushed to the provided
400// callback. The caller can cancel the watch by invoking the returned cancel
401// function.
402// The provided callback should not block or perform any expensive operations
403// or call other methods of the v2Client object.
404func (v2c *v2Client) watchLDS(target string, ldsCb ldsCallback) (cancel func()) {
405	return v2c.watch(&watchInfo{
406		typeURL:  ldsURL,
407		target:   []string{target},
408		callback: ldsCb,
409	})
410}
411
412// watchRDS registers an RDS watcher for the provided routeName. Updates
413// corresponding to received RDS responses will be pushed to the provided
414// callback. The caller can cancel the watch by invoking the returned cancel
415// function.
416// The provided callback should not block or perform any expensive operations
417// or call other methods of the v2Client object.
418func (v2c *v2Client) watchRDS(routeName string, rdsCb rdsCallback) (cancel func()) {
419	return v2c.watch(&watchInfo{
420		typeURL:  rdsURL,
421		target:   []string{routeName},
422		callback: rdsCb,
423	})
424	// TODO: Once a registered RDS watch is cancelled, we should send an RDS
425	// request with no resources. This will let the server know that we are no
426	// longer interested in this resource.
427}
428
429// watchCDS registers an CDS watcher for the provided clusterName. Updates
430// corresponding to received CDS responses will be pushed to the provided
431// callback. The caller can cancel the watch by invoking the returned cancel
432// function.
433// The provided callback should not block or perform any expensive operations
434// or call other methods of the v2Client object.
435func (v2c *v2Client) watchCDS(clusterName string, cdsCb cdsCallback) (cancel func()) {
436	return v2c.watch(&watchInfo{
437		typeURL:  cdsURL,
438		target:   []string{clusterName},
439		callback: cdsCb,
440	})
441}
442
443// watchEDS registers an EDS watcher for the provided clusterName. Updates
444// corresponding to received EDS responses will be pushed to the provided
445// callback. The caller can cancel the watch by invoking the returned cancel
446// function.
447// The provided callback should not block or perform any expensive operations
448// or call other methods of the v2Client object.
449func (v2c *v2Client) watchEDS(clusterName string, edsCb edsCallback) (cancel func()) {
450	return v2c.watch(&watchInfo{
451		typeURL:  edsURL,
452		target:   []string{clusterName},
453		callback: edsCb,
454	})
455	// TODO: Once a registered EDS watch is cancelled, we should send an EDS
456	// request with no resources. This will let the server know that we are no
457	// longer interested in this resource.
458}
459
460func (v2c *v2Client) watch(wi *watchInfo) (cancel func()) {
461	v2c.sendCh.Put(wi)
462	v2c.logger.Infof("Sending ADS request for new watch of type: %v, resource names: %v", wi.typeURL, wi.target)
463	return func() {
464		v2c.mu.Lock()
465		defer v2c.mu.Unlock()
466		if wi.state == watchEnqueued {
467			wi.state = watchCancelled
468			return
469		}
470		v2c.watchMap[wi.typeURL].cancel()
471		delete(v2c.watchMap, wi.typeURL)
472		// TODO: should we reset ack version string when cancelling the watch?
473	}
474}
475
476// checkCacheAndUpdateWatchMap is called when a new watch call is handled in
477// send(). If an existing watcher is found, its expiry timer is stopped. If the
478// watchInfo to be added to the watchMap is found in the cache, the watcher
479// callback is immediately invoked.
480//
481// Caller should hold v2c.mu
482func (v2c *v2Client) checkCacheAndUpdateWatchMap(wi *watchInfo) {
483	if existing := v2c.watchMap[wi.typeURL]; existing != nil {
484		existing.cancel()
485	}
486
487	v2c.watchMap[wi.typeURL] = wi
488	switch wi.typeURL {
489	// We need to grab the lock inside of the expiryTimer's afterFunc because
490	// we need to access the watchInfo, which is stored in the watchMap.
491	case ldsURL:
492		wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() {
493			v2c.mu.Lock()
494			wi.callback.(ldsCallback)(ldsUpdate{}, fmt.Errorf("xds: LDS target %s not found, watcher timeout", wi.target))
495			v2c.mu.Unlock()
496		})
497	case rdsURL:
498		routeName := wi.target[0]
499		if cluster := v2c.rdsCache[routeName]; cluster != "" {
500			var err error
501			if v2c.watchMap[ldsURL] == nil {
502				cluster = ""
503				err = fmt.Errorf("xds: no LDS watcher found when handling RDS watch for route {%v} from cache", routeName)
504			}
505			v2c.logger.Infof("Resource with name %v, type %v found in cache", routeName, wi.typeURL)
506			wi.callback.(rdsCallback)(rdsUpdate{clusterName: cluster}, err)
507			return
508		}
509		// Add the watch expiry timer only for new watches we don't find in
510		// the cache, and return from here.
511		wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() {
512			v2c.mu.Lock()
513			wi.callback.(rdsCallback)(rdsUpdate{}, fmt.Errorf("xds: RDS target %s not found, watcher timeout", wi.target))
514			v2c.mu.Unlock()
515		})
516	case cdsURL:
517		clusterName := wi.target[0]
518		if update, ok := v2c.cdsCache[clusterName]; ok {
519			var err error
520			if v2c.watchMap[cdsURL] == nil {
521				err = fmt.Errorf("xds: no CDS watcher found when handling CDS watch for cluster {%v} from cache", clusterName)
522			}
523			v2c.logger.Infof("Resource with name %v, type %v found in cache", clusterName, wi.typeURL)
524			wi.callback.(cdsCallback)(update, err)
525			return
526		}
527		wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() {
528			v2c.mu.Lock()
529			wi.callback.(cdsCallback)(CDSUpdate{}, fmt.Errorf("xds: CDS target %s not found, watcher timeout", wi.target))
530			v2c.mu.Unlock()
531		})
532	case edsURL:
533		wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() {
534			v2c.mu.Lock()
535			wi.callback.(edsCallback)(nil, fmt.Errorf("xds: EDS target %s not found, watcher timeout", wi.target))
536			v2c.mu.Unlock()
537		})
538	}
539}
540