1/* 2 * 3 * Copyright 2020 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package client 20 21type watcherInfoWithUpdate struct { 22 wi *watchInfo 23 update interface{} 24 err error 25} 26 27// scheduleCallback should only be called by methods of watchInfo, which checks 28// for watcher states and maintain consistency. 29func (c *clientImpl) scheduleCallback(wi *watchInfo, update interface{}, err error) { 30 c.updateCh.Put(&watcherInfoWithUpdate{ 31 wi: wi, 32 update: update, 33 err: err, 34 }) 35} 36 37func (c *clientImpl) callCallback(wiu *watcherInfoWithUpdate) { 38 c.mu.Lock() 39 // Use a closure to capture the callback and type assertion, to save one 40 // more switch case. 41 // 42 // The callback must be called without c.mu. Otherwise if the callback calls 43 // another watch() inline, it will cause a deadlock. This leaves a small 44 // window that a watcher's callback could be called after the watcher is 45 // canceled, and the user needs to take care of it. 46 var ccb func() 47 switch wiu.wi.rType { 48 case ListenerResource: 49 if s, ok := c.ldsWatchers[wiu.wi.target]; ok && s[wiu.wi] { 50 ccb = func() { wiu.wi.ldsCallback(wiu.update.(ListenerUpdate), wiu.err) } 51 } 52 case RouteConfigResource: 53 if s, ok := c.rdsWatchers[wiu.wi.target]; ok && s[wiu.wi] { 54 ccb = func() { wiu.wi.rdsCallback(wiu.update.(RouteConfigUpdate), wiu.err) } 55 } 56 case ClusterResource: 57 if s, ok := c.cdsWatchers[wiu.wi.target]; ok && s[wiu.wi] { 58 ccb = func() { wiu.wi.cdsCallback(wiu.update.(ClusterUpdate), wiu.err) } 59 } 60 case EndpointsResource: 61 if s, ok := c.edsWatchers[wiu.wi.target]; ok && s[wiu.wi] { 62 ccb = func() { wiu.wi.edsCallback(wiu.update.(EndpointsUpdate), wiu.err) } 63 } 64 } 65 c.mu.Unlock() 66 67 if ccb != nil { 68 ccb() 69 } 70} 71 72// NewListeners is called by the underlying xdsAPIClient when it receives an 73// xDS response. 74// 75// A response can contain multiple resources. They will be parsed and put in a 76// map from resource name to the resource content. 77func (c *clientImpl) NewListeners(updates map[string]ListenerUpdate, metadata UpdateMetadata) { 78 c.mu.Lock() 79 defer c.mu.Unlock() 80 81 if metadata.ErrState != nil { 82 // On NACK, update overall version to the NACKed resp. 83 c.ldsVersion = metadata.ErrState.Version 84 for name := range updates { 85 if _, ok := c.ldsWatchers[name]; ok { 86 // On error, keep previous version for each resource. But update 87 // status and error. 88 mdCopy := c.ldsMD[name] 89 mdCopy.ErrState = metadata.ErrState 90 mdCopy.Status = metadata.Status 91 c.ldsMD[name] = mdCopy 92 // TODO: send the NACK error to the watcher. 93 } 94 } 95 return 96 } 97 98 // If no error received, the status is ACK. 99 c.ldsVersion = metadata.Version 100 for name, update := range updates { 101 if s, ok := c.ldsWatchers[name]; ok { 102 // Only send the update if this is not an error. 103 for wi := range s { 104 wi.newUpdate(update) 105 } 106 // Sync cache. 107 c.logger.Debugf("LDS resource with name %v, value %+v added to cache", name, update) 108 c.ldsCache[name] = update 109 c.ldsMD[name] = metadata 110 } 111 } 112 // Resources not in the new update were removed by the server, so delete 113 // them. 114 for name := range c.ldsCache { 115 if _, ok := updates[name]; !ok { 116 // If resource exists in cache, but not in the new update, delete 117 // the resource from cache, and also send an resource not found 118 // error to indicate resource removed. 119 delete(c.ldsCache, name) 120 c.ldsMD[name] = UpdateMetadata{Status: ServiceStatusNotExist} 121 for wi := range c.ldsWatchers[name] { 122 wi.resourceNotFound() 123 } 124 } 125 } 126 // When LDS resource is removed, we don't delete corresponding RDS cached 127 // data. The RDS watch will be canceled, and cache entry is removed when the 128 // last watch is canceled. 129} 130 131// NewRouteConfigs is called by the underlying xdsAPIClient when it receives an 132// xDS response. 133// 134// A response can contain multiple resources. They will be parsed and put in a 135// map from resource name to the resource content. 136func (c *clientImpl) NewRouteConfigs(updates map[string]RouteConfigUpdate, metadata UpdateMetadata) { 137 c.mu.Lock() 138 defer c.mu.Unlock() 139 140 if metadata.ErrState != nil { 141 // On NACK, update overall version to the NACKed resp. 142 c.rdsVersion = metadata.ErrState.Version 143 for name := range updates { 144 if _, ok := c.rdsWatchers[name]; ok { 145 // On error, keep previous version for each resource. But update 146 // status and error. 147 mdCopy := c.rdsMD[name] 148 mdCopy.ErrState = metadata.ErrState 149 mdCopy.Status = metadata.Status 150 c.rdsMD[name] = mdCopy 151 // TODO: send the NACK error to the watcher. 152 } 153 } 154 return 155 } 156 157 // If no error received, the status is ACK. 158 c.rdsVersion = metadata.Version 159 for name, update := range updates { 160 if s, ok := c.rdsWatchers[name]; ok { 161 // Only send the update if this is not an error. 162 for wi := range s { 163 wi.newUpdate(update) 164 } 165 // Sync cache. 166 c.logger.Debugf("RDS resource with name %v, value %+v added to cache", name, update) 167 c.rdsCache[name] = update 168 c.rdsMD[name] = metadata 169 } 170 } 171} 172 173// NewClusters is called by the underlying xdsAPIClient when it receives an xDS 174// response. 175// 176// A response can contain multiple resources. They will be parsed and put in a 177// map from resource name to the resource content. 178func (c *clientImpl) NewClusters(updates map[string]ClusterUpdate, metadata UpdateMetadata) { 179 c.mu.Lock() 180 defer c.mu.Unlock() 181 182 if metadata.ErrState != nil { 183 // On NACK, update overall version to the NACKed resp. 184 c.cdsVersion = metadata.ErrState.Version 185 for name := range updates { 186 if _, ok := c.cdsWatchers[name]; ok { 187 // On error, keep previous version for each resource. But update 188 // status and error. 189 mdCopy := c.cdsMD[name] 190 mdCopy.ErrState = metadata.ErrState 191 mdCopy.Status = metadata.Status 192 c.cdsMD[name] = mdCopy 193 // TODO: send the NACK error to the watcher. 194 } 195 } 196 return 197 } 198 199 // If no error received, the status is ACK. 200 c.cdsVersion = metadata.Version 201 for name, update := range updates { 202 if s, ok := c.cdsWatchers[name]; ok { 203 // Only send the update if this is not an error. 204 for wi := range s { 205 wi.newUpdate(update) 206 } 207 // Sync cache. 208 c.logger.Debugf("CDS resource with name %v, value %+v added to cache", name, update) 209 c.cdsCache[name] = update 210 c.cdsMD[name] = metadata 211 } 212 } 213 // Resources not in the new update were removed by the server, so delete 214 // them. 215 for name := range c.cdsCache { 216 if _, ok := updates[name]; !ok { 217 // If resource exists in cache, but not in the new update, delete it 218 // from cache, and also send an resource not found error to indicate 219 // resource removed. 220 delete(c.cdsCache, name) 221 c.ldsMD[name] = UpdateMetadata{Status: ServiceStatusNotExist} 222 for wi := range c.cdsWatchers[name] { 223 wi.resourceNotFound() 224 } 225 } 226 } 227 // When CDS resource is removed, we don't delete corresponding EDS cached 228 // data. The EDS watch will be canceled, and cache entry is removed when the 229 // last watch is canceled. 230} 231 232// NewEndpoints is called by the underlying xdsAPIClient when it receives an 233// xDS response. 234// 235// A response can contain multiple resources. They will be parsed and put in a 236// map from resource name to the resource content. 237func (c *clientImpl) NewEndpoints(updates map[string]EndpointsUpdate, metadata UpdateMetadata) { 238 c.mu.Lock() 239 defer c.mu.Unlock() 240 241 if metadata.ErrState != nil { 242 // On NACK, update overall version to the NACKed resp. 243 c.edsVersion = metadata.ErrState.Version 244 for name := range updates { 245 if _, ok := c.edsWatchers[name]; ok { 246 // On error, keep previous version for each resource. But update 247 // status and error. 248 mdCopy := c.edsMD[name] 249 mdCopy.ErrState = metadata.ErrState 250 mdCopy.Status = metadata.Status 251 c.edsMD[name] = mdCopy 252 // TODO: send the NACK error to the watcher. 253 } 254 } 255 return 256 } 257 258 // If no error received, the status is ACK. 259 c.edsVersion = metadata.Version 260 for name, update := range updates { 261 if s, ok := c.edsWatchers[name]; ok { 262 // Only send the update if this is not an error. 263 for wi := range s { 264 wi.newUpdate(update) 265 } 266 // Sync cache. 267 c.logger.Debugf("EDS resource with name %v, value %+v added to cache", name, update) 268 c.edsCache[name] = update 269 c.edsMD[name] = metadata 270 } 271 } 272} 273