1/*
2 *
3 * Copyright 2020 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package client
20
// watcherInfoWithUpdate bundles a watcher with the update and error that are
// to be handed to that watcher's callback. scheduleCallback builds these
// values and callCallback consumes them.
type watcherInfoWithUpdate struct {
	// wi is the watcher whose callback is to be invoked.
	wi     *watchInfo
	// update is the resource update for the callback. callCallback
	// type-asserts it to the concrete update type selected by wi.rType.
	update interface{}
	// err is passed to the callback together with update.
	err    error
}
26
27// scheduleCallback should only be called by methods of watchInfo, which checks
28// for watcher states and maintain consistency.
29func (c *clientImpl) scheduleCallback(wi *watchInfo, update interface{}, err error) {
30	c.updateCh.Put(&watcherInfoWithUpdate{
31		wi:     wi,
32		update: update,
33		err:    err,
34	})
35}
36
37func (c *clientImpl) callCallback(wiu *watcherInfoWithUpdate) {
38	c.mu.Lock()
39	// Use a closure to capture the callback and type assertion, to save one
40	// more switch case.
41	//
42	// The callback must be called without c.mu. Otherwise if the callback calls
43	// another watch() inline, it will cause a deadlock. This leaves a small
44	// window that a watcher's callback could be called after the watcher is
45	// canceled, and the user needs to take care of it.
46	var ccb func()
47	switch wiu.wi.rType {
48	case ListenerResource:
49		if s, ok := c.ldsWatchers[wiu.wi.target]; ok && s[wiu.wi] {
50			ccb = func() { wiu.wi.ldsCallback(wiu.update.(ListenerUpdate), wiu.err) }
51		}
52	case RouteConfigResource:
53		if s, ok := c.rdsWatchers[wiu.wi.target]; ok && s[wiu.wi] {
54			ccb = func() { wiu.wi.rdsCallback(wiu.update.(RouteConfigUpdate), wiu.err) }
55		}
56	case ClusterResource:
57		if s, ok := c.cdsWatchers[wiu.wi.target]; ok && s[wiu.wi] {
58			ccb = func() { wiu.wi.cdsCallback(wiu.update.(ClusterUpdate), wiu.err) }
59		}
60	case EndpointsResource:
61		if s, ok := c.edsWatchers[wiu.wi.target]; ok && s[wiu.wi] {
62			ccb = func() { wiu.wi.edsCallback(wiu.update.(EndpointsUpdate), wiu.err) }
63		}
64	}
65	c.mu.Unlock()
66
67	if ccb != nil {
68		ccb()
69	}
70}
71
72// NewListeners is called by the underlying xdsAPIClient when it receives an
73// xDS response.
74//
75// A response can contain multiple resources. They will be parsed and put in a
76// map from resource name to the resource content.
77func (c *clientImpl) NewListeners(updates map[string]ListenerUpdate, metadata UpdateMetadata) {
78	c.mu.Lock()
79	defer c.mu.Unlock()
80
81	if metadata.ErrState != nil {
82		// On NACK, update overall version to the NACKed resp.
83		c.ldsVersion = metadata.ErrState.Version
84		for name := range updates {
85			if _, ok := c.ldsWatchers[name]; ok {
86				// On error, keep previous version for each resource. But update
87				// status and error.
88				mdCopy := c.ldsMD[name]
89				mdCopy.ErrState = metadata.ErrState
90				mdCopy.Status = metadata.Status
91				c.ldsMD[name] = mdCopy
92				// TODO: send the NACK error to the watcher.
93			}
94		}
95		return
96	}
97
98	// If no error received, the status is ACK.
99	c.ldsVersion = metadata.Version
100	for name, update := range updates {
101		if s, ok := c.ldsWatchers[name]; ok {
102			// Only send the update if this is not an error.
103			for wi := range s {
104				wi.newUpdate(update)
105			}
106			// Sync cache.
107			c.logger.Debugf("LDS resource with name %v, value %+v added to cache", name, update)
108			c.ldsCache[name] = update
109			c.ldsMD[name] = metadata
110		}
111	}
112	// Resources not in the new update were removed by the server, so delete
113	// them.
114	for name := range c.ldsCache {
115		if _, ok := updates[name]; !ok {
116			// If resource exists in cache, but not in the new update, delete
117			// the resource from cache, and also send an resource not found
118			// error to indicate resource removed.
119			delete(c.ldsCache, name)
120			c.ldsMD[name] = UpdateMetadata{Status: ServiceStatusNotExist}
121			for wi := range c.ldsWatchers[name] {
122				wi.resourceNotFound()
123			}
124		}
125	}
126	// When LDS resource is removed, we don't delete corresponding RDS cached
127	// data. The RDS watch will be canceled, and cache entry is removed when the
128	// last watch is canceled.
129}
130
131// NewRouteConfigs is called by the underlying xdsAPIClient when it receives an
132// xDS response.
133//
134// A response can contain multiple resources. They will be parsed and put in a
135// map from resource name to the resource content.
136func (c *clientImpl) NewRouteConfigs(updates map[string]RouteConfigUpdate, metadata UpdateMetadata) {
137	c.mu.Lock()
138	defer c.mu.Unlock()
139
140	if metadata.ErrState != nil {
141		// On NACK, update overall version to the NACKed resp.
142		c.rdsVersion = metadata.ErrState.Version
143		for name := range updates {
144			if _, ok := c.rdsWatchers[name]; ok {
145				// On error, keep previous version for each resource. But update
146				// status and error.
147				mdCopy := c.rdsMD[name]
148				mdCopy.ErrState = metadata.ErrState
149				mdCopy.Status = metadata.Status
150				c.rdsMD[name] = mdCopy
151				// TODO: send the NACK error to the watcher.
152			}
153		}
154		return
155	}
156
157	// If no error received, the status is ACK.
158	c.rdsVersion = metadata.Version
159	for name, update := range updates {
160		if s, ok := c.rdsWatchers[name]; ok {
161			// Only send the update if this is not an error.
162			for wi := range s {
163				wi.newUpdate(update)
164			}
165			// Sync cache.
166			c.logger.Debugf("RDS resource with name %v, value %+v added to cache", name, update)
167			c.rdsCache[name] = update
168			c.rdsMD[name] = metadata
169		}
170	}
171}
172
173// NewClusters is called by the underlying xdsAPIClient when it receives an xDS
174// response.
175//
176// A response can contain multiple resources. They will be parsed and put in a
177// map from resource name to the resource content.
178func (c *clientImpl) NewClusters(updates map[string]ClusterUpdate, metadata UpdateMetadata) {
179	c.mu.Lock()
180	defer c.mu.Unlock()
181
182	if metadata.ErrState != nil {
183		// On NACK, update overall version to the NACKed resp.
184		c.cdsVersion = metadata.ErrState.Version
185		for name := range updates {
186			if _, ok := c.cdsWatchers[name]; ok {
187				// On error, keep previous version for each resource. But update
188				// status and error.
189				mdCopy := c.cdsMD[name]
190				mdCopy.ErrState = metadata.ErrState
191				mdCopy.Status = metadata.Status
192				c.cdsMD[name] = mdCopy
193				// TODO: send the NACK error to the watcher.
194			}
195		}
196		return
197	}
198
199	// If no error received, the status is ACK.
200	c.cdsVersion = metadata.Version
201	for name, update := range updates {
202		if s, ok := c.cdsWatchers[name]; ok {
203			// Only send the update if this is not an error.
204			for wi := range s {
205				wi.newUpdate(update)
206			}
207			// Sync cache.
208			c.logger.Debugf("CDS resource with name %v, value %+v added to cache", name, update)
209			c.cdsCache[name] = update
210			c.cdsMD[name] = metadata
211		}
212	}
213	// Resources not in the new update were removed by the server, so delete
214	// them.
215	for name := range c.cdsCache {
216		if _, ok := updates[name]; !ok {
217			// If resource exists in cache, but not in the new update, delete it
218			// from cache, and also send an resource not found error to indicate
219			// resource removed.
220			delete(c.cdsCache, name)
221			c.ldsMD[name] = UpdateMetadata{Status: ServiceStatusNotExist}
222			for wi := range c.cdsWatchers[name] {
223				wi.resourceNotFound()
224			}
225		}
226	}
227	// When CDS resource is removed, we don't delete corresponding EDS cached
228	// data. The EDS watch will be canceled, and cache entry is removed when the
229	// last watch is canceled.
230}
231
232// NewEndpoints is called by the underlying xdsAPIClient when it receives an
233// xDS response.
234//
235// A response can contain multiple resources. They will be parsed and put in a
236// map from resource name to the resource content.
237func (c *clientImpl) NewEndpoints(updates map[string]EndpointsUpdate, metadata UpdateMetadata) {
238	c.mu.Lock()
239	defer c.mu.Unlock()
240
241	if metadata.ErrState != nil {
242		// On NACK, update overall version to the NACKed resp.
243		c.edsVersion = metadata.ErrState.Version
244		for name := range updates {
245			if _, ok := c.edsWatchers[name]; ok {
246				// On error, keep previous version for each resource. But update
247				// status and error.
248				mdCopy := c.edsMD[name]
249				mdCopy.ErrState = metadata.ErrState
250				mdCopy.Status = metadata.Status
251				c.edsMD[name] = mdCopy
252				// TODO: send the NACK error to the watcher.
253			}
254		}
255		return
256	}
257
258	// If no error received, the status is ACK.
259	c.edsVersion = metadata.Version
260	for name, update := range updates {
261		if s, ok := c.edsWatchers[name]; ok {
262			// Only send the update if this is not an error.
263			for wi := range s {
264				wi.newUpdate(update)
265			}
266			// Sync cache.
267			c.logger.Debugf("EDS resource with name %v, value %+v added to cache", name, update)
268			c.edsCache[name] = update
269			c.edsMD[name] = metadata
270		}
271	}
272}
273