1/*
2 *
3 * Copyright 2019 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package client
20
21import (
22	"context"
23	"fmt"
24	"sync"
25	"time"
26
27	"google.golang.org/grpc"
28	"google.golang.org/grpc/grpclog"
29	"google.golang.org/grpc/internal/buffer"
30
31	xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
32	corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
33	adsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
34)
35
var (
	// defaultWatchExpiryTimeout is how long a watch waits for the requested
	// resource before its callback is invoked with a "not found" error. The
	// value mirrors the default of the initial_fetch_timeout field in the
	// corepb.ConfigSource proto. It is a variable rather than a constant so
	// that it can be overridden.
	defaultWatchExpiryTimeout = 15 * time.Second
)
39
// v2Client performs the actual xDS RPCs using the xDS v2 API. It creates a
// single ADS stream on which the different types of xDS requests and responses
// are multiplexed.
//
// The reason for splitting this out from the top level xdsClient object is
// that an xDS v3alpha API is already in development. If and when we want to
// switch to that, this separation will ease that process.
type v2Client struct {
	// ctx bounds the lifetime of the run() loop and of every ADS stream it
	// creates. cancelCtx cancels ctx and is invoked by close().
	ctx       context.Context
	cancelCtx context.CancelFunc

	// cc is the ClientConn to the xDS gRPC server. It is owned by the parent
	// xdsClient. nodeProto is sent as the Node of every DiscoveryRequest.
	// backoff maps the number of stream attempts made since the last stream
	// that received a valid response to the delay before the next attempt.
	cc        *grpc.ClientConn
	nodeProto *corepb.Node
	backoff   func(int) time.Duration

	// sendCh is the channel onto which watchInfo objects are pushed by the
	// watch API and ackInfo objects are pushed by recv(). It is read and acted
	// upon by the send() goroutine.
	sendCh *buffer.Unbounded

	// mu protects watchMap, ackMap, rdsCache and cdsCache, as well as the
	// watchInfo values stored in watchMap.
	mu sync.Mutex
	// watchMap holds the active watch for each xDS type URL (the map key).
	// An entry is written after send() successfully reads the watchInfo from
	// sendCh. All entries are read when recovering from a broken stream, to
	// resend the xDS requests. When the user of this client object cancels a
	// watch, its entry is removed from the map. All accesses to the map, and
	// to any value inside it, must hold mu.
	watchMap map[string]*watchInfo
	// ackMap contains the version that was acked (the version in the ack
	// request that was sent on the wire). The key is the typeURL and the value
	// is the version string, because the versions for different resource
	// types are independent. It is reset whenever a new stream starts.
	ackMap map[string]string
	// rdsCache maintains a mapping of {routeConfigName --> clusterName} from
	// validated route configurations received in RDS responses. We cache all
	// valid route configurations, whether or not we were interested in them
	// when we received them. We could become interested in them in the
	// future, and the server won't send us those resources again.
	// Protected by mu.
	//
	// TODO: remove RDS cache. The updated spec says client can ignore
	// unrequested resources.
	// https://github.com/envoyproxy/envoy/blob/master/api/xds_protocol.rst#resource-hints
	rdsCache map[string]string
	// cdsCache maintains a mapping of {clusterName --> CDSUpdate} from
	// validated cluster configurations received in CDS responses. We cache all
	// valid cluster configurations, whether or not we were interested in them
	// when we received them. We could become interested in them in the
	// future, and the server won't send us those resources again. This is
	// only to support legacy management servers that do not honor the
	// resource_names field. As per the latest spec, the server should resend
	// the response when the request changes, even if it had sent the same
	// resource earlier (when not asked for). Protected by mu.
	cdsCache map[string]CDSUpdate
}
94
95// newV2Client creates a new v2Client initialized with the passed arguments.
96func newV2Client(cc *grpc.ClientConn, nodeProto *corepb.Node, backoff func(int) time.Duration) *v2Client {
97	v2c := &v2Client{
98		cc:        cc,
99		nodeProto: nodeProto,
100		backoff:   backoff,
101		sendCh:    buffer.NewUnbounded(),
102		watchMap:  make(map[string]*watchInfo),
103		ackMap:    make(map[string]string),
104		rdsCache:  make(map[string]string),
105		cdsCache:  make(map[string]CDSUpdate),
106	}
107	v2c.ctx, v2c.cancelCtx = context.WithCancel(context.Background())
108
109	go v2c.run()
110	return v2c
111}
112
113// close cleans up resources and goroutines allocated by this client.
114func (v2c *v2Client) close() {
115	v2c.cancelCtx()
116}
117
118// run starts an ADS stream (and backs off exponentially, if the previous
119// stream failed without receiving a single reply) and runs the sender and
120// receiver routines to send and receive data from the stream respectively.
121func (v2c *v2Client) run() {
122	retries := 0
123	for {
124		select {
125		case <-v2c.ctx.Done():
126			return
127		default:
128		}
129
130		if retries != 0 {
131			t := time.NewTimer(v2c.backoff(retries))
132			select {
133			case <-t.C:
134			case <-v2c.ctx.Done():
135				if !t.Stop() {
136					<-t.C
137				}
138				return
139			}
140		}
141
142		retries++
143		cli := adsgrpc.NewAggregatedDiscoveryServiceClient(v2c.cc)
144		stream, err := cli.StreamAggregatedResources(v2c.ctx, grpc.WaitForReady(true))
145		if err != nil {
146			grpclog.Infof("xds: ADS stream creation failed: %v", err)
147			continue
148		}
149
150		// send() could be blocked on reading updates from the different update
151		// channels when it is not actually sending out messages. So, we need a
152		// way to break out of send() when recv() returns. This done channel is
153		// used to for that purpose.
154		done := make(chan struct{})
155		go v2c.send(stream, done)
156		if v2c.recv(stream) {
157			retries = 0
158		}
159		close(done)
160	}
161}
162
163// sendRequest sends a request for provided typeURL and resource on the provided
164// stream.
165//
166// version is the ack version to be sent with the request
167// - If this is the new request (not an ack/nack), version will be an empty
168// string
169// - If this is an ack, version will be the version from the response
170// - If this is a nack, version will be the previous acked version (from
171// ackMap). If there was no ack before, it will be an empty string
172func (v2c *v2Client) sendRequest(stream adsStream, resourceNames []string, typeURL, version, nonce string) bool {
173	req := &xdspb.DiscoveryRequest{
174		Node:          v2c.nodeProto,
175		TypeUrl:       typeURL,
176		ResourceNames: resourceNames,
177		VersionInfo:   version,
178		ResponseNonce: nonce,
179		// TODO: populate ErrorDetails for nack.
180	}
181	if err := stream.Send(req); err != nil {
182		grpclog.Warningf("xds: request (type %s) for resource %v failed: %v", typeURL, resourceNames, err)
183		return false
184	}
185	return true
186}
187
188// sendExisting sends out xDS requests for registered watchers when recovering
189// from a broken stream.
190//
191// We call stream.Send() here with the lock being held. It should be OK to do
192// that here because the stream has just started and Send() usually returns
193// quickly (once it pushes the message onto the transport layer) and is only
194// ever blocked if we don't have enough flow control quota.
195func (v2c *v2Client) sendExisting(stream adsStream) bool {
196	v2c.mu.Lock()
197	defer v2c.mu.Unlock()
198
199	// Reset the ack versions when the stream restarts.
200	v2c.ackMap = make(map[string]string)
201
202	for typeURL, wi := range v2c.watchMap {
203		if !v2c.sendRequest(stream, wi.target, typeURL, "", "") {
204			return false
205		}
206	}
207
208	return true
209}
210
211// processWatchInfo pulls the fields needed by the request from a watchInfo.
212//
213// It also calls callback with cached response, and updates the watch map in
214// v2c.
215//
216// If the watch was already canceled, it returns false for send
217func (v2c *v2Client) processWatchInfo(t *watchInfo) (target []string, typeURL, version, nonce string, send bool) {
218	v2c.mu.Lock()
219	defer v2c.mu.Unlock()
220	if t.state == watchCancelled {
221		return // This returns all zero values, and false for send.
222	}
223	t.state = watchStarted
224	send = true
225
226	typeURL = t.typeURL
227	target = t.target
228	v2c.checkCacheAndUpdateWatchMap(t)
229	// TODO: if watch is called again with the same resource names,
230	// there's no need to send another request.
231	//
232	// TODO: should we reset version (for ack) when a new watch is
233	// started? Or do this only if the resource names are different
234	// (so we send a new request)?
235	return
236}
237
238// processAckInfo pulls the fields needed by the ack request from a ackInfo.
239//
240// If no active watch is found for this ack, it returns false for send.
241func (v2c *v2Client) processAckInfo(t *ackInfo) (target []string, typeURL, version, nonce string, send bool) {
242	typeURL = t.typeURL
243
244	v2c.mu.Lock()
245	defer v2c.mu.Unlock()
246	wi, ok := v2c.watchMap[typeURL]
247	if !ok {
248		// We don't send the request ack if there's no active watch (this can be
249		// either the server sends responses before any request, or the watch is
250		// canceled while the ackInfo is in queue), because there's no resource
251		// name. And if we send a request with empty resource name list, the
252		// server may treat it as a wild card and send us everything.
253		grpclog.Warningf("xds: ack (type %s) not sent because there's no active watch for the type", typeURL)
254		return // This returns all zero values, and false for send.
255	}
256	send = true
257
258	version = t.version
259	nonce = t.nonce
260	target = wi.target
261	if version == "" {
262		// This is a nack, get the previous acked version.
263		version = v2c.ackMap[typeURL]
264		// version will still be an empty string if typeURL isn't
265		// found in ackMap, this can happen if there wasn't any ack
266		// before.
267	} else {
268		v2c.ackMap[typeURL] = version
269	}
270	return
271}
272
273// send reads watch infos from update channel and sends out actual xDS requests
274// on the provided ADS stream.
275func (v2c *v2Client) send(stream adsStream, done chan struct{}) {
276	if !v2c.sendExisting(stream) {
277		return
278	}
279
280	for {
281		select {
282		case <-v2c.ctx.Done():
283			return
284		case u := <-v2c.sendCh.Get():
285			v2c.sendCh.Load()
286
287			var (
288				target                  []string
289				typeURL, version, nonce string
290				send                    bool
291			)
292			switch t := u.(type) {
293			case *watchInfo:
294				target, typeURL, version, nonce, send = v2c.processWatchInfo(t)
295			case *ackInfo:
296				target, typeURL, version, nonce, send = v2c.processAckInfo(t)
297			}
298			if !send {
299				continue
300			}
301			if !v2c.sendRequest(stream, target, typeURL, version, nonce) {
302				return
303			}
304		case <-done:
305			return
306		}
307	}
308}
309
310// recv receives xDS responses on the provided ADS stream and branches out to
311// message specific handlers.
312func (v2c *v2Client) recv(stream adsStream) bool {
313	success := false
314	for {
315		resp, err := stream.Recv()
316		// TODO: call watch callbacks with error when stream is broken.
317		if err != nil {
318			grpclog.Warningf("xds: ADS stream recv failed: %v", err)
319			return success
320		}
321		var respHandleErr error
322		switch resp.GetTypeUrl() {
323		case ldsURL:
324			respHandleErr = v2c.handleLDSResponse(resp)
325		case rdsURL:
326			respHandleErr = v2c.handleRDSResponse(resp)
327		case cdsURL:
328			respHandleErr = v2c.handleCDSResponse(resp)
329		case edsURL:
330			respHandleErr = v2c.handleEDSResponse(resp)
331		default:
332			grpclog.Warningf("xds: unknown response URL type: %v", resp.GetTypeUrl())
333			continue
334		}
335
336		typeURL := resp.GetTypeUrl()
337		if respHandleErr != nil {
338			grpclog.Warningf("xds: response (type %s) handler failed: %v", typeURL, respHandleErr)
339			v2c.sendCh.Put(&ackInfo{
340				typeURL: typeURL,
341				version: "",
342				nonce:   resp.GetNonce(),
343			})
344			continue
345		}
346		v2c.sendCh.Put(&ackInfo{
347			typeURL: typeURL,
348			version: resp.GetVersionInfo(),
349			nonce:   resp.GetNonce(),
350		})
351		success = true
352	}
353}
354
355// watchLDS registers an LDS watcher for the provided target. Updates
356// corresponding to received LDS responses will be pushed to the provided
357// callback. The caller can cancel the watch by invoking the returned cancel
358// function.
359// The provided callback should not block or perform any expensive operations
360// or call other methods of the v2Client object.
361func (v2c *v2Client) watchLDS(target string, ldsCb ldsCallback) (cancel func()) {
362	return v2c.watch(&watchInfo{
363		typeURL:  ldsURL,
364		target:   []string{target},
365		callback: ldsCb,
366	})
367}
368
369// watchRDS registers an RDS watcher for the provided routeName. Updates
370// corresponding to received RDS responses will be pushed to the provided
371// callback. The caller can cancel the watch by invoking the returned cancel
372// function.
373// The provided callback should not block or perform any expensive operations
374// or call other methods of the v2Client object.
375func (v2c *v2Client) watchRDS(routeName string, rdsCb rdsCallback) (cancel func()) {
376	return v2c.watch(&watchInfo{
377		typeURL:  rdsURL,
378		target:   []string{routeName},
379		callback: rdsCb,
380	})
381	// TODO: Once a registered RDS watch is cancelled, we should send an RDS
382	// request with no resources. This will let the server know that we are no
383	// longer interested in this resource.
384}
385
386// watchCDS registers an CDS watcher for the provided clusterName. Updates
387// corresponding to received CDS responses will be pushed to the provided
388// callback. The caller can cancel the watch by invoking the returned cancel
389// function.
390// The provided callback should not block or perform any expensive operations
391// or call other methods of the v2Client object.
392func (v2c *v2Client) watchCDS(clusterName string, cdsCb cdsCallback) (cancel func()) {
393	return v2c.watch(&watchInfo{
394		typeURL:  cdsURL,
395		target:   []string{clusterName},
396		callback: cdsCb,
397	})
398}
399
400// watchEDS registers an EDS watcher for the provided clusterName. Updates
401// corresponding to received EDS responses will be pushed to the provided
402// callback. The caller can cancel the watch by invoking the returned cancel
403// function.
404// The provided callback should not block or perform any expensive operations
405// or call other methods of the v2Client object.
406func (v2c *v2Client) watchEDS(clusterName string, edsCb edsCallback) (cancel func()) {
407	return v2c.watch(&watchInfo{
408		typeURL:  edsURL,
409		target:   []string{clusterName},
410		callback: edsCb,
411	})
412	// TODO: Once a registered EDS watch is cancelled, we should send an EDS
413	// request with no resources. This will let the server know that we are no
414	// longer interested in this resource.
415}
416
417func (v2c *v2Client) watch(wi *watchInfo) (cancel func()) {
418	v2c.sendCh.Put(wi)
419	return func() {
420		v2c.mu.Lock()
421		defer v2c.mu.Unlock()
422		if wi.state == watchEnqueued {
423			wi.state = watchCancelled
424			return
425		}
426		v2c.watchMap[wi.typeURL].cancel()
427		delete(v2c.watchMap, wi.typeURL)
428		// TODO: should we reset ack version string when cancelling the watch?
429	}
430}
431
432// checkCacheAndUpdateWatchMap is called when a new watch call is handled in
433// send(). If an existing watcher is found, its expiry timer is stopped. If the
434// watchInfo to be added to the watchMap is found in the cache, the watcher
435// callback is immediately invoked.
436//
437// Caller should hold v2c.mu
438func (v2c *v2Client) checkCacheAndUpdateWatchMap(wi *watchInfo) {
439	if existing := v2c.watchMap[wi.typeURL]; existing != nil {
440		existing.cancel()
441	}
442
443	v2c.watchMap[wi.typeURL] = wi
444	switch wi.typeURL {
445	// We need to grab the lock inside of the expiryTimer's afterFunc because
446	// we need to access the watchInfo, which is stored in the watchMap.
447	case ldsURL:
448		wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() {
449			v2c.mu.Lock()
450			wi.callback.(ldsCallback)(ldsUpdate{}, fmt.Errorf("xds: LDS target %s not found", wi.target))
451			v2c.mu.Unlock()
452		})
453	case rdsURL:
454		routeName := wi.target[0]
455		if cluster := v2c.rdsCache[routeName]; cluster != "" {
456			var err error
457			if v2c.watchMap[ldsURL] == nil {
458				cluster = ""
459				err = fmt.Errorf("xds: no LDS watcher found when handling RDS watch for route {%v} from cache", routeName)
460			}
461			wi.callback.(rdsCallback)(rdsUpdate{clusterName: cluster}, err)
462			return
463		}
464		// Add the watch expiry timer only for new watches we don't find in
465		// the cache, and return from here.
466		wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() {
467			v2c.mu.Lock()
468			wi.callback.(rdsCallback)(rdsUpdate{}, fmt.Errorf("xds: RDS target %s not found", wi.target))
469			v2c.mu.Unlock()
470		})
471	case cdsURL:
472		clusterName := wi.target[0]
473		if update, ok := v2c.cdsCache[clusterName]; ok {
474			var err error
475			if v2c.watchMap[cdsURL] == nil {
476				err = fmt.Errorf("xds: no CDS watcher found when handling CDS watch for cluster {%v} from cache", clusterName)
477			}
478			wi.callback.(cdsCallback)(update, err)
479			return
480		}
481		wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() {
482			v2c.mu.Lock()
483			wi.callback.(cdsCallback)(CDSUpdate{}, fmt.Errorf("xds: CDS target %s not found", wi.target))
484			v2c.mu.Unlock()
485		})
486	case edsURL:
487		wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() {
488			v2c.mu.Lock()
489			wi.callback.(edsCallback)(nil, fmt.Errorf("xds: EDS target %s not found", wi.target))
490			v2c.mu.Unlock()
491		})
492	}
493}
494