1/*
2 *
3 * Copyright 2020 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package client
20
21type watcherInfoWithUpdate struct {
22	wi     *watchInfo
23	update interface{}
24	err    error
25}
26
27// scheduleCallback should only be called by methods of watchInfo, which checks
28// for watcher states and maintain consistency.
29func (c *clientImpl) scheduleCallback(wi *watchInfo, update interface{}, err error) {
30	c.updateCh.Put(&watcherInfoWithUpdate{
31		wi:     wi,
32		update: update,
33		err:    err,
34	})
35}
36
37func (c *clientImpl) callCallback(wiu *watcherInfoWithUpdate) {
38	c.mu.Lock()
39	// Use a closure to capture the callback and type assertion, to save one
40	// more switch case.
41	//
42	// The callback must be called without c.mu. Otherwise if the callback calls
43	// another watch() inline, it will cause a deadlock. This leaves a small
44	// window that a watcher's callback could be called after the watcher is
45	// canceled, and the user needs to take care of it.
46	var ccb func()
47	switch wiu.wi.rType {
48	case ListenerResource:
49		if s, ok := c.ldsWatchers[wiu.wi.target]; ok && s[wiu.wi] {
50			ccb = func() { wiu.wi.ldsCallback(wiu.update.(ListenerUpdate), wiu.err) }
51		}
52	case RouteConfigResource:
53		if s, ok := c.rdsWatchers[wiu.wi.target]; ok && s[wiu.wi] {
54			ccb = func() { wiu.wi.rdsCallback(wiu.update.(RouteConfigUpdate), wiu.err) }
55		}
56	case ClusterResource:
57		if s, ok := c.cdsWatchers[wiu.wi.target]; ok && s[wiu.wi] {
58			ccb = func() { wiu.wi.cdsCallback(wiu.update.(ClusterUpdate), wiu.err) }
59		}
60	case EndpointsResource:
61		if s, ok := c.edsWatchers[wiu.wi.target]; ok && s[wiu.wi] {
62			ccb = func() { wiu.wi.edsCallback(wiu.update.(EndpointsUpdate), wiu.err) }
63		}
64	}
65	c.mu.Unlock()
66
67	if ccb != nil {
68		ccb()
69	}
70}
71
72// NewListeners is called by the underlying xdsAPIClient when it receives an
73// xDS response.
74//
75// A response can contain multiple resources. They will be parsed and put in a
76// map from resource name to the resource content.
77func (c *clientImpl) NewListeners(updates map[string]ListenerUpdate) {
78	c.mu.Lock()
79	defer c.mu.Unlock()
80
81	for name, update := range updates {
82		if s, ok := c.ldsWatchers[name]; ok {
83			for wi := range s {
84				wi.newUpdate(update)
85			}
86			// Sync cache.
87			c.logger.Debugf("LDS resource with name %v, value %+v added to cache", name, update)
88			c.ldsCache[name] = update
89		}
90	}
91	for name := range c.ldsCache {
92		if _, ok := updates[name]; !ok {
93			// If resource exists in cache, but not in the new update, delete it
94			// from cache, and also send an resource not found error to indicate
95			// resource removed.
96			delete(c.ldsCache, name)
97			for wi := range c.ldsWatchers[name] {
98				wi.resourceNotFound()
99			}
100		}
101	}
102	// When LDS resource is removed, we don't delete corresponding RDS cached
103	// data. The RDS watch will be canceled, and cache entry is removed when the
104	// last watch is canceled.
105}
106
107// NewRouteConfigs is called by the underlying xdsAPIClient when it receives an
108// xDS response.
109//
110// A response can contain multiple resources. They will be parsed and put in a
111// map from resource name to the resource content.
112func (c *clientImpl) NewRouteConfigs(updates map[string]RouteConfigUpdate) {
113	c.mu.Lock()
114	defer c.mu.Unlock()
115
116	for name, update := range updates {
117		if s, ok := c.rdsWatchers[name]; ok {
118			for wi := range s {
119				wi.newUpdate(update)
120			}
121			// Sync cache.
122			c.logger.Debugf("RDS resource with name %v, value %+v added to cache", name, update)
123			c.rdsCache[name] = update
124		}
125	}
126}
127
128// NewClusters is called by the underlying xdsAPIClient when it receives an xDS
129// response.
130//
131// A response can contain multiple resources. They will be parsed and put in a
132// map from resource name to the resource content.
133func (c *clientImpl) NewClusters(updates map[string]ClusterUpdate) {
134	c.mu.Lock()
135	defer c.mu.Unlock()
136
137	for name, update := range updates {
138		if s, ok := c.cdsWatchers[name]; ok {
139			for wi := range s {
140				wi.newUpdate(update)
141			}
142			// Sync cache.
143			c.logger.Debugf("CDS resource with name %v, value %+v added to cache", name, update)
144			c.cdsCache[name] = update
145		}
146	}
147	for name := range c.cdsCache {
148		if _, ok := updates[name]; !ok {
149			// If resource exists in cache, but not in the new update, delete it
150			// from cache, and also send an resource not found error to indicate
151			// resource removed.
152			delete(c.cdsCache, name)
153			for wi := range c.cdsWatchers[name] {
154				wi.resourceNotFound()
155			}
156		}
157	}
158	// When CDS resource is removed, we don't delete corresponding EDS cached
159	// data. The EDS watch will be canceled, and cache entry is removed when the
160	// last watch is canceled.
161}
162
163// NewEndpoints is called by the underlying xdsAPIClient when it receives an
164// xDS response.
165//
166// A response can contain multiple resources. They will be parsed and put in a
167// map from resource name to the resource content.
168func (c *clientImpl) NewEndpoints(updates map[string]EndpointsUpdate) {
169	c.mu.Lock()
170	defer c.mu.Unlock()
171
172	for name, update := range updates {
173		if s, ok := c.edsWatchers[name]; ok {
174			for wi := range s {
175				wi.newUpdate(update)
176			}
177			// Sync cache.
178			c.logger.Debugf("EDS resource with name %v, value %+v added to cache", name, update)
179			c.edsCache[name] = update
180		}
181	}
182}
183