1/* 2 * 3 * Copyright 2019 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package client 20 21import ( 22 "context" 23 "fmt" 24 "sync" 25 "time" 26 27 "google.golang.org/grpc" 28 "google.golang.org/grpc/grpclog" 29 "google.golang.org/grpc/internal/buffer" 30 31 xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" 32 corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" 33 adsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2" 34) 35 36// The value chosen here is based on the default value of the 37// initial_fetch_timeout field in corepb.ConfigSource proto. 38var defaultWatchExpiryTimeout = 15 * time.Second 39 40// v2Client performs the actual xDS RPCs using the xDS v2 API. It creates a 41// single ADS stream on which the different types of xDS requests and responses 42// are multiplexed. 43// The reason for splitting this out from the top level xdsClient object is 44// because there is already an xDS v3Aplha API in development. If and when we 45// want to switch to that, this separation will ease that process. 46type v2Client struct { 47 ctx context.Context 48 cancelCtx context.CancelFunc 49 50 // ClientConn to the xDS gRPC server. Owned by the parent xdsClient. 51 cc *grpc.ClientConn 52 nodeProto *corepb.Node 53 backoff func(int) time.Duration 54 55 // sendCh in the channel onto which watchInfo objects are pushed by the 56 // watch API, and it is read and acted upon by the send() goroutine. 57 sendCh *buffer.Unbounded 58 59 mu sync.Mutex 60 // Message specific watch infos, protected by the above mutex. These are 61 // written to, after successfully reading from the update channel, and are 62 // read from when recovering from a broken stream to resend the xDS 63 // messages. When the user of this client object cancels a watch call, 64 // these are set to nil. All accesses to the map protected and any value 65 // inside the map should be protected with the above mutex. 66 watchMap map[string]*watchInfo 67 // ackMap contains the version that was acked (the version in the ack 68 // request that was sent on wire). The key is typeURL, the value is the 69 // version string, becaues the versions for different resource types 70 // should be independent. 71 ackMap map[string]string 72 // rdsCache maintains a mapping of {routeConfigName --> clusterName} from 73 // validated route configurations received in RDS responses. We cache all 74 // valid route configurations, whether or not we are interested in them 75 // when we received them (because we could become interested in them in the 76 // future and the server wont send us those resources again). 77 // Protected by the above mutex. 78 // 79 // TODO: remove RDS cache. The updated spec says client can ignore 80 // unrequested resources. 81 // https://github.com/envoyproxy/envoy/blob/master/api/xds_protocol.rst#resource-hints 82 rdsCache map[string]string 83 // rdsCache maintains a mapping of {clusterName --> CDSUpdate} from 84 // validated cluster configurations received in CDS responses. We cache all 85 // valid cluster configurations, whether or not we are interested in them 86 // when we received them (because we could become interested in them in the 87 // future and the server wont send us those resources again). This is only 88 // to support legacy management servers that do not honor the 89 // resource_names field. As per the latest spec, the server should resend 90 // the response when the request changes, even if it had sent the same 91 // resource earlier (when not asked for). Protected by the above mutex. 92 cdsCache map[string]CDSUpdate 93} 94 95// newV2Client creates a new v2Client initialized with the passed arguments. 96func newV2Client(cc *grpc.ClientConn, nodeProto *corepb.Node, backoff func(int) time.Duration) *v2Client { 97 v2c := &v2Client{ 98 cc: cc, 99 nodeProto: nodeProto, 100 backoff: backoff, 101 sendCh: buffer.NewUnbounded(), 102 watchMap: make(map[string]*watchInfo), 103 ackMap: make(map[string]string), 104 rdsCache: make(map[string]string), 105 cdsCache: make(map[string]CDSUpdate), 106 } 107 v2c.ctx, v2c.cancelCtx = context.WithCancel(context.Background()) 108 109 go v2c.run() 110 return v2c 111} 112 113// close cleans up resources and goroutines allocated by this client. 114func (v2c *v2Client) close() { 115 v2c.cancelCtx() 116} 117 118// run starts an ADS stream (and backs off exponentially, if the previous 119// stream failed without receiving a single reply) and runs the sender and 120// receiver routines to send and receive data from the stream respectively. 121func (v2c *v2Client) run() { 122 retries := 0 123 for { 124 select { 125 case <-v2c.ctx.Done(): 126 return 127 default: 128 } 129 130 if retries != 0 { 131 t := time.NewTimer(v2c.backoff(retries)) 132 select { 133 case <-t.C: 134 case <-v2c.ctx.Done(): 135 if !t.Stop() { 136 <-t.C 137 } 138 return 139 } 140 } 141 142 retries++ 143 cli := adsgrpc.NewAggregatedDiscoveryServiceClient(v2c.cc) 144 stream, err := cli.StreamAggregatedResources(v2c.ctx, grpc.WaitForReady(true)) 145 if err != nil { 146 grpclog.Infof("xds: ADS stream creation failed: %v", err) 147 continue 148 } 149 150 // send() could be blocked on reading updates from the different update 151 // channels when it is not actually sending out messages. So, we need a 152 // way to break out of send() when recv() returns. This done channel is 153 // used to for that purpose. 154 done := make(chan struct{}) 155 go v2c.send(stream, done) 156 if v2c.recv(stream) { 157 retries = 0 158 } 159 close(done) 160 } 161} 162 163// sendRequest sends a request for provided typeURL and resource on the provided 164// stream. 165// 166// version is the ack version to be sent with the request 167// - If this is the new request (not an ack/nack), version will be an empty 168// string 169// - If this is an ack, version will be the version from the response 170// - If this is a nack, version will be the previous acked version (from 171// ackMap). If there was no ack before, it will be an empty string 172func (v2c *v2Client) sendRequest(stream adsStream, resourceNames []string, typeURL, version, nonce string) bool { 173 req := &xdspb.DiscoveryRequest{ 174 Node: v2c.nodeProto, 175 TypeUrl: typeURL, 176 ResourceNames: resourceNames, 177 VersionInfo: version, 178 ResponseNonce: nonce, 179 // TODO: populate ErrorDetails for nack. 180 } 181 if err := stream.Send(req); err != nil { 182 grpclog.Warningf("xds: request (type %s) for resource %v failed: %v", typeURL, resourceNames, err) 183 return false 184 } 185 return true 186} 187 188// sendExisting sends out xDS requests for registered watchers when recovering 189// from a broken stream. 190// 191// We call stream.Send() here with the lock being held. It should be OK to do 192// that here because the stream has just started and Send() usually returns 193// quickly (once it pushes the message onto the transport layer) and is only 194// ever blocked if we don't have enough flow control quota. 195func (v2c *v2Client) sendExisting(stream adsStream) bool { 196 v2c.mu.Lock() 197 defer v2c.mu.Unlock() 198 199 // Reset the ack versions when the stream restarts. 200 v2c.ackMap = make(map[string]string) 201 202 for typeURL, wi := range v2c.watchMap { 203 if !v2c.sendRequest(stream, wi.target, typeURL, "", "") { 204 return false 205 } 206 } 207 208 return true 209} 210 211// processWatchInfo pulls the fields needed by the request from a watchInfo. 212// 213// It also calls callback with cached response, and updates the watch map in 214// v2c. 215// 216// If the watch was already canceled, it returns false for send 217func (v2c *v2Client) processWatchInfo(t *watchInfo) (target []string, typeURL, version, nonce string, send bool) { 218 v2c.mu.Lock() 219 defer v2c.mu.Unlock() 220 if t.state == watchCancelled { 221 return // This returns all zero values, and false for send. 222 } 223 t.state = watchStarted 224 send = true 225 226 typeURL = t.typeURL 227 target = t.target 228 v2c.checkCacheAndUpdateWatchMap(t) 229 // TODO: if watch is called again with the same resource names, 230 // there's no need to send another request. 231 // 232 // TODO: should we reset version (for ack) when a new watch is 233 // started? Or do this only if the resource names are different 234 // (so we send a new request)? 235 return 236} 237 238// processAckInfo pulls the fields needed by the ack request from a ackInfo. 239// 240// If no active watch is found for this ack, it returns false for send. 241func (v2c *v2Client) processAckInfo(t *ackInfo) (target []string, typeURL, version, nonce string, send bool) { 242 typeURL = t.typeURL 243 244 v2c.mu.Lock() 245 defer v2c.mu.Unlock() 246 wi, ok := v2c.watchMap[typeURL] 247 if !ok { 248 // We don't send the request ack if there's no active watch (this can be 249 // either the server sends responses before any request, or the watch is 250 // canceled while the ackInfo is in queue), because there's no resource 251 // name. And if we send a request with empty resource name list, the 252 // server may treat it as a wild card and send us everything. 253 grpclog.Warningf("xds: ack (type %s) not sent because there's no active watch for the type", typeURL) 254 return // This returns all zero values, and false for send. 255 } 256 send = true 257 258 version = t.version 259 nonce = t.nonce 260 target = wi.target 261 if version == "" { 262 // This is a nack, get the previous acked version. 263 version = v2c.ackMap[typeURL] 264 // version will still be an empty string if typeURL isn't 265 // found in ackMap, this can happen if there wasn't any ack 266 // before. 267 } else { 268 v2c.ackMap[typeURL] = version 269 } 270 return 271} 272 273// send reads watch infos from update channel and sends out actual xDS requests 274// on the provided ADS stream. 275func (v2c *v2Client) send(stream adsStream, done chan struct{}) { 276 if !v2c.sendExisting(stream) { 277 return 278 } 279 280 for { 281 select { 282 case <-v2c.ctx.Done(): 283 return 284 case u := <-v2c.sendCh.Get(): 285 v2c.sendCh.Load() 286 287 var ( 288 target []string 289 typeURL, version, nonce string 290 send bool 291 ) 292 switch t := u.(type) { 293 case *watchInfo: 294 target, typeURL, version, nonce, send = v2c.processWatchInfo(t) 295 case *ackInfo: 296 target, typeURL, version, nonce, send = v2c.processAckInfo(t) 297 } 298 if !send { 299 continue 300 } 301 if !v2c.sendRequest(stream, target, typeURL, version, nonce) { 302 return 303 } 304 case <-done: 305 return 306 } 307 } 308} 309 310// recv receives xDS responses on the provided ADS stream and branches out to 311// message specific handlers. 312func (v2c *v2Client) recv(stream adsStream) bool { 313 success := false 314 for { 315 resp, err := stream.Recv() 316 // TODO: call watch callbacks with error when stream is broken. 317 if err != nil { 318 grpclog.Warningf("xds: ADS stream recv failed: %v", err) 319 return success 320 } 321 var respHandleErr error 322 switch resp.GetTypeUrl() { 323 case ldsURL: 324 respHandleErr = v2c.handleLDSResponse(resp) 325 case rdsURL: 326 respHandleErr = v2c.handleRDSResponse(resp) 327 case cdsURL: 328 respHandleErr = v2c.handleCDSResponse(resp) 329 case edsURL: 330 respHandleErr = v2c.handleEDSResponse(resp) 331 default: 332 grpclog.Warningf("xds: unknown response URL type: %v", resp.GetTypeUrl()) 333 continue 334 } 335 336 typeURL := resp.GetTypeUrl() 337 if respHandleErr != nil { 338 grpclog.Warningf("xds: response (type %s) handler failed: %v", typeURL, respHandleErr) 339 v2c.sendCh.Put(&ackInfo{ 340 typeURL: typeURL, 341 version: "", 342 nonce: resp.GetNonce(), 343 }) 344 continue 345 } 346 v2c.sendCh.Put(&ackInfo{ 347 typeURL: typeURL, 348 version: resp.GetVersionInfo(), 349 nonce: resp.GetNonce(), 350 }) 351 success = true 352 } 353} 354 355// watchLDS registers an LDS watcher for the provided target. Updates 356// corresponding to received LDS responses will be pushed to the provided 357// callback. The caller can cancel the watch by invoking the returned cancel 358// function. 359// The provided callback should not block or perform any expensive operations 360// or call other methods of the v2Client object. 361func (v2c *v2Client) watchLDS(target string, ldsCb ldsCallback) (cancel func()) { 362 return v2c.watch(&watchInfo{ 363 typeURL: ldsURL, 364 target: []string{target}, 365 callback: ldsCb, 366 }) 367} 368 369// watchRDS registers an RDS watcher for the provided routeName. Updates 370// corresponding to received RDS responses will be pushed to the provided 371// callback. The caller can cancel the watch by invoking the returned cancel 372// function. 373// The provided callback should not block or perform any expensive operations 374// or call other methods of the v2Client object. 375func (v2c *v2Client) watchRDS(routeName string, rdsCb rdsCallback) (cancel func()) { 376 return v2c.watch(&watchInfo{ 377 typeURL: rdsURL, 378 target: []string{routeName}, 379 callback: rdsCb, 380 }) 381 // TODO: Once a registered RDS watch is cancelled, we should send an RDS 382 // request with no resources. This will let the server know that we are no 383 // longer interested in this resource. 384} 385 386// watchCDS registers an CDS watcher for the provided clusterName. Updates 387// corresponding to received CDS responses will be pushed to the provided 388// callback. The caller can cancel the watch by invoking the returned cancel 389// function. 390// The provided callback should not block or perform any expensive operations 391// or call other methods of the v2Client object. 392func (v2c *v2Client) watchCDS(clusterName string, cdsCb cdsCallback) (cancel func()) { 393 return v2c.watch(&watchInfo{ 394 typeURL: cdsURL, 395 target: []string{clusterName}, 396 callback: cdsCb, 397 }) 398} 399 400// watchEDS registers an EDS watcher for the provided clusterName. Updates 401// corresponding to received EDS responses will be pushed to the provided 402// callback. The caller can cancel the watch by invoking the returned cancel 403// function. 404// The provided callback should not block or perform any expensive operations 405// or call other methods of the v2Client object. 406func (v2c *v2Client) watchEDS(clusterName string, edsCb edsCallback) (cancel func()) { 407 return v2c.watch(&watchInfo{ 408 typeURL: edsURL, 409 target: []string{clusterName}, 410 callback: edsCb, 411 }) 412 // TODO: Once a registered EDS watch is cancelled, we should send an EDS 413 // request with no resources. This will let the server know that we are no 414 // longer interested in this resource. 415} 416 417func (v2c *v2Client) watch(wi *watchInfo) (cancel func()) { 418 v2c.sendCh.Put(wi) 419 return func() { 420 v2c.mu.Lock() 421 defer v2c.mu.Unlock() 422 if wi.state == watchEnqueued { 423 wi.state = watchCancelled 424 return 425 } 426 v2c.watchMap[wi.typeURL].cancel() 427 delete(v2c.watchMap, wi.typeURL) 428 // TODO: should we reset ack version string when cancelling the watch? 429 } 430} 431 432// checkCacheAndUpdateWatchMap is called when a new watch call is handled in 433// send(). If an existing watcher is found, its expiry timer is stopped. If the 434// watchInfo to be added to the watchMap is found in the cache, the watcher 435// callback is immediately invoked. 436// 437// Caller should hold v2c.mu 438func (v2c *v2Client) checkCacheAndUpdateWatchMap(wi *watchInfo) { 439 if existing := v2c.watchMap[wi.typeURL]; existing != nil { 440 existing.cancel() 441 } 442 443 v2c.watchMap[wi.typeURL] = wi 444 switch wi.typeURL { 445 // We need to grab the lock inside of the expiryTimer's afterFunc because 446 // we need to access the watchInfo, which is stored in the watchMap. 447 case ldsURL: 448 wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() { 449 v2c.mu.Lock() 450 wi.callback.(ldsCallback)(ldsUpdate{}, fmt.Errorf("xds: LDS target %s not found", wi.target)) 451 v2c.mu.Unlock() 452 }) 453 case rdsURL: 454 routeName := wi.target[0] 455 if cluster := v2c.rdsCache[routeName]; cluster != "" { 456 var err error 457 if v2c.watchMap[ldsURL] == nil { 458 cluster = "" 459 err = fmt.Errorf("xds: no LDS watcher found when handling RDS watch for route {%v} from cache", routeName) 460 } 461 wi.callback.(rdsCallback)(rdsUpdate{clusterName: cluster}, err) 462 return 463 } 464 // Add the watch expiry timer only for new watches we don't find in 465 // the cache, and return from here. 466 wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() { 467 v2c.mu.Lock() 468 wi.callback.(rdsCallback)(rdsUpdate{}, fmt.Errorf("xds: RDS target %s not found", wi.target)) 469 v2c.mu.Unlock() 470 }) 471 case cdsURL: 472 clusterName := wi.target[0] 473 if update, ok := v2c.cdsCache[clusterName]; ok { 474 var err error 475 if v2c.watchMap[cdsURL] == nil { 476 err = fmt.Errorf("xds: no CDS watcher found when handling CDS watch for cluster {%v} from cache", clusterName) 477 } 478 wi.callback.(cdsCallback)(update, err) 479 return 480 } 481 wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() { 482 v2c.mu.Lock() 483 wi.callback.(cdsCallback)(CDSUpdate{}, fmt.Errorf("xds: CDS target %s not found", wi.target)) 484 v2c.mu.Unlock() 485 }) 486 case edsURL: 487 wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() { 488 v2c.mu.Lock() 489 wi.callback.(edsCallback)(nil, fmt.Errorf("xds: EDS target %s not found", wi.target)) 490 v2c.mu.Unlock() 491 }) 492 } 493} 494