1/*
2 *
3 * Copyright 2020 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package xdsclient
20
21import "google.golang.org/grpc/internal/pretty"
22
23type watcherInfoWithUpdate struct {
24	wi     *watchInfo
25	update interface{}
26	err    error
27}
28
29// scheduleCallback should only be called by methods of watchInfo, which checks
30// for watcher states and maintain consistency.
31func (c *clientImpl) scheduleCallback(wi *watchInfo, update interface{}, err error) {
32	c.updateCh.Put(&watcherInfoWithUpdate{
33		wi:     wi,
34		update: update,
35		err:    err,
36	})
37}
38
39func (c *clientImpl) callCallback(wiu *watcherInfoWithUpdate) {
40	c.mu.Lock()
41	// Use a closure to capture the callback and type assertion, to save one
42	// more switch case.
43	//
44	// The callback must be called without c.mu. Otherwise if the callback calls
45	// another watch() inline, it will cause a deadlock. This leaves a small
46	// window that a watcher's callback could be called after the watcher is
47	// canceled, and the user needs to take care of it.
48	var ccb func()
49	switch wiu.wi.rType {
50	case ListenerResource:
51		if s, ok := c.ldsWatchers[wiu.wi.target]; ok && s[wiu.wi] {
52			ccb = func() { wiu.wi.ldsCallback(wiu.update.(ListenerUpdate), wiu.err) }
53		}
54	case RouteConfigResource:
55		if s, ok := c.rdsWatchers[wiu.wi.target]; ok && s[wiu.wi] {
56			ccb = func() { wiu.wi.rdsCallback(wiu.update.(RouteConfigUpdate), wiu.err) }
57		}
58	case ClusterResource:
59		if s, ok := c.cdsWatchers[wiu.wi.target]; ok && s[wiu.wi] {
60			ccb = func() { wiu.wi.cdsCallback(wiu.update.(ClusterUpdate), wiu.err) }
61		}
62	case EndpointsResource:
63		if s, ok := c.edsWatchers[wiu.wi.target]; ok && s[wiu.wi] {
64			ccb = func() { wiu.wi.edsCallback(wiu.update.(EndpointsUpdate), wiu.err) }
65		}
66	}
67	c.mu.Unlock()
68
69	if ccb != nil {
70		ccb()
71	}
72}
73
74// NewListeners is called by the underlying xdsAPIClient when it receives an
75// xDS response.
76//
77// A response can contain multiple resources. They will be parsed and put in a
78// map from resource name to the resource content.
79func (c *clientImpl) NewListeners(updates map[string]ListenerUpdate, metadata UpdateMetadata) {
80	c.mu.Lock()
81	defer c.mu.Unlock()
82
83	if metadata.ErrState != nil {
84		// On NACK, update overall version to the NACKed resp.
85		c.ldsVersion = metadata.ErrState.Version
86		for name := range updates {
87			if s, ok := c.ldsWatchers[name]; ok {
88				// On error, keep previous version for each resource. But update
89				// status and error.
90				mdCopy := c.ldsMD[name]
91				mdCopy.ErrState = metadata.ErrState
92				mdCopy.Status = metadata.Status
93				c.ldsMD[name] = mdCopy
94				for wi := range s {
95					wi.newError(metadata.ErrState.Err)
96				}
97			}
98		}
99		return
100	}
101
102	// If no error received, the status is ACK.
103	c.ldsVersion = metadata.Version
104	for name, update := range updates {
105		if s, ok := c.ldsWatchers[name]; ok {
106			// Only send the update if this is not an error.
107			for wi := range s {
108				wi.newUpdate(update)
109			}
110			// Sync cache.
111			c.logger.Debugf("LDS resource with name %v, value %+v added to cache", name, pretty.ToJSON(update))
112			c.ldsCache[name] = update
113			c.ldsMD[name] = metadata
114		}
115	}
116	// Resources not in the new update were removed by the server, so delete
117	// them.
118	for name := range c.ldsCache {
119		if _, ok := updates[name]; !ok {
120			// If resource exists in cache, but not in the new update, delete
121			// the resource from cache, and also send an resource not found
122			// error to indicate resource removed.
123			delete(c.ldsCache, name)
124			c.ldsMD[name] = UpdateMetadata{Status: ServiceStatusNotExist}
125			for wi := range c.ldsWatchers[name] {
126				wi.resourceNotFound()
127			}
128		}
129	}
130	// When LDS resource is removed, we don't delete corresponding RDS cached
131	// data. The RDS watch will be canceled, and cache entry is removed when the
132	// last watch is canceled.
133}
134
135// NewRouteConfigs is called by the underlying xdsAPIClient when it receives an
136// xDS response.
137//
138// A response can contain multiple resources. They will be parsed and put in a
139// map from resource name to the resource content.
140func (c *clientImpl) NewRouteConfigs(updates map[string]RouteConfigUpdate, metadata UpdateMetadata) {
141	c.mu.Lock()
142	defer c.mu.Unlock()
143
144	if metadata.ErrState != nil {
145		// On NACK, update overall version to the NACKed resp.
146		c.rdsVersion = metadata.ErrState.Version
147		for name := range updates {
148			if s, ok := c.rdsWatchers[name]; ok {
149				// On error, keep previous version for each resource. But update
150				// status and error.
151				mdCopy := c.rdsMD[name]
152				mdCopy.ErrState = metadata.ErrState
153				mdCopy.Status = metadata.Status
154				c.rdsMD[name] = mdCopy
155				for wi := range s {
156					wi.newError(metadata.ErrState.Err)
157				}
158			}
159		}
160		return
161	}
162
163	// If no error received, the status is ACK.
164	c.rdsVersion = metadata.Version
165	for name, update := range updates {
166		if s, ok := c.rdsWatchers[name]; ok {
167			// Only send the update if this is not an error.
168			for wi := range s {
169				wi.newUpdate(update)
170			}
171			// Sync cache.
172			c.logger.Debugf("RDS resource with name %v, value %+v added to cache", name, pretty.ToJSON(update))
173			c.rdsCache[name] = update
174			c.rdsMD[name] = metadata
175		}
176	}
177}
178
179// NewClusters is called by the underlying xdsAPIClient when it receives an xDS
180// response.
181//
182// A response can contain multiple resources. They will be parsed and put in a
183// map from resource name to the resource content.
184func (c *clientImpl) NewClusters(updates map[string]ClusterUpdate, metadata UpdateMetadata) {
185	c.mu.Lock()
186	defer c.mu.Unlock()
187
188	if metadata.ErrState != nil {
189		// On NACK, update overall version to the NACKed resp.
190		c.cdsVersion = metadata.ErrState.Version
191		for name := range updates {
192			if s, ok := c.cdsWatchers[name]; ok {
193				// On error, keep previous version for each resource. But update
194				// status and error.
195				mdCopy := c.cdsMD[name]
196				mdCopy.ErrState = metadata.ErrState
197				mdCopy.Status = metadata.Status
198				c.cdsMD[name] = mdCopy
199				for wi := range s {
200					wi.newError(metadata.ErrState.Err)
201				}
202			}
203		}
204		return
205	}
206
207	// If no error received, the status is ACK.
208	c.cdsVersion = metadata.Version
209	for name, update := range updates {
210		if s, ok := c.cdsWatchers[name]; ok {
211			// Only send the update if this is not an error.
212			for wi := range s {
213				wi.newUpdate(update)
214			}
215			// Sync cache.
216			c.logger.Debugf("CDS resource with name %v, value %+v added to cache", name, pretty.ToJSON(update))
217			c.cdsCache[name] = update
218			c.cdsMD[name] = metadata
219		}
220	}
221	// Resources not in the new update were removed by the server, so delete
222	// them.
223	for name := range c.cdsCache {
224		if _, ok := updates[name]; !ok {
225			// If resource exists in cache, but not in the new update, delete it
226			// from cache, and also send an resource not found error to indicate
227			// resource removed.
228			delete(c.cdsCache, name)
229			c.ldsMD[name] = UpdateMetadata{Status: ServiceStatusNotExist}
230			for wi := range c.cdsWatchers[name] {
231				wi.resourceNotFound()
232			}
233		}
234	}
235	// When CDS resource is removed, we don't delete corresponding EDS cached
236	// data. The EDS watch will be canceled, and cache entry is removed when the
237	// last watch is canceled.
238}
239
240// NewEndpoints is called by the underlying xdsAPIClient when it receives an
241// xDS response.
242//
243// A response can contain multiple resources. They will be parsed and put in a
244// map from resource name to the resource content.
245func (c *clientImpl) NewEndpoints(updates map[string]EndpointsUpdate, metadata UpdateMetadata) {
246	c.mu.Lock()
247	defer c.mu.Unlock()
248
249	if metadata.ErrState != nil {
250		// On NACK, update overall version to the NACKed resp.
251		c.edsVersion = metadata.ErrState.Version
252		for name := range updates {
253			if s, ok := c.edsWatchers[name]; ok {
254				// On error, keep previous version for each resource. But update
255				// status and error.
256				mdCopy := c.edsMD[name]
257				mdCopy.ErrState = metadata.ErrState
258				mdCopy.Status = metadata.Status
259				c.edsMD[name] = mdCopy
260				for wi := range s {
261					wi.newError(metadata.ErrState.Err)
262				}
263			}
264		}
265		return
266	}
267
268	// If no error received, the status is ACK.
269	c.edsVersion = metadata.Version
270	for name, update := range updates {
271		if s, ok := c.edsWatchers[name]; ok {
272			// Only send the update if this is not an error.
273			for wi := range s {
274				wi.newUpdate(update)
275			}
276			// Sync cache.
277			c.logger.Debugf("EDS resource with name %v, value %+v added to cache", name, pretty.ToJSON(update))
278			c.edsCache[name] = update
279			c.edsMD[name] = metadata
280		}
281	}
282}
283
284// NewConnectionError is called by the underlying xdsAPIClient when it receives
285// a connection error. The error will be forwarded to all the resource watchers.
286func (c *clientImpl) NewConnectionError(err error) {
287	c.mu.Lock()
288	defer c.mu.Unlock()
289
290	for _, s := range c.edsWatchers {
291		for wi := range s {
292			wi.newError(NewErrorf(ErrorTypeConnection, "xds: error received from xDS stream: %v", err))
293		}
294	}
295}
296