1/* 2 * 3 * Copyright 2019 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package client 20 21import ( 22 "context" 23 "fmt" 24 "sync" 25 "time" 26 27 "google.golang.org/grpc" 28 "google.golang.org/grpc/internal/buffer" 29 "google.golang.org/grpc/internal/grpclog" 30 31 xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" 32 corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" 33 adsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2" 34) 35 36// The value chosen here is based on the default value of the 37// initial_fetch_timeout field in corepb.ConfigSource proto. 38var defaultWatchExpiryTimeout = 15 * time.Second 39 40// v2Client performs the actual xDS RPCs using the xDS v2 API. It creates a 41// single ADS stream on which the different types of xDS requests and responses 42// are multiplexed. 43// The reason for splitting this out from the top level xdsClient object is 44// because there is already an xDS v3Aplha API in development. If and when we 45// want to switch to that, this separation will ease that process. 46type v2Client struct { 47 ctx context.Context 48 cancelCtx context.CancelFunc 49 50 // ClientConn to the xDS gRPC server. Owned by the parent xdsClient. 51 cc *grpc.ClientConn 52 nodeProto *corepb.Node 53 backoff func(int) time.Duration 54 55 logger *grpclog.PrefixLogger 56 57 streamCh chan adsStream 58 // sendCh in the channel onto which watchInfo objects are pushed by the 59 // watch API, and it is read and acted upon by the send() goroutine. 60 sendCh *buffer.Unbounded 61 62 mu sync.Mutex 63 // Message specific watch infos, protected by the above mutex. These are 64 // written to, after successfully reading from the update channel, and are 65 // read from when recovering from a broken stream to resend the xDS 66 // messages. When the user of this client object cancels a watch call, 67 // these are set to nil. All accesses to the map protected and any value 68 // inside the map should be protected with the above mutex. 69 watchMap map[string]*watchInfo 70 // versionMap contains the version that was acked (the version in the ack 71 // request that was sent on wire). The key is typeURL, the value is the 72 // version string, becaues the versions for different resource types should 73 // be independent. 74 versionMap map[string]string 75 // nonceMap contains the nonce from the most recent received response. 76 nonceMap map[string]string 77 // rdsCache maintains a mapping of {routeConfigName --> clusterName} from 78 // validated route configurations received in RDS responses. We cache all 79 // valid route configurations, whether or not we are interested in them 80 // when we received them (because we could become interested in them in the 81 // future and the server wont send us those resources again). 82 // Protected by the above mutex. 83 // 84 // TODO: remove RDS cache. The updated spec says client can ignore 85 // unrequested resources. 86 // https://github.com/envoyproxy/envoy/blob/master/api/xds_protocol.rst#resource-hints 87 rdsCache map[string]string 88 // rdsCache maintains a mapping of {clusterName --> CDSUpdate} from 89 // validated cluster configurations received in CDS responses. We cache all 90 // valid cluster configurations, whether or not we are interested in them 91 // when we received them (because we could become interested in them in the 92 // future and the server wont send us those resources again). This is only 93 // to support legacy management servers that do not honor the 94 // resource_names field. As per the latest spec, the server should resend 95 // the response when the request changes, even if it had sent the same 96 // resource earlier (when not asked for). Protected by the above mutex. 97 cdsCache map[string]CDSUpdate 98} 99 100// newV2Client creates a new v2Client initialized with the passed arguments. 101func newV2Client(cc *grpc.ClientConn, nodeProto *corepb.Node, backoff func(int) time.Duration, logger *grpclog.PrefixLogger) *v2Client { 102 v2c := &v2Client{ 103 cc: cc, 104 nodeProto: nodeProto, 105 backoff: backoff, 106 107 logger: logger, 108 109 streamCh: make(chan adsStream, 1), 110 sendCh: buffer.NewUnbounded(), 111 112 watchMap: make(map[string]*watchInfo), 113 versionMap: make(map[string]string), 114 nonceMap: make(map[string]string), 115 rdsCache: make(map[string]string), 116 cdsCache: make(map[string]CDSUpdate), 117 } 118 v2c.ctx, v2c.cancelCtx = context.WithCancel(context.Background()) 119 120 go v2c.run() 121 return v2c 122} 123 124// close cleans up resources and goroutines allocated by this client. 125func (v2c *v2Client) close() { 126 v2c.cancelCtx() 127} 128 129// run starts an ADS stream (and backs off exponentially, if the previous 130// stream failed without receiving a single reply) and runs the sender and 131// receiver routines to send and receive data from the stream respectively. 132func (v2c *v2Client) run() { 133 go v2c.send() 134 // TODO: start a goroutine monitoring ClientConn's connectivity state, and 135 // report error (and log) when stats is transient failure. 136 137 retries := 0 138 for { 139 select { 140 case <-v2c.ctx.Done(): 141 return 142 default: 143 } 144 145 if retries != 0 { 146 t := time.NewTimer(v2c.backoff(retries)) 147 select { 148 case <-t.C: 149 case <-v2c.ctx.Done(): 150 if !t.Stop() { 151 <-t.C 152 } 153 return 154 } 155 } 156 157 retries++ 158 cli := adsgrpc.NewAggregatedDiscoveryServiceClient(v2c.cc) 159 stream, err := cli.StreamAggregatedResources(v2c.ctx, grpc.WaitForReady(true)) 160 if err != nil { 161 v2c.logger.Warningf("xds: ADS stream creation failed: %v", err) 162 continue 163 } 164 v2c.logger.Infof("ADS stream created") 165 166 select { 167 case <-v2c.streamCh: 168 default: 169 } 170 v2c.streamCh <- stream 171 if v2c.recv(stream) { 172 retries = 0 173 } 174 } 175} 176 177// sendRequest sends a request for provided typeURL and resource on the provided 178// stream. 179// 180// version is the ack version to be sent with the request 181// - If this is the new request (not an ack/nack), version will be an empty 182// string 183// - If this is an ack, version will be the version from the response 184// - If this is a nack, version will be the previous acked version (from 185// versionMap). If there was no ack before, it will be an empty string 186func (v2c *v2Client) sendRequest(stream adsStream, resourceNames []string, typeURL, version, nonce string) bool { 187 req := &xdspb.DiscoveryRequest{ 188 Node: v2c.nodeProto, 189 TypeUrl: typeURL, 190 ResourceNames: resourceNames, 191 VersionInfo: version, 192 ResponseNonce: nonce, 193 // TODO: populate ErrorDetails for nack. 194 } 195 if err := stream.Send(req); err != nil { 196 return false 197 } 198 v2c.logger.Debugf("ADS request sent: %v", req) 199 return true 200} 201 202// sendExisting sends out xDS requests for registered watchers when recovering 203// from a broken stream. 204// 205// We call stream.Send() here with the lock being held. It should be OK to do 206// that here because the stream has just started and Send() usually returns 207// quickly (once it pushes the message onto the transport layer) and is only 208// ever blocked if we don't have enough flow control quota. 209func (v2c *v2Client) sendExisting(stream adsStream) bool { 210 v2c.mu.Lock() 211 defer v2c.mu.Unlock() 212 213 // Reset the ack versions when the stream restarts. 214 v2c.versionMap = make(map[string]string) 215 v2c.nonceMap = make(map[string]string) 216 217 for typeURL, wi := range v2c.watchMap { 218 if !v2c.sendRequest(stream, wi.target, typeURL, "", "") { 219 return false 220 } 221 } 222 223 return true 224} 225 226// processWatchInfo pulls the fields needed by the request from a watchInfo. 227// 228// It also calls callback with cached response, and updates the watch map in 229// v2c. 230// 231// If the watch was already canceled, it returns false for send 232func (v2c *v2Client) processWatchInfo(t *watchInfo) (target []string, typeURL, version, nonce string, send bool) { 233 v2c.mu.Lock() 234 defer v2c.mu.Unlock() 235 if t.state == watchCancelled { 236 return // This returns all zero values, and false for send. 237 } 238 t.state = watchStarted 239 send = true 240 241 typeURL = t.typeURL 242 target = t.target 243 v2c.checkCacheAndUpdateWatchMap(t) 244 // TODO: if watch is called again with the same resource names, 245 // there's no need to send another request. 246 247 // We don't reset version or nonce when a new watch is started. The version 248 // and nonce from previous response are carried by the request unless the 249 // stream is recreated. 250 version = v2c.versionMap[typeURL] 251 nonce = v2c.nonceMap[typeURL] 252 return 253} 254 255// processAckInfo pulls the fields needed by the ack request from a ackInfo. 256// 257// If no active watch is found for this ack, it returns false for send. 258func (v2c *v2Client) processAckInfo(t *ackInfo) (target []string, typeURL, version, nonce string, send bool) { 259 typeURL = t.typeURL 260 261 v2c.mu.Lock() 262 defer v2c.mu.Unlock() 263 wi, ok := v2c.watchMap[typeURL] 264 if !ok { 265 // We don't send the request ack if there's no active watch (this can be 266 // either the server sends responses before any request, or the watch is 267 // canceled while the ackInfo is in queue), because there's no resource 268 // name. And if we send a request with empty resource name list, the 269 // server may treat it as a wild card and send us everything. 270 return // This returns all zero values, and false for send. 271 } 272 send = true 273 274 version = t.version 275 nonce = t.nonce 276 target = wi.target 277 if version == "" { 278 // This is a nack, get the previous acked version. 279 version = v2c.versionMap[typeURL] 280 // version will still be an empty string if typeURL isn't 281 // found in versionMap, this can happen if there wasn't any ack 282 // before. 283 } else { 284 v2c.versionMap[typeURL] = version 285 } 286 v2c.nonceMap[typeURL] = nonce 287 return 288} 289 290// send is a separate goroutine for sending watch requests on the xds stream. 291// 292// It watches the stream channel for new streams, and the request channel for 293// new requests to send on the stream. 294// 295// For each new request (watchInfo), it's 296// - processed and added to the watch map 297// - so resend will pick them up when there are new streams) 298// - sent on the current stream if there's one 299// - the current stream is cleared when any send on it fails 300// 301// For each new stream, all the existing requests will be resent. 302// 303// Note that this goroutine doesn't do anything to the old stream when there's a 304// new one. In fact, there should be only one stream in progress, and new one 305// should only be created when the old one fails (recv returns an error). 306func (v2c *v2Client) send() { 307 var stream adsStream 308 for { 309 select { 310 case <-v2c.ctx.Done(): 311 return 312 case newStream := <-v2c.streamCh: 313 stream = newStream 314 if !v2c.sendExisting(stream) { 315 // send failed, clear the current stream. 316 stream = nil 317 } 318 case u := <-v2c.sendCh.Get(): 319 v2c.sendCh.Load() 320 321 var ( 322 target []string 323 typeURL, version, nonce string 324 send bool 325 ) 326 switch t := u.(type) { 327 case *watchInfo: 328 target, typeURL, version, nonce, send = v2c.processWatchInfo(t) 329 case *ackInfo: 330 target, typeURL, version, nonce, send = v2c.processAckInfo(t) 331 } 332 if !send { 333 continue 334 } 335 if stream == nil { 336 // There's no stream yet. Skip the request. This request 337 // will be resent to the new streams. If no stream is 338 // created, the watcher will timeout (same as server not 339 // sending response back). 340 continue 341 } 342 if !v2c.sendRequest(stream, target, typeURL, version, nonce) { 343 // send failed, clear the current stream. 344 stream = nil 345 } 346 } 347 } 348} 349 350// recv receives xDS responses on the provided ADS stream and branches out to 351// message specific handlers. 352func (v2c *v2Client) recv(stream adsStream) bool { 353 success := false 354 for { 355 resp, err := stream.Recv() 356 // TODO: call watch callbacks with error when stream is broken. 357 if err != nil { 358 v2c.logger.Warningf("ADS stream is closed with error: %v", err) 359 return success 360 } 361 v2c.logger.Infof("ADS response received, type: %v", resp.GetTypeUrl()) 362 v2c.logger.Debugf("ADS response received: %v", resp) 363 var respHandleErr error 364 switch resp.GetTypeUrl() { 365 case ldsURL: 366 respHandleErr = v2c.handleLDSResponse(resp) 367 case rdsURL: 368 respHandleErr = v2c.handleRDSResponse(resp) 369 case cdsURL: 370 respHandleErr = v2c.handleCDSResponse(resp) 371 case edsURL: 372 respHandleErr = v2c.handleEDSResponse(resp) 373 default: 374 v2c.logger.Warningf("Resource type %v unknown in response from server", resp.GetTypeUrl()) 375 continue 376 } 377 378 typeURL := resp.GetTypeUrl() 379 if respHandleErr != nil { 380 v2c.sendCh.Put(&ackInfo{ 381 typeURL: typeURL, 382 version: "", 383 nonce: resp.GetNonce(), 384 }) 385 v2c.logger.Warningf("Sending NACK for response type: %v, version: %v, nonce: %v, reason: %v", typeURL, resp.GetVersionInfo(), resp.GetNonce(), respHandleErr) 386 continue 387 } 388 v2c.sendCh.Put(&ackInfo{ 389 typeURL: typeURL, 390 version: resp.GetVersionInfo(), 391 nonce: resp.GetNonce(), 392 }) 393 v2c.logger.Infof("Sending ACK for response type: %v, version: %v, nonce: %v", typeURL, resp.GetVersionInfo(), resp.GetNonce()) 394 success = true 395 } 396} 397 398// watchLDS registers an LDS watcher for the provided target. Updates 399// corresponding to received LDS responses will be pushed to the provided 400// callback. The caller can cancel the watch by invoking the returned cancel 401// function. 402// The provided callback should not block or perform any expensive operations 403// or call other methods of the v2Client object. 404func (v2c *v2Client) watchLDS(target string, ldsCb ldsCallback) (cancel func()) { 405 return v2c.watch(&watchInfo{ 406 typeURL: ldsURL, 407 target: []string{target}, 408 callback: ldsCb, 409 }) 410} 411 412// watchRDS registers an RDS watcher for the provided routeName. Updates 413// corresponding to received RDS responses will be pushed to the provided 414// callback. The caller can cancel the watch by invoking the returned cancel 415// function. 416// The provided callback should not block or perform any expensive operations 417// or call other methods of the v2Client object. 418func (v2c *v2Client) watchRDS(routeName string, rdsCb rdsCallback) (cancel func()) { 419 return v2c.watch(&watchInfo{ 420 typeURL: rdsURL, 421 target: []string{routeName}, 422 callback: rdsCb, 423 }) 424 // TODO: Once a registered RDS watch is cancelled, we should send an RDS 425 // request with no resources. This will let the server know that we are no 426 // longer interested in this resource. 427} 428 429// watchCDS registers an CDS watcher for the provided clusterName. Updates 430// corresponding to received CDS responses will be pushed to the provided 431// callback. The caller can cancel the watch by invoking the returned cancel 432// function. 433// The provided callback should not block or perform any expensive operations 434// or call other methods of the v2Client object. 435func (v2c *v2Client) watchCDS(clusterName string, cdsCb cdsCallback) (cancel func()) { 436 return v2c.watch(&watchInfo{ 437 typeURL: cdsURL, 438 target: []string{clusterName}, 439 callback: cdsCb, 440 }) 441} 442 443// watchEDS registers an EDS watcher for the provided clusterName. Updates 444// corresponding to received EDS responses will be pushed to the provided 445// callback. The caller can cancel the watch by invoking the returned cancel 446// function. 447// The provided callback should not block or perform any expensive operations 448// or call other methods of the v2Client object. 449func (v2c *v2Client) watchEDS(clusterName string, edsCb edsCallback) (cancel func()) { 450 return v2c.watch(&watchInfo{ 451 typeURL: edsURL, 452 target: []string{clusterName}, 453 callback: edsCb, 454 }) 455 // TODO: Once a registered EDS watch is cancelled, we should send an EDS 456 // request with no resources. This will let the server know that we are no 457 // longer interested in this resource. 458} 459 460func (v2c *v2Client) watch(wi *watchInfo) (cancel func()) { 461 v2c.sendCh.Put(wi) 462 v2c.logger.Infof("Sending ADS request for new watch of type: %v, resource names: %v", wi.typeURL, wi.target) 463 return func() { 464 v2c.mu.Lock() 465 defer v2c.mu.Unlock() 466 if wi.state == watchEnqueued { 467 wi.state = watchCancelled 468 return 469 } 470 v2c.watchMap[wi.typeURL].cancel() 471 delete(v2c.watchMap, wi.typeURL) 472 // TODO: should we reset ack version string when cancelling the watch? 473 } 474} 475 476// checkCacheAndUpdateWatchMap is called when a new watch call is handled in 477// send(). If an existing watcher is found, its expiry timer is stopped. If the 478// watchInfo to be added to the watchMap is found in the cache, the watcher 479// callback is immediately invoked. 480// 481// Caller should hold v2c.mu 482func (v2c *v2Client) checkCacheAndUpdateWatchMap(wi *watchInfo) { 483 if existing := v2c.watchMap[wi.typeURL]; existing != nil { 484 existing.cancel() 485 } 486 487 v2c.watchMap[wi.typeURL] = wi 488 switch wi.typeURL { 489 // We need to grab the lock inside of the expiryTimer's afterFunc because 490 // we need to access the watchInfo, which is stored in the watchMap. 491 case ldsURL: 492 wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() { 493 v2c.mu.Lock() 494 wi.callback.(ldsCallback)(ldsUpdate{}, fmt.Errorf("xds: LDS target %s not found, watcher timeout", wi.target)) 495 v2c.mu.Unlock() 496 }) 497 case rdsURL: 498 routeName := wi.target[0] 499 if cluster := v2c.rdsCache[routeName]; cluster != "" { 500 var err error 501 if v2c.watchMap[ldsURL] == nil { 502 cluster = "" 503 err = fmt.Errorf("xds: no LDS watcher found when handling RDS watch for route {%v} from cache", routeName) 504 } 505 v2c.logger.Infof("Resource with name %v, type %v found in cache", routeName, wi.typeURL) 506 wi.callback.(rdsCallback)(rdsUpdate{clusterName: cluster}, err) 507 return 508 } 509 // Add the watch expiry timer only for new watches we don't find in 510 // the cache, and return from here. 511 wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() { 512 v2c.mu.Lock() 513 wi.callback.(rdsCallback)(rdsUpdate{}, fmt.Errorf("xds: RDS target %s not found, watcher timeout", wi.target)) 514 v2c.mu.Unlock() 515 }) 516 case cdsURL: 517 clusterName := wi.target[0] 518 if update, ok := v2c.cdsCache[clusterName]; ok { 519 var err error 520 if v2c.watchMap[cdsURL] == nil { 521 err = fmt.Errorf("xds: no CDS watcher found when handling CDS watch for cluster {%v} from cache", clusterName) 522 } 523 v2c.logger.Infof("Resource with name %v, type %v found in cache", clusterName, wi.typeURL) 524 wi.callback.(cdsCallback)(update, err) 525 return 526 } 527 wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() { 528 v2c.mu.Lock() 529 wi.callback.(cdsCallback)(CDSUpdate{}, fmt.Errorf("xds: CDS target %s not found, watcher timeout", wi.target)) 530 v2c.mu.Unlock() 531 }) 532 case edsURL: 533 wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() { 534 v2c.mu.Lock() 535 wi.callback.(edsCallback)(nil, fmt.Errorf("xds: EDS target %s not found, watcher timeout", wi.target)) 536 v2c.mu.Unlock() 537 }) 538 } 539} 540