1// Code generated by create_version. DO NOT EDIT.
2// Copyright 2020 Envoyproxy Authors
3//
4//   Licensed under the Apache License, Version 2.0 (the "License");
5//   you may not use this file except in compliance with the License.
6//   You may obtain a copy of the License at
7//
8//       http://www.apache.org/licenses/LICENSE-2.0
9//
10//   Unless required by applicable law or agreed to in writing, software
11//   distributed under the License is distributed on an "AS IS" BASIS,
12//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13//   See the License for the specific language governing permissions and
14//   limitations under the License.
15
16// Package sotw provides an implementation of GRPC SoTW (State of The World) part of XDS server
17package sotw
18
19import (
20	"context"
21	"errors"
22	"strconv"
23	"sync/atomic"
24
25	"google.golang.org/grpc"
26	"google.golang.org/grpc/codes"
27	"google.golang.org/grpc/status"
28
29	core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
30	discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
31	"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
32	"github.com/envoyproxy/go-control-plane/pkg/resource/v3"
33)
34
35type Server interface {
36	StreamHandler(stream Stream, typeURL string) error
37}
38
39type Callbacks interface {
40	// OnStreamOpen is called once an xDS stream is open with a stream ID and the type URL (or "" for ADS).
41	// Returning an error will end processing and close the stream. OnStreamClosed will still be called.
42	OnStreamOpen(context.Context, int64, string) error
43	// OnStreamClosed is called immediately prior to closing an xDS stream with a stream ID.
44	OnStreamClosed(int64)
45	// OnStreamRequest is called once a request is received on a stream.
46	// Returning an error will end processing and close the stream. OnStreamClosed will still be called.
47	OnStreamRequest(int64, *discovery.DiscoveryRequest) error
48	// OnStreamResponse is called immediately prior to sending a response on a stream.
49	OnStreamResponse(int64, *discovery.DiscoveryRequest, *discovery.DiscoveryResponse)
50}
51
52// NewServer creates handlers from a config watcher and callbacks.
53func NewServer(ctx context.Context, config cache.ConfigWatcher, callbacks Callbacks) Server {
54	return &server{cache: config, callbacks: callbacks, ctx: ctx}
55}
56
57type server struct {
58	cache     cache.ConfigWatcher
59	callbacks Callbacks
60	ctx       context.Context
61
62	// streamCount for counting bi-di streams
63	streamCount int64
64}
65
66// Generic RPC stream.
67type Stream interface {
68	grpc.ServerStream
69
70	Send(*discovery.DiscoveryResponse) error
71	Recv() (*discovery.DiscoveryRequest, error)
72}
73
74// watches for all xDS resource types
75type watches struct {
76	endpoints chan cache.Response
77	clusters  chan cache.Response
78	routes    chan cache.Response
79	listeners chan cache.Response
80	secrets   chan cache.Response
81	runtimes  chan cache.Response
82
83	endpointCancel func()
84	clusterCancel  func()
85	routeCancel    func()
86	listenerCancel func()
87	secretCancel   func()
88	runtimeCancel  func()
89
90	endpointNonce string
91	clusterNonce  string
92	routeNonce    string
93	listenerNonce string
94	secretNonce   string
95	runtimeNonce  string
96
97	// Opaque resources share a muxed channel. Nonces and watch cancellations are indexed by type URL.
98	responses     chan cache.Response
99	cancellations map[string]func()
100	nonces        map[string]string
101	terminations  map[string]chan struct{}
102}
103
104// Initialize all watches
105func (values *watches) Init() {
106	// muxed channel needs a buffer to release go-routines populating it
107	values.responses = make(chan cache.Response, 5)
108	values.cancellations = make(map[string]func())
109	values.nonces = make(map[string]string)
110	values.terminations = make(map[string]chan struct{})
111}
112
113// Token response value used to signal a watch failure in muxed watches.
114var errorResponse = &cache.RawResponse{}
115
116// Cancel all watches
117func (values *watches) Cancel() {
118	if values.endpointCancel != nil {
119		values.endpointCancel()
120	}
121	if values.clusterCancel != nil {
122		values.clusterCancel()
123	}
124	if values.routeCancel != nil {
125		values.routeCancel()
126	}
127	if values.listenerCancel != nil {
128		values.listenerCancel()
129	}
130	if values.secretCancel != nil {
131		values.secretCancel()
132	}
133	if values.runtimeCancel != nil {
134		values.runtimeCancel()
135	}
136	for _, cancel := range values.cancellations {
137		if cancel != nil {
138			cancel()
139		}
140	}
141	for _, terminate := range values.terminations {
142		close(terminate)
143	}
144}
145
146// process handles a bi-di stream request
147func (s *server) process(stream Stream, reqCh <-chan *discovery.DiscoveryRequest, defaultTypeURL string) error {
148	// increment stream count
149	streamID := atomic.AddInt64(&s.streamCount, 1)
150
151	// unique nonce generator for req-resp pairs per xDS stream; the server
152	// ignores stale nonces. nonce is only modified within send() function.
153	var streamNonce int64
154
155	// a collection of stack allocated watches per request type
156	var values watches
157	values.Init()
158	defer func() {
159		values.Cancel()
160		if s.callbacks != nil {
161			s.callbacks.OnStreamClosed(streamID)
162		}
163	}()
164
165	// sends a response by serializing to protobuf Any
166	send := func(resp cache.Response, typeURL string) (string, error) {
167		if resp == nil {
168			return "", errors.New("missing response")
169		}
170
171		out, err := resp.GetDiscoveryResponse()
172		if err != nil {
173			return "", err
174		}
175
176		// increment nonce
177		streamNonce = streamNonce + 1
178		out.Nonce = strconv.FormatInt(streamNonce, 10)
179		if s.callbacks != nil {
180			s.callbacks.OnStreamResponse(streamID, resp.GetRequest(), out)
181		}
182		return out.Nonce, stream.Send(out)
183	}
184
185	if s.callbacks != nil {
186		if err := s.callbacks.OnStreamOpen(stream.Context(), streamID, defaultTypeURL); err != nil {
187			return err
188		}
189	}
190
191	// node may only be set on the first discovery request
192	var node = &core.Node{}
193
194	for {
195		select {
196		case <-s.ctx.Done():
197			return nil
198		// config watcher can send the requested resources types in any order
199		case resp, more := <-values.endpoints:
200			if !more {
201				return status.Errorf(codes.Unavailable, "endpoints watch failed")
202			}
203			nonce, err := send(resp, resource.EndpointType)
204			if err != nil {
205				return err
206			}
207			values.endpointNonce = nonce
208
209		case resp, more := <-values.clusters:
210			if !more {
211				return status.Errorf(codes.Unavailable, "clusters watch failed")
212			}
213			nonce, err := send(resp, resource.ClusterType)
214			if err != nil {
215				return err
216			}
217			values.clusterNonce = nonce
218
219		case resp, more := <-values.routes:
220			if !more {
221				return status.Errorf(codes.Unavailable, "routes watch failed")
222			}
223			nonce, err := send(resp, resource.RouteType)
224			if err != nil {
225				return err
226			}
227			values.routeNonce = nonce
228
229		case resp, more := <-values.listeners:
230			if !more {
231				return status.Errorf(codes.Unavailable, "listeners watch failed")
232			}
233			nonce, err := send(resp, resource.ListenerType)
234			if err != nil {
235				return err
236			}
237			values.listenerNonce = nonce
238
239		case resp, more := <-values.secrets:
240			if !more {
241				return status.Errorf(codes.Unavailable, "secrets watch failed")
242			}
243			nonce, err := send(resp, resource.SecretType)
244			if err != nil {
245				return err
246			}
247			values.secretNonce = nonce
248
249		case resp, more := <-values.runtimes:
250			if !more {
251				return status.Errorf(codes.Unavailable, "runtimes watch failed")
252			}
253			nonce, err := send(resp, resource.RuntimeType)
254			if err != nil {
255				return err
256			}
257			values.runtimeNonce = nonce
258
259		case resp, more := <-values.responses:
260			if more {
261				if resp == errorResponse {
262					return status.Errorf(codes.Unavailable, "resource watch failed")
263				}
264				typeUrl := resp.GetRequest().TypeUrl
265				nonce, err := send(resp, typeUrl)
266				if err != nil {
267					return err
268				}
269				values.nonces[typeUrl] = nonce
270			}
271
272		case req, more := <-reqCh:
273			// input stream ended or errored out
274			if !more {
275				return nil
276			}
277			if req == nil {
278				return status.Errorf(codes.Unavailable, "empty request")
279			}
280
281			// node field in discovery request is delta-compressed
282			if req.Node != nil {
283				node = req.Node
284			} else {
285				req.Node = node
286			}
287
288			// nonces can be reused across streams; we verify nonce only if nonce is not initialized
289			nonce := req.GetResponseNonce()
290
291			// type URL is required for ADS but is implicit for xDS
292			if defaultTypeURL == resource.AnyType {
293				if req.TypeUrl == "" {
294					return status.Errorf(codes.InvalidArgument, "type URL is required for ADS")
295				}
296			} else if req.TypeUrl == "" {
297				req.TypeUrl = defaultTypeURL
298			}
299
300			if s.callbacks != nil {
301				if err := s.callbacks.OnStreamRequest(streamID, req); err != nil {
302					return err
303				}
304			}
305
306			// cancel existing watches to (re-)request a newer version
307			switch {
308			case req.TypeUrl == resource.EndpointType:
309				if values.endpointNonce == "" || values.endpointNonce == nonce {
310					if values.endpointCancel != nil {
311						values.endpointCancel()
312					}
313					values.endpoints, values.endpointCancel = s.cache.CreateWatch(req)
314				}
315			case req.TypeUrl == resource.ClusterType:
316				if values.clusterNonce == "" || values.clusterNonce == nonce {
317					if values.clusterCancel != nil {
318						values.clusterCancel()
319					}
320					values.clusters, values.clusterCancel = s.cache.CreateWatch(req)
321				}
322			case req.TypeUrl == resource.RouteType:
323				if values.routeNonce == "" || values.routeNonce == nonce {
324					if values.routeCancel != nil {
325						values.routeCancel()
326					}
327					values.routes, values.routeCancel = s.cache.CreateWatch(req)
328				}
329			case req.TypeUrl == resource.ListenerType:
330				if values.listenerNonce == "" || values.listenerNonce == nonce {
331					if values.listenerCancel != nil {
332						values.listenerCancel()
333					}
334					values.listeners, values.listenerCancel = s.cache.CreateWatch(req)
335				}
336			case req.TypeUrl == resource.SecretType:
337				if values.secretNonce == "" || values.secretNonce == nonce {
338					if values.secretCancel != nil {
339						values.secretCancel()
340					}
341					values.secrets, values.secretCancel = s.cache.CreateWatch(req)
342				}
343			case req.TypeUrl == resource.RuntimeType:
344				if values.runtimeNonce == "" || values.runtimeNonce == nonce {
345					if values.runtimeCancel != nil {
346						values.runtimeCancel()
347					}
348					values.runtimes, values.runtimeCancel = s.cache.CreateWatch(req)
349				}
350			default:
351				typeUrl := req.TypeUrl
352				responseNonce, seen := values.nonces[typeUrl]
353				if !seen || responseNonce == nonce {
354					// We must signal goroutine termination to prevent a race between the cancel closing the watch
355					// and the producer closing the watch.
356					if terminate, exists := values.terminations[typeUrl]; exists {
357						close(terminate)
358					}
359					if cancel, seen := values.cancellations[typeUrl]; seen && cancel != nil {
360						cancel()
361					}
362					var watch chan cache.Response
363					watch, values.cancellations[typeUrl] = s.cache.CreateWatch(req)
364					// Muxing watches across multiple type URLs onto a single channel requires spawning
365					// a go-routine. Golang does not allow selecting over a dynamic set of channels.
366					terminate := make(chan struct{})
367					values.terminations[typeUrl] = terminate
368					go func() {
369						select {
370						case resp, more := <-watch:
371							if more {
372								values.responses <- resp
373							} else {
374								// Check again if the watch is cancelled.
375								select {
376								case <-terminate: // do nothing
377								default:
378									// We cannot close the responses channel since it can be closed twice.
379									// Instead we send a fake error response.
380									values.responses <- errorResponse
381								}
382							}
383							break
384						case <-terminate:
385							break
386						}
387					}()
388				}
389			}
390		}
391	}
392}
393
394// StreamHandler converts a blocking read call to channels and initiates stream processing
395func (s *server) StreamHandler(stream Stream, typeURL string) error {
396	// a channel for receiving incoming requests
397	reqCh := make(chan *discovery.DiscoveryRequest)
398	reqStop := int32(0)
399	go func() {
400		for {
401			req, err := stream.Recv()
402			if atomic.LoadInt32(&reqStop) != 0 {
403				return
404			}
405			if err != nil {
406				close(reqCh)
407				return
408			}
409			select {
410			case reqCh <- req:
411			case <-s.ctx.Done():
412				return
413			}
414		}
415	}()
416
417	err := s.process(stream, reqCh, typeURL)
418
419	// prevents writing to a closed channel if send failed on blocked recv
420	// TODO(kuat) figure out how to unblock recv through gRPC API
421	atomic.StoreInt32(&reqStop, 1)
422
423	return err
424}
425