1/* 2 * 3 * Copyright 2020 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package xdsclient 20 21import "google.golang.org/grpc/internal/pretty" 22 23type watcherInfoWithUpdate struct { 24 wi *watchInfo 25 update interface{} 26 err error 27} 28 29// scheduleCallback should only be called by methods of watchInfo, which checks 30// for watcher states and maintain consistency. 31func (c *clientImpl) scheduleCallback(wi *watchInfo, update interface{}, err error) { 32 c.updateCh.Put(&watcherInfoWithUpdate{ 33 wi: wi, 34 update: update, 35 err: err, 36 }) 37} 38 39func (c *clientImpl) callCallback(wiu *watcherInfoWithUpdate) { 40 c.mu.Lock() 41 // Use a closure to capture the callback and type assertion, to save one 42 // more switch case. 43 // 44 // The callback must be called without c.mu. Otherwise if the callback calls 45 // another watch() inline, it will cause a deadlock. This leaves a small 46 // window that a watcher's callback could be called after the watcher is 47 // canceled, and the user needs to take care of it. 48 var ccb func() 49 switch wiu.wi.rType { 50 case ListenerResource: 51 if s, ok := c.ldsWatchers[wiu.wi.target]; ok && s[wiu.wi] { 52 ccb = func() { wiu.wi.ldsCallback(wiu.update.(ListenerUpdate), wiu.err) } 53 } 54 case RouteConfigResource: 55 if s, ok := c.rdsWatchers[wiu.wi.target]; ok && s[wiu.wi] { 56 ccb = func() { wiu.wi.rdsCallback(wiu.update.(RouteConfigUpdate), wiu.err) } 57 } 58 case ClusterResource: 59 if s, ok := c.cdsWatchers[wiu.wi.target]; ok && s[wiu.wi] { 60 ccb = func() { wiu.wi.cdsCallback(wiu.update.(ClusterUpdate), wiu.err) } 61 } 62 case EndpointsResource: 63 if s, ok := c.edsWatchers[wiu.wi.target]; ok && s[wiu.wi] { 64 ccb = func() { wiu.wi.edsCallback(wiu.update.(EndpointsUpdate), wiu.err) } 65 } 66 } 67 c.mu.Unlock() 68 69 if ccb != nil { 70 ccb() 71 } 72} 73 74// NewListeners is called by the underlying xdsAPIClient when it receives an 75// xDS response. 76// 77// A response can contain multiple resources. They will be parsed and put in a 78// map from resource name to the resource content. 79func (c *clientImpl) NewListeners(updates map[string]ListenerUpdate, metadata UpdateMetadata) { 80 c.mu.Lock() 81 defer c.mu.Unlock() 82 83 if metadata.ErrState != nil { 84 // On NACK, update overall version to the NACKed resp. 85 c.ldsVersion = metadata.ErrState.Version 86 for name := range updates { 87 if s, ok := c.ldsWatchers[name]; ok { 88 // On error, keep previous version for each resource. But update 89 // status and error. 90 mdCopy := c.ldsMD[name] 91 mdCopy.ErrState = metadata.ErrState 92 mdCopy.Status = metadata.Status 93 c.ldsMD[name] = mdCopy 94 for wi := range s { 95 wi.newError(metadata.ErrState.Err) 96 } 97 } 98 } 99 return 100 } 101 102 // If no error received, the status is ACK. 103 c.ldsVersion = metadata.Version 104 for name, update := range updates { 105 if s, ok := c.ldsWatchers[name]; ok { 106 // Only send the update if this is not an error. 107 for wi := range s { 108 wi.newUpdate(update) 109 } 110 // Sync cache. 111 c.logger.Debugf("LDS resource with name %v, value %+v added to cache", name, pretty.ToJSON(update)) 112 c.ldsCache[name] = update 113 c.ldsMD[name] = metadata 114 } 115 } 116 // Resources not in the new update were removed by the server, so delete 117 // them. 118 for name := range c.ldsCache { 119 if _, ok := updates[name]; !ok { 120 // If resource exists in cache, but not in the new update, delete 121 // the resource from cache, and also send an resource not found 122 // error to indicate resource removed. 123 delete(c.ldsCache, name) 124 c.ldsMD[name] = UpdateMetadata{Status: ServiceStatusNotExist} 125 for wi := range c.ldsWatchers[name] { 126 wi.resourceNotFound() 127 } 128 } 129 } 130 // When LDS resource is removed, we don't delete corresponding RDS cached 131 // data. The RDS watch will be canceled, and cache entry is removed when the 132 // last watch is canceled. 133} 134 135// NewRouteConfigs is called by the underlying xdsAPIClient when it receives an 136// xDS response. 137// 138// A response can contain multiple resources. They will be parsed and put in a 139// map from resource name to the resource content. 140func (c *clientImpl) NewRouteConfigs(updates map[string]RouteConfigUpdate, metadata UpdateMetadata) { 141 c.mu.Lock() 142 defer c.mu.Unlock() 143 144 if metadata.ErrState != nil { 145 // On NACK, update overall version to the NACKed resp. 146 c.rdsVersion = metadata.ErrState.Version 147 for name := range updates { 148 if s, ok := c.rdsWatchers[name]; ok { 149 // On error, keep previous version for each resource. But update 150 // status and error. 151 mdCopy := c.rdsMD[name] 152 mdCopy.ErrState = metadata.ErrState 153 mdCopy.Status = metadata.Status 154 c.rdsMD[name] = mdCopy 155 for wi := range s { 156 wi.newError(metadata.ErrState.Err) 157 } 158 } 159 } 160 return 161 } 162 163 // If no error received, the status is ACK. 164 c.rdsVersion = metadata.Version 165 for name, update := range updates { 166 if s, ok := c.rdsWatchers[name]; ok { 167 // Only send the update if this is not an error. 168 for wi := range s { 169 wi.newUpdate(update) 170 } 171 // Sync cache. 172 c.logger.Debugf("RDS resource with name %v, value %+v added to cache", name, pretty.ToJSON(update)) 173 c.rdsCache[name] = update 174 c.rdsMD[name] = metadata 175 } 176 } 177} 178 179// NewClusters is called by the underlying xdsAPIClient when it receives an xDS 180// response. 181// 182// A response can contain multiple resources. They will be parsed and put in a 183// map from resource name to the resource content. 184func (c *clientImpl) NewClusters(updates map[string]ClusterUpdate, metadata UpdateMetadata) { 185 c.mu.Lock() 186 defer c.mu.Unlock() 187 188 if metadata.ErrState != nil { 189 // On NACK, update overall version to the NACKed resp. 190 c.cdsVersion = metadata.ErrState.Version 191 for name := range updates { 192 if s, ok := c.cdsWatchers[name]; ok { 193 // On error, keep previous version for each resource. But update 194 // status and error. 195 mdCopy := c.cdsMD[name] 196 mdCopy.ErrState = metadata.ErrState 197 mdCopy.Status = metadata.Status 198 c.cdsMD[name] = mdCopy 199 for wi := range s { 200 wi.newError(metadata.ErrState.Err) 201 } 202 } 203 } 204 return 205 } 206 207 // If no error received, the status is ACK. 208 c.cdsVersion = metadata.Version 209 for name, update := range updates { 210 if s, ok := c.cdsWatchers[name]; ok { 211 // Only send the update if this is not an error. 212 for wi := range s { 213 wi.newUpdate(update) 214 } 215 // Sync cache. 216 c.logger.Debugf("CDS resource with name %v, value %+v added to cache", name, pretty.ToJSON(update)) 217 c.cdsCache[name] = update 218 c.cdsMD[name] = metadata 219 } 220 } 221 // Resources not in the new update were removed by the server, so delete 222 // them. 223 for name := range c.cdsCache { 224 if _, ok := updates[name]; !ok { 225 // If resource exists in cache, but not in the new update, delete it 226 // from cache, and also send an resource not found error to indicate 227 // resource removed. 228 delete(c.cdsCache, name) 229 c.ldsMD[name] = UpdateMetadata{Status: ServiceStatusNotExist} 230 for wi := range c.cdsWatchers[name] { 231 wi.resourceNotFound() 232 } 233 } 234 } 235 // When CDS resource is removed, we don't delete corresponding EDS cached 236 // data. The EDS watch will be canceled, and cache entry is removed when the 237 // last watch is canceled. 238} 239 240// NewEndpoints is called by the underlying xdsAPIClient when it receives an 241// xDS response. 242// 243// A response can contain multiple resources. They will be parsed and put in a 244// map from resource name to the resource content. 245func (c *clientImpl) NewEndpoints(updates map[string]EndpointsUpdate, metadata UpdateMetadata) { 246 c.mu.Lock() 247 defer c.mu.Unlock() 248 249 if metadata.ErrState != nil { 250 // On NACK, update overall version to the NACKed resp. 251 c.edsVersion = metadata.ErrState.Version 252 for name := range updates { 253 if s, ok := c.edsWatchers[name]; ok { 254 // On error, keep previous version for each resource. But update 255 // status and error. 256 mdCopy := c.edsMD[name] 257 mdCopy.ErrState = metadata.ErrState 258 mdCopy.Status = metadata.Status 259 c.edsMD[name] = mdCopy 260 for wi := range s { 261 wi.newError(metadata.ErrState.Err) 262 } 263 } 264 } 265 return 266 } 267 268 // If no error received, the status is ACK. 269 c.edsVersion = metadata.Version 270 for name, update := range updates { 271 if s, ok := c.edsWatchers[name]; ok { 272 // Only send the update if this is not an error. 273 for wi := range s { 274 wi.newUpdate(update) 275 } 276 // Sync cache. 277 c.logger.Debugf("EDS resource with name %v, value %+v added to cache", name, pretty.ToJSON(update)) 278 c.edsCache[name] = update 279 c.edsMD[name] = metadata 280 } 281 } 282} 283 284// NewConnectionError is called by the underlying xdsAPIClient when it receives 285// a connection error. The error will be forwarded to all the resource watchers. 286func (c *clientImpl) NewConnectionError(err error) { 287 c.mu.Lock() 288 defer c.mu.Unlock() 289 290 for _, s := range c.edsWatchers { 291 for wi := range s { 292 wi.newError(NewErrorf(ErrorTypeConnection, "xds: error received from xDS stream: %v", err)) 293 } 294 } 295} 296