1/*
2 *
3 * Copyright 2020 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package client
20
21import (
22	"context"
23	"sync"
24	"time"
25
26	"github.com/golang/protobuf/proto"
27	"google.golang.org/grpc/xds/internal/client/load"
28
29	"google.golang.org/grpc"
30	"google.golang.org/grpc/internal/buffer"
31	"google.golang.org/grpc/internal/grpclog"
32)
33
// ErrResourceTypeUnsupported reports that a response carried an xDS resource
// type this client does not support. ErrStr holds the human-readable details.
type ErrResourceTypeUnsupported struct {
	ErrStr string
}

// Error implements the error interface by returning the stored message.
func (u ErrResourceTypeUnsupported) Error() string {
	msg := u.ErrStr
	return msg
}
44
45// VersionedClient is the interface to be provided by the transport protocol
46// specific client implementations. This mainly deals with the actual sending
47// and receiving of messages.
48type VersionedClient interface {
49	// NewStream returns a new xDS client stream specific to the underlying
50	// transport protocol version.
51	NewStream(ctx context.Context) (grpc.ClientStream, error)
52
53	// SendRequest constructs and sends out a DiscoveryRequest message specific
54	// to the underlying transport protocol version.
55	SendRequest(s grpc.ClientStream, resourceNames []string, rType ResourceType, version, nonce, errMsg string) error
56
57	// RecvResponse uses the provided stream to receive a response specific to
58	// the underlying transport protocol version.
59	RecvResponse(s grpc.ClientStream) (proto.Message, error)
60
61	// HandleResponse parses and validates the received response and notifies
62	// the top-level client which in turn notifies the registered watchers.
63	//
64	// Return values are: resourceType, version, nonce, error.
65	// If the provided protobuf message contains a resource type which is not
66	// supported, implementations must return an error of type
67	// ErrResourceTypeUnsupported.
68	HandleResponse(proto.Message) (ResourceType, string, string, error)
69
70	// NewLoadStatsStream returns a new LRS client stream specific to the underlying
71	// transport protocol version.
72	NewLoadStatsStream(ctx context.Context, cc *grpc.ClientConn) (grpc.ClientStream, error)
73
74	// SendFirstLoadStatsRequest constructs and sends the first request on the
75	// LRS stream.
76	SendFirstLoadStatsRequest(s grpc.ClientStream) error
77
78	// HandleLoadStatsResponse receives the first response from the server which
79	// contains the load reporting interval and the clusters for which the
80	// server asks the client to report load for.
81	//
82	// If the response sets SendAllClusters to true, the returned clusters is
83	// nil.
84	HandleLoadStatsResponse(s grpc.ClientStream) (clusters []string, _ time.Duration, _ error)
85
86	// SendLoadStatsRequest will be invoked at regular intervals to send load
87	// report with load data reported since the last time this method was
88	// invoked.
89	SendLoadStatsRequest(s grpc.ClientStream, loads []*load.Data) error
90}
91
92// TransportHelper contains all xDS transport protocol related functionality
93// which is common across different versioned client implementations.
94//
95// TransportHelper takes care of sending and receiving xDS requests and
96// responses on an ADS stream. It also takes care of ACK/NACK handling. It
97// delegates to the actual versioned client implementations wherever
98// appropriate.
99//
100// Implements the APIClient interface which makes it possible for versioned
101// client implementations to embed this type, and thereby satisfy the interface
102// requirements.
103type TransportHelper struct {
104	cancelCtx context.CancelFunc
105
106	vClient  VersionedClient
107	logger   *grpclog.PrefixLogger
108	backoff  func(int) time.Duration
109	streamCh chan grpc.ClientStream
110	sendCh   *buffer.Unbounded
111
112	mu sync.Mutex
113	// Message specific watch infos, protected by the above mutex. These are
114	// written to, after successfully reading from the update channel, and are
115	// read from when recovering from a broken stream to resend the xDS
116	// messages. When the user of this client object cancels a watch call,
117	// these are set to nil. All accesses to the map protected and any value
118	// inside the map should be protected with the above mutex.
119	watchMap map[ResourceType]map[string]bool
120	// versionMap contains the version that was acked (the version in the ack
121	// request that was sent on wire). The key is rType, the value is the
122	// version string, becaues the versions for different resource types should
123	// be independent.
124	versionMap map[ResourceType]string
125	// nonceMap contains the nonce from the most recent received response.
126	nonceMap map[ResourceType]string
127}
128
// NewTransportHelper returns a TransportHelper that delegates version-specific
// work to vc, logs through logger and uses backoff to space out stream
// retries. It starts the helper's background goroutine before returning; that
// goroutine keeps running until Close is called.
func NewTransportHelper(vc VersionedClient, logger *grpclog.PrefixLogger, backoff func(int) time.Duration) *TransportHelper {
	runCtx, cancel := context.WithCancel(context.Background())

	helper := &TransportHelper{
		cancelCtx:  cancel,
		vClient:    vc,
		logger:     logger,
		backoff:    backoff,
		streamCh:   make(chan grpc.ClientStream, 1),
		sendCh:     buffer.NewUnbounded(),
		watchMap:   make(map[ResourceType]map[string]bool),
		versionMap: make(map[ResourceType]string),
		nonceMap:   make(map[ResourceType]string),
	}

	// run returns once Close cancels runCtx.
	go helper.run(runCtx)
	return helper
}
149
150// AddWatch adds a watch for an xDS resource given its type and name.
151func (t *TransportHelper) AddWatch(rType ResourceType, resourceName string) {
152	t.sendCh.Put(&watchAction{
153		rType:    rType,
154		remove:   false,
155		resource: resourceName,
156	})
157}
158
159// RemoveWatch cancels an already registered watch for an xDS resource
160// given its type and name.
161func (t *TransportHelper) RemoveWatch(rType ResourceType, resourceName string) {
162	t.sendCh.Put(&watchAction{
163		rType:    rType,
164		remove:   true,
165		resource: resourceName,
166	})
167}
168
169// Close closes the transport helper.
170func (t *TransportHelper) Close() {
171	t.cancelCtx()
172}
173
// run owns the ADS stream lifecycle. It starts the send goroutine and then
// repeatedly:
//   - waits out a backoff delay, unless this is the first attempt or the
//     previous stream had at least one response ACKed;
//   - creates a new ADS stream and hands it to send through streamCh;
//   - receives on the stream until it breaks.
//
// run returns when ctx is canceled.
func (t *TransportHelper) run(ctx context.Context) {
	go t.send(ctx)
	// TODO: start a goroutine monitoring ClientConn's connectivity state, and
	// report error (and log) when stats is transient failure.

	attempt := 0
	for ctx.Err() == nil {
		if attempt > 0 {
			backoffTimer := time.NewTimer(t.backoff(attempt))
			select {
			case <-ctx.Done():
				// Drain the channel if the timer already fired, so nothing
				// is left pending on it.
				if !backoffTimer.Stop() {
					<-backoffTimer.C
				}
				return
			case <-backoffTimer.C:
			}
		}
		attempt++

		stream, err := t.vClient.NewStream(ctx)
		if err != nil {
			t.logger.Warningf("xds: ADS stream creation failed: %v", err)
			continue
		}
		t.logger.Infof("ADS stream created")

		// streamCh holds a single stream; discard one the sender has not
		// picked up yet so the send below cannot block.
		select {
		case <-t.streamCh:
		default:
		}
		t.streamCh <- stream

		if gotAck := t.recv(stream); gotAck {
			attempt = 0
		}
	}
}
220
// send runs in its own goroutine and writes watch and ACK/NACK requests to the
// current ADS stream.
//
// It selects over three events:
//   - ctx is canceled: the goroutine returns.
//   - A new stream arrives on streamCh: every registered watch is resent on
//     it (sendExisting), and the stream is forgotten if that fails.
//   - An item is queued on sendCh: a watchAction updates the watch map, and
//     an ackAction updates the version/nonce bookkeeping. The resulting
//     request is then sent on the current stream, if there is one. A failed
//     send clears the current stream. Watches are kept in the watch map, so
//     the next stream picks them up.
//
// send never touches an old stream when a new one arrives. Only one stream is
// expected to be in progress at a time: run creates a new one only after recv
// on the previous one has returned.
func (t *TransportHelper) send(ctx context.Context) {
	var current grpc.ClientStream
	for {
		select {
		case <-ctx.Done():
			return

		case current = <-t.streamCh:
			if ok := t.sendExisting(current); !ok {
				// Resending the registered watches failed; forget this stream.
				current = nil
			}

		case item := <-t.sendCh.Get():
			t.sendCh.Load()

			var (
				names                  []string
				rType                  ResourceType
				version, nonce, errMsg string
			)
			if ack, isAck := item.(*ackAction); isAck {
				var shouldSend bool
				names, rType, version, nonce, shouldSend = t.processAckInfo(ack, current)
				if !shouldSend {
					continue
				}
				errMsg = ack.errMsg
			} else if watch, isWatch := item.(*watchAction); isWatch {
				names, rType, version, nonce = t.processWatchInfo(watch)
			}

			if current == nil {
				// No stream yet: skip the request. It is resent when a new
				// stream is created; if none is, the watcher times out (the
				// same as the server not responding).
				continue
			}
			if err := t.vClient.SendRequest(current, names, rType, version, nonce, errMsg); err != nil {
				t.logger.Warningf("ADS request for {target: %q, type: %v, version: %q, nonce: %q} failed: %v", names, rType, version, nonce, err)
				current = nil
			}
		}
	}
}
282
283// sendExisting sends out xDS requests for registered watchers when recovering
284// from a broken stream.
285//
286// We call stream.Send() here with the lock being held. It should be OK to do
287// that here because the stream has just started and Send() usually returns
288// quickly (once it pushes the message onto the transport layer) and is only
289// ever blocked if we don't have enough flow control quota.
290func (t *TransportHelper) sendExisting(stream grpc.ClientStream) bool {
291	t.mu.Lock()
292	defer t.mu.Unlock()
293
294	// Reset the ack versions when the stream restarts.
295	t.versionMap = make(map[ResourceType]string)
296	t.nonceMap = make(map[ResourceType]string)
297
298	for rType, s := range t.watchMap {
299		if err := t.vClient.SendRequest(stream, mapToSlice(s), rType, "", "", ""); err != nil {
300			t.logger.Warningf("ADS request failed: %v", err)
301			return false
302		}
303	}
304
305	return true
306}
307
308// recv receives xDS responses on the provided ADS stream and branches out to
309// message specific handlers.
310func (t *TransportHelper) recv(stream grpc.ClientStream) bool {
311	success := false
312	for {
313		resp, err := t.vClient.RecvResponse(stream)
314		if err != nil {
315			t.logger.Warningf("ADS stream is closed with error: %v", err)
316			return success
317		}
318		rType, version, nonce, err := t.vClient.HandleResponse(resp)
319		if e, ok := err.(ErrResourceTypeUnsupported); ok {
320			t.logger.Warningf("%s", e.ErrStr)
321			continue
322		}
323		if err != nil {
324			t.sendCh.Put(&ackAction{
325				rType:   rType,
326				version: "",
327				nonce:   nonce,
328				errMsg:  err.Error(),
329				stream:  stream,
330			})
331			t.logger.Warningf("Sending NACK for response type: %v, version: %v, nonce: %v, reason: %v", rType, version, nonce, err)
332			continue
333		}
334		t.sendCh.Put(&ackAction{
335			rType:   rType,
336			version: version,
337			nonce:   nonce,
338			stream:  stream,
339		})
340		t.logger.Infof("Sending ACK for response type: %v, version: %v, nonce: %v", rType, version, nonce)
341		success = true
342	}
343}
344
// mapToSlice returns the keys of m, in unspecified order. It returns nil when
// m is empty or nil.
func mapToSlice(m map[string]bool) []string {
	if len(m) == 0 {
		return nil
	}
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	return keys
}
351
352type watchAction struct {
353	rType    ResourceType
354	remove   bool // Whether this is to remove watch for the resource.
355	resource string
356}
357
358// processWatchInfo pulls the fields needed by the request from a watchAction.
359//
360// It also updates the watch map.
361func (t *TransportHelper) processWatchInfo(w *watchAction) (target []string, rType ResourceType, ver, nonce string) {
362	t.mu.Lock()
363	defer t.mu.Unlock()
364
365	var current map[string]bool
366	current, ok := t.watchMap[w.rType]
367	if !ok {
368		current = make(map[string]bool)
369		t.watchMap[w.rType] = current
370	}
371
372	if w.remove {
373		delete(current, w.resource)
374		if len(current) == 0 {
375			delete(t.watchMap, w.rType)
376		}
377	} else {
378		current[w.resource] = true
379	}
380
381	rType = w.rType
382	target = mapToSlice(current)
383	// We don't reset version or nonce when a new watch is started. The version
384	// and nonce from previous response are carried by the request unless the
385	// stream is recreated.
386	ver = t.versionMap[rType]
387	nonce = t.nonceMap[rType]
388	return target, rType, ver, nonce
389}
390
391type ackAction struct {
392	rType   ResourceType
393	version string // NACK if version is an empty string.
394	nonce   string
395	errMsg  string // Empty unless it's a NACK.
396	// ACK/NACK are tagged with the stream it's for. When the stream is down,
397	// all the ACK/NACK for this stream will be dropped, and the version/nonce
398	// won't be updated.
399	stream grpc.ClientStream
400}
401
402// processAckInfo pulls the fields needed by the ack request from a ackAction.
403//
404// If no active watch is found for this ack, it returns false for send.
405func (t *TransportHelper) processAckInfo(ack *ackAction, stream grpc.ClientStream) (target []string, rType ResourceType, version, nonce string, send bool) {
406	if ack.stream != stream {
407		// If ACK's stream isn't the current sending stream, this means the ACK
408		// was pushed to queue before the old stream broke, and a new stream has
409		// been started since. Return immediately here so we don't update the
410		// nonce for the new stream.
411		return nil, UnknownResource, "", "", false
412	}
413	rType = ack.rType
414
415	t.mu.Lock()
416	defer t.mu.Unlock()
417
418	// Update the nonce no matter if we are going to send the ACK request on
419	// wire. We may not send the request if the watch is canceled. But the nonce
420	// needs to be updated so the next request will have the right nonce.
421	nonce = ack.nonce
422	t.nonceMap[rType] = nonce
423
424	s, ok := t.watchMap[rType]
425	if !ok || len(s) == 0 {
426		// We don't send the request ack if there's no active watch (this can be
427		// either the server sends responses before any request, or the watch is
428		// canceled while the ackAction is in queue), because there's no resource
429		// name. And if we send a request with empty resource name list, the
430		// server may treat it as a wild card and send us everything.
431		return nil, UnknownResource, "", "", false
432	}
433	send = true
434	target = mapToSlice(s)
435
436	version = ack.version
437	if version == "" {
438		// This is a nack, get the previous acked version.
439		version = t.versionMap[rType]
440		// version will still be an empty string if rType isn't
441		// found in versionMap, this can happen if there wasn't any ack
442		// before.
443	} else {
444		t.versionMap[rType] = version
445	}
446	return target, rType, version, nonce, send
447}
448
// reportLoad starts an LRS stream to report load data to the management server.
// It blocks until the context is cancelled.
//
// Failed attempts are retried with t.backoff. The retry counter is reset once
// a stream has received the server's first response, so a stream that breaks
// after load reporting started is recreated without delay.
func (t *TransportHelper) reportLoad(ctx context.Context, cc *grpc.ClientConn, opts loadReportingOptions) {
	retries := 0
	for {
		if ctx.Err() != nil {
			return
		}

		if retries != 0 {
			timer := time.NewTimer(t.backoff(retries))
			select {
			case <-timer.C:
			case <-ctx.Done():
				if !timer.Stop() {
					<-timer.C
				}
				return
			}
		}

		retries++
		if t.reportLoadOnce(ctx, cc, opts) {
			retries = 0
		}
	}
}

// reportLoadOnce runs a single LRS stream attempt. It creates the stream,
// sends the first request and reads the server's first response. It then
// reports load until ctx is canceled or a send fails.
//
// It returns true iff the first response was valid and load reporting started.
func (t *TransportHelper) reportLoadOnce(ctx context.Context, cc *grpc.ClientConn, opts loadReportingOptions) bool {
	// Give each attempt its own context and cancel it on return. A gRPC
	// client stream that is abandoned (not drained to an error) is only
	// released when its context is canceled. Without this, every failed
	// attempt below would leak a stream until the parent ctx ends.
	streamCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	stream, err := t.vClient.NewLoadStatsStream(streamCtx, cc)
	if err != nil {
		logger.Warningf("lrs: failed to create stream: %v", err)
		return false
	}
	logger.Infof("lrs: created LRS stream")

	if err := t.vClient.SendFirstLoadStatsRequest(stream); err != nil {
		logger.Warningf("lrs: failed to send first request: %v", err)
		return false
	}

	clusters, interval, err := t.vClient.HandleLoadStatsResponse(stream)
	if err != nil {
		logger.Warning(err)
		return false
	}
	// sendLoads drives reports with a time.Ticker, which panics on a
	// non-positive interval. Treat such a server response as a failed
	// attempt, so it is retried with backoff instead of crashing the client.
	if interval <= 0 {
		logger.Warningf("lrs: invalid load reporting interval from server: %v", interval)
		return false
	}

	t.sendLoads(streamCtx, stream, opts.loadStore, clusters, interval)
	return true
}
493
// sendLoads sends a load report for clusterNames on stream once per interval,
// using the stats collected in store. It stops when ctx is canceled or a send
// fails.
//
// interval must be positive: time.NewTicker panics otherwise.
func (t *TransportHelper) sendLoads(ctx context.Context, stream grpc.ClientStream, store *load.Store, clusterNames []string, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if err := t.vClient.SendLoadStatsRequest(stream, store.Stats(clusterNames)); err != nil {
				logger.Warning(err)
				return
			}
		}
	}
}
509