/*
 *
 * Copyright 2020 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package client

import (
	"context"
	"sync"
	"time"

	"github.com/golang/protobuf/proto"
	"google.golang.org/grpc/xds/internal/client/load"

	"google.golang.org/grpc"
	"google.golang.org/grpc/internal/buffer"
	"google.golang.org/grpc/internal/grpclog"
)

// ErrResourceTypeUnsupported is an error used to indicate an unsupported xDS
// resource type. The wrapped ErrStr contains the details.
type ErrResourceTypeUnsupported struct {
	// ErrStr is the human-readable description returned by Error.
	ErrStr string
}

// Error implements the error interface by returning ErrStr.
func (e ErrResourceTypeUnsupported) Error() string {
	return e.ErrStr
}

// VersionedClient is the interface to be provided by the transport protocol
// specific client implementations. This mainly deals with the actual sending
// and receiving of messages.
type VersionedClient interface {
	// NewStream returns a new xDS client stream specific to the underlying
	// transport protocol version.
	NewStream(ctx context.Context) (grpc.ClientStream, error)

	// SendRequest constructs and sends out a DiscoveryRequest message specific
	// to the underlying transport protocol version.
	//
	// TransportHelper passes a non-empty errMsg only when the request is a
	// NACK of a previously received response.
	SendRequest(s grpc.ClientStream, resourceNames []string, rType ResourceType, version, nonce, errMsg string) error

	// RecvResponse uses the provided stream to receive a response specific to
	// the underlying transport protocol version.
	RecvResponse(s grpc.ClientStream) (proto.Message, error)

	// HandleResponse parses and validates the received response and notifies
	// the top-level client which in turn notifies the registered watchers.
	//
	// Return values are: resourceType, version, nonce, error.
	// If the provided protobuf message contains a resource type which is not
	// supported, implementations must return an error of type
	// ErrResourceTypeUnsupported.
	HandleResponse(proto.Message) (ResourceType, string, string, error)

	// NewLoadStatsStream returns a new LRS client stream specific to the underlying
	// transport protocol version.
	NewLoadStatsStream(ctx context.Context, cc *grpc.ClientConn) (grpc.ClientStream, error)

	// SendFirstLoadStatsRequest constructs and sends the first request on the
	// LRS stream.
	SendFirstLoadStatsRequest(s grpc.ClientStream) error

	// HandleLoadStatsResponse receives the first response from the server which
	// contains the load reporting interval and the clusters for which the
	// server asks the client to report load.
	//
	// If the response sets SendAllClusters to true, the returned clusters is
	// nil.
	HandleLoadStatsResponse(s grpc.ClientStream) (clusters []string, _ time.Duration, _ error)

	// SendLoadStatsRequest will be invoked at regular intervals to send load
	// report with load data reported since the last time this method was
	// invoked.
	SendLoadStatsRequest(s grpc.ClientStream, loads []*load.Data) error
}

// TransportHelper contains all xDS transport protocol related functionality
// which is common across different versioned client implementations.
//
// TransportHelper takes care of sending and receiving xDS requests and
// responses on an ADS stream. It also takes care of ACK/NACK handling. It
// delegates to the actual versioned client implementations wherever
// appropriate.
//
// Implements the APIClient interface which makes it possible for versioned
// client implementations to embed this type, and thereby satisfy the interface
// requirements.
type TransportHelper struct {
	// cancelCtx cancels the context handed to the run goroutine; it is
	// invoked by Close.
	cancelCtx context.CancelFunc

	// vClient is the version specific client that stream creation, sending
	// and receiving are delegated to.
	vClient VersionedClient
	logger  *grpclog.PrefixLogger
	// backoff returns how long to wait before the next stream creation
	// attempt, given the number of retries so far.
	backoff func(int) time.Duration
	// streamCh hands the most recently created ADS stream from run to send.
	streamCh chan grpc.ClientStream
	// sendCh queues watchAction and ackAction values to be processed by send.
	sendCh *buffer.Unbounded

	mu sync.Mutex
	// watchMap holds, per resource type, the set of resource names which are
	// currently being watched. It is written to after a watchAction is read
	// from sendCh, and is read from when recovering from a broken stream to
	// resend the xDS messages. When the user of this client object cancels a
	// watch call, the resource name is removed (and the per-type entry is
	// deleted once it becomes empty). All accesses to the map, and to any
	// value inside the map, must be protected with the above mutex.
	watchMap map[ResourceType]map[string]bool
	// versionMap contains the version that was acked (the version in the ack
	// request that was sent on wire). The key is rType, the value is the
	// version string, because the versions for different resource types should
	// be independent. It is reset whenever requests are resent on a new
	// stream.
	versionMap map[ResourceType]string
	// nonceMap contains the nonce from the most recent received response. It
	// is reset whenever requests are resent on a new stream.
	nonceMap map[ResourceType]string
}

// NewTransportHelper creates a new transport helper to be used by versioned
// client implementations.
131func NewTransportHelper(vc VersionedClient, logger *grpclog.PrefixLogger, backoff func(int) time.Duration) *TransportHelper { 132 ctx, cancelCtx := context.WithCancel(context.Background()) 133 t := &TransportHelper{ 134 cancelCtx: cancelCtx, 135 vClient: vc, 136 logger: logger, 137 backoff: backoff, 138 139 streamCh: make(chan grpc.ClientStream, 1), 140 sendCh: buffer.NewUnbounded(), 141 watchMap: make(map[ResourceType]map[string]bool), 142 versionMap: make(map[ResourceType]string), 143 nonceMap: make(map[ResourceType]string), 144 } 145 146 go t.run(ctx) 147 return t 148} 149 150// AddWatch adds a watch for an xDS resource given its type and name. 151func (t *TransportHelper) AddWatch(rType ResourceType, resourceName string) { 152 t.sendCh.Put(&watchAction{ 153 rType: rType, 154 remove: false, 155 resource: resourceName, 156 }) 157} 158 159// RemoveWatch cancels an already registered watch for an xDS resource 160// given its type and name. 161func (t *TransportHelper) RemoveWatch(rType ResourceType, resourceName string) { 162 t.sendCh.Put(&watchAction{ 163 rType: rType, 164 remove: true, 165 resource: resourceName, 166 }) 167} 168 169// Close closes the transport helper. 170func (t *TransportHelper) Close() { 171 t.cancelCtx() 172} 173 174// run starts an ADS stream (and backs off exponentially, if the previous 175// stream failed without receiving a single reply) and runs the sender and 176// receiver routines to send and receive data from the stream respectively. 177func (t *TransportHelper) run(ctx context.Context) { 178 go t.send(ctx) 179 // TODO: start a goroutine monitoring ClientConn's connectivity state, and 180 // report error (and log) when stats is transient failure. 
181 182 retries := 0 183 for { 184 select { 185 case <-ctx.Done(): 186 return 187 default: 188 } 189 190 if retries != 0 { 191 timer := time.NewTimer(t.backoff(retries)) 192 select { 193 case <-timer.C: 194 case <-ctx.Done(): 195 if !timer.Stop() { 196 <-timer.C 197 } 198 return 199 } 200 } 201 202 retries++ 203 stream, err := t.vClient.NewStream(ctx) 204 if err != nil { 205 t.logger.Warningf("xds: ADS stream creation failed: %v", err) 206 continue 207 } 208 t.logger.Infof("ADS stream created") 209 210 select { 211 case <-t.streamCh: 212 default: 213 } 214 t.streamCh <- stream 215 if t.recv(stream) { 216 retries = 0 217 } 218 } 219} 220 221// send is a separate goroutine for sending watch requests on the xds stream. 222// 223// It watches the stream channel for new streams, and the request channel for 224// new requests to send on the stream. 225// 226// For each new request (watchAction), it's 227// - processed and added to the watch map 228// - so resend will pick them up when there are new streams 229// - sent on the current stream if there's one 230// - the current stream is cleared when any send on it fails 231// 232// For each new stream, all the existing requests will be resent. 233// 234// Note that this goroutine doesn't do anything to the old stream when there's a 235// new one. In fact, there should be only one stream in progress, and new one 236// should only be created when the old one fails (recv returns an error). 237func (t *TransportHelper) send(ctx context.Context) { 238 var stream grpc.ClientStream 239 for { 240 select { 241 case <-ctx.Done(): 242 return 243 case stream = <-t.streamCh: 244 if !t.sendExisting(stream) { 245 // send failed, clear the current stream. 
246 stream = nil 247 } 248 case u := <-t.sendCh.Get(): 249 t.sendCh.Load() 250 251 var ( 252 target []string 253 rType ResourceType 254 version, nonce, errMsg string 255 send bool 256 ) 257 switch update := u.(type) { 258 case *watchAction: 259 target, rType, version, nonce = t.processWatchInfo(update) 260 case *ackAction: 261 target, rType, version, nonce, send = t.processAckInfo(update, stream) 262 if !send { 263 continue 264 } 265 errMsg = update.errMsg 266 } 267 if stream == nil { 268 // There's no stream yet. Skip the request. This request 269 // will be resent to the new streams. If no stream is 270 // created, the watcher will timeout (same as server not 271 // sending response back). 272 continue 273 } 274 if err := t.vClient.SendRequest(stream, target, rType, version, nonce, errMsg); err != nil { 275 t.logger.Warningf("ADS request for {target: %q, type: %v, version: %q, nonce: %q} failed: %v", target, rType, version, nonce, err) 276 // send failed, clear the current stream. 277 stream = nil 278 } 279 } 280 } 281} 282 283// sendExisting sends out xDS requests for registered watchers when recovering 284// from a broken stream. 285// 286// We call stream.Send() here with the lock being held. It should be OK to do 287// that here because the stream has just started and Send() usually returns 288// quickly (once it pushes the message onto the transport layer) and is only 289// ever blocked if we don't have enough flow control quota. 290func (t *TransportHelper) sendExisting(stream grpc.ClientStream) bool { 291 t.mu.Lock() 292 defer t.mu.Unlock() 293 294 // Reset the ack versions when the stream restarts. 
295 t.versionMap = make(map[ResourceType]string) 296 t.nonceMap = make(map[ResourceType]string) 297 298 for rType, s := range t.watchMap { 299 if err := t.vClient.SendRequest(stream, mapToSlice(s), rType, "", "", ""); err != nil { 300 t.logger.Warningf("ADS request failed: %v", err) 301 return false 302 } 303 } 304 305 return true 306} 307 308// recv receives xDS responses on the provided ADS stream and branches out to 309// message specific handlers. 310func (t *TransportHelper) recv(stream grpc.ClientStream) bool { 311 success := false 312 for { 313 resp, err := t.vClient.RecvResponse(stream) 314 if err != nil { 315 t.logger.Warningf("ADS stream is closed with error: %v", err) 316 return success 317 } 318 rType, version, nonce, err := t.vClient.HandleResponse(resp) 319 if e, ok := err.(ErrResourceTypeUnsupported); ok { 320 t.logger.Warningf("%s", e.ErrStr) 321 continue 322 } 323 if err != nil { 324 t.sendCh.Put(&ackAction{ 325 rType: rType, 326 version: "", 327 nonce: nonce, 328 errMsg: err.Error(), 329 stream: stream, 330 }) 331 t.logger.Warningf("Sending NACK for response type: %v, version: %v, nonce: %v, reason: %v", rType, version, nonce, err) 332 continue 333 } 334 t.sendCh.Put(&ackAction{ 335 rType: rType, 336 version: version, 337 nonce: nonce, 338 stream: stream, 339 }) 340 t.logger.Infof("Sending ACK for response type: %v, version: %v, nonce: %v", rType, version, nonce) 341 success = true 342 } 343} 344 345func mapToSlice(m map[string]bool) (ret []string) { 346 for i := range m { 347 ret = append(ret, i) 348 } 349 return 350} 351 352type watchAction struct { 353 rType ResourceType 354 remove bool // Whether this is to remove watch for the resource. 355 resource string 356} 357 358// processWatchInfo pulls the fields needed by the request from a watchAction. 359// 360// It also updates the watch map. 
361func (t *TransportHelper) processWatchInfo(w *watchAction) (target []string, rType ResourceType, ver, nonce string) { 362 t.mu.Lock() 363 defer t.mu.Unlock() 364 365 var current map[string]bool 366 current, ok := t.watchMap[w.rType] 367 if !ok { 368 current = make(map[string]bool) 369 t.watchMap[w.rType] = current 370 } 371 372 if w.remove { 373 delete(current, w.resource) 374 if len(current) == 0 { 375 delete(t.watchMap, w.rType) 376 } 377 } else { 378 current[w.resource] = true 379 } 380 381 rType = w.rType 382 target = mapToSlice(current) 383 // We don't reset version or nonce when a new watch is started. The version 384 // and nonce from previous response are carried by the request unless the 385 // stream is recreated. 386 ver = t.versionMap[rType] 387 nonce = t.nonceMap[rType] 388 return target, rType, ver, nonce 389} 390 391type ackAction struct { 392 rType ResourceType 393 version string // NACK if version is an empty string. 394 nonce string 395 errMsg string // Empty unless it's a NACK. 396 // ACK/NACK are tagged with the stream it's for. When the stream is down, 397 // all the ACK/NACK for this stream will be dropped, and the version/nonce 398 // won't be updated. 399 stream grpc.ClientStream 400} 401 402// processAckInfo pulls the fields needed by the ack request from a ackAction. 403// 404// If no active watch is found for this ack, it returns false for send. 405func (t *TransportHelper) processAckInfo(ack *ackAction, stream grpc.ClientStream) (target []string, rType ResourceType, version, nonce string, send bool) { 406 if ack.stream != stream { 407 // If ACK's stream isn't the current sending stream, this means the ACK 408 // was pushed to queue before the old stream broke, and a new stream has 409 // been started since. Return immediately here so we don't update the 410 // nonce for the new stream. 
411 return nil, UnknownResource, "", "", false 412 } 413 rType = ack.rType 414 415 t.mu.Lock() 416 defer t.mu.Unlock() 417 418 // Update the nonce no matter if we are going to send the ACK request on 419 // wire. We may not send the request if the watch is canceled. But the nonce 420 // needs to be updated so the next request will have the right nonce. 421 nonce = ack.nonce 422 t.nonceMap[rType] = nonce 423 424 s, ok := t.watchMap[rType] 425 if !ok || len(s) == 0 { 426 // We don't send the request ack if there's no active watch (this can be 427 // either the server sends responses before any request, or the watch is 428 // canceled while the ackAction is in queue), because there's no resource 429 // name. And if we send a request with empty resource name list, the 430 // server may treat it as a wild card and send us everything. 431 return nil, UnknownResource, "", "", false 432 } 433 send = true 434 target = mapToSlice(s) 435 436 version = ack.version 437 if version == "" { 438 // This is a nack, get the previous acked version. 439 version = t.versionMap[rType] 440 // version will still be an empty string if rType isn't 441 // found in versionMap, this can happen if there wasn't any ack 442 // before. 443 } else { 444 t.versionMap[rType] = version 445 } 446 return target, rType, version, nonce, send 447} 448 449// reportLoad starts an LRS stream to report load data to the management server. 450// It blocks until the context is cancelled. 
451func (t *TransportHelper) reportLoad(ctx context.Context, cc *grpc.ClientConn, opts loadReportingOptions) { 452 retries := 0 453 for { 454 if ctx.Err() != nil { 455 return 456 } 457 458 if retries != 0 { 459 timer := time.NewTimer(t.backoff(retries)) 460 select { 461 case <-timer.C: 462 case <-ctx.Done(): 463 if !timer.Stop() { 464 <-timer.C 465 } 466 return 467 } 468 } 469 470 retries++ 471 stream, err := t.vClient.NewLoadStatsStream(ctx, cc) 472 if err != nil { 473 logger.Warningf("lrs: failed to create stream: %v", err) 474 continue 475 } 476 logger.Infof("lrs: created LRS stream") 477 478 if err := t.vClient.SendFirstLoadStatsRequest(stream); err != nil { 479 logger.Warningf("lrs: failed to send first request: %v", err) 480 continue 481 } 482 483 clusters, interval, err := t.vClient.HandleLoadStatsResponse(stream) 484 if err != nil { 485 logger.Warning(err) 486 continue 487 } 488 489 retries = 0 490 t.sendLoads(ctx, stream, opts.loadStore, clusters, interval) 491 } 492} 493 494func (t *TransportHelper) sendLoads(ctx context.Context, stream grpc.ClientStream, store *load.Store, clusterNames []string, interval time.Duration) { 495 tick := time.NewTicker(interval) 496 defer tick.Stop() 497 for { 498 select { 499 case <-tick.C: 500 case <-ctx.Done(): 501 return 502 } 503 if err := t.vClient.SendLoadStatsRequest(stream, store.Stats(clusterNames)); err != nil { 504 logger.Warning(err) 505 return 506 } 507 } 508} 509