1/* 2 * 3 * Copyright 2016 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19// Package grpclb defines a grpclb balancer. 20// 21// To install grpclb balancer, import this package as: 22// import _ "google.golang.org/grpc/balancer/grpclb" 23package grpclb 24 25import ( 26 "context" 27 "errors" 28 "fmt" 29 "sync" 30 "time" 31 32 "google.golang.org/grpc" 33 "google.golang.org/grpc/balancer" 34 grpclbstate "google.golang.org/grpc/balancer/grpclb/state" 35 "google.golang.org/grpc/connectivity" 36 "google.golang.org/grpc/credentials" 37 "google.golang.org/grpc/grpclog" 38 "google.golang.org/grpc/internal" 39 "google.golang.org/grpc/internal/backoff" 40 "google.golang.org/grpc/internal/resolver/dns" 41 "google.golang.org/grpc/resolver" 42 43 durationpb "github.com/golang/protobuf/ptypes/duration" 44 lbpb "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1" 45) 46 47const ( 48 lbTokenKey = "lb-token" 49 defaultFallbackTimeout = 10 * time.Second 50 grpclbName = "grpclb" 51) 52 53var errServerTerminatedConnection = errors.New("grpclb: failed to recv server list: server terminated connection") 54var logger = grpclog.Component("grpclb") 55 56func convertDuration(d *durationpb.Duration) time.Duration { 57 if d == nil { 58 return 0 59 } 60 return time.Duration(d.Seconds)*time.Second + time.Duration(d.Nanos)*time.Nanosecond 61} 62 63// Client API for LoadBalancer service. 64// Mostly copied from generated pb.go file. 65// To avoid circular dependency. 66type loadBalancerClient struct { 67 cc *grpc.ClientConn 68} 69 70func (c *loadBalancerClient) BalanceLoad(ctx context.Context, opts ...grpc.CallOption) (*balanceLoadClientStream, error) { 71 desc := &grpc.StreamDesc{ 72 StreamName: "BalanceLoad", 73 ServerStreams: true, 74 ClientStreams: true, 75 } 76 stream, err := c.cc.NewStream(ctx, desc, "/grpc.lb.v1.LoadBalancer/BalanceLoad", opts...) 77 if err != nil { 78 return nil, err 79 } 80 x := &balanceLoadClientStream{stream} 81 return x, nil 82} 83 84type balanceLoadClientStream struct { 85 grpc.ClientStream 86} 87 88func (x *balanceLoadClientStream) Send(m *lbpb.LoadBalanceRequest) error { 89 return x.ClientStream.SendMsg(m) 90} 91 92func (x *balanceLoadClientStream) Recv() (*lbpb.LoadBalanceResponse, error) { 93 m := new(lbpb.LoadBalanceResponse) 94 if err := x.ClientStream.RecvMsg(m); err != nil { 95 return nil, err 96 } 97 return m, nil 98} 99 100func init() { 101 balancer.Register(newLBBuilder()) 102 dns.EnableSRVLookups = true 103} 104 105// newLBBuilder creates a builder for grpclb. 106func newLBBuilder() balancer.Builder { 107 return newLBBuilderWithFallbackTimeout(defaultFallbackTimeout) 108} 109 110// newLBBuilderWithFallbackTimeout creates a grpclb builder with the given 111// fallbackTimeout. If no response is received from the remote balancer within 112// fallbackTimeout, the backend addresses from the resolved address list will be 113// used. 114// 115// Only call this function when a non-default fallback timeout is needed. 116func newLBBuilderWithFallbackTimeout(fallbackTimeout time.Duration) balancer.Builder { 117 return &lbBuilder{ 118 fallbackTimeout: fallbackTimeout, 119 } 120} 121 122type lbBuilder struct { 123 fallbackTimeout time.Duration 124} 125 126func (b *lbBuilder) Name() string { 127 return grpclbName 128} 129 130func (b *lbBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer { 131 // This generates a manual resolver builder with a fixed scheme. This 132 // scheme will be used to dial to remote LB, so we can send filtered 133 // address updates to remote LB ClientConn using this manual resolver. 134 r := &lbManualResolver{scheme: "grpclb-internal", ccb: cc} 135 136 lb := &lbBalancer{ 137 cc: newLBCacheClientConn(cc), 138 target: opt.Target.Endpoint, 139 opt: opt, 140 fallbackTimeout: b.fallbackTimeout, 141 doneCh: make(chan struct{}), 142 143 manualResolver: r, 144 subConns: make(map[resolver.Address]balancer.SubConn), 145 scStates: make(map[balancer.SubConn]connectivity.State), 146 picker: &errPicker{err: balancer.ErrNoSubConnAvailable}, 147 clientStats: newRPCStats(), 148 backoff: backoff.DefaultExponential, // TODO: make backoff configurable. 149 } 150 151 var err error 152 if opt.CredsBundle != nil { 153 lb.grpclbClientConnCreds, err = opt.CredsBundle.NewWithMode(internal.CredsBundleModeBalancer) 154 if err != nil { 155 logger.Warningf("lbBalancer: client connection creds NewWithMode failed: %v", err) 156 } 157 lb.grpclbBackendCreds, err = opt.CredsBundle.NewWithMode(internal.CredsBundleModeBackendFromBalancer) 158 if err != nil { 159 logger.Warningf("lbBalancer: backend creds NewWithMode failed: %v", err) 160 } 161 } 162 163 return lb 164} 165 166type lbBalancer struct { 167 cc *lbCacheClientConn 168 target string 169 opt balancer.BuildOptions 170 171 usePickFirst bool 172 173 // grpclbClientConnCreds is the creds bundle to be used to connect to grpclb 174 // servers. If it's nil, use the TransportCredentials from BuildOptions 175 // instead. 176 grpclbClientConnCreds credentials.Bundle 177 // grpclbBackendCreds is the creds bundle to be used for addresses that are 178 // returned by grpclb server. If it's nil, don't set anything when creating 179 // SubConns. 180 grpclbBackendCreds credentials.Bundle 181 182 fallbackTimeout time.Duration 183 doneCh chan struct{} 184 185 // manualResolver is used in the remote LB ClientConn inside grpclb. When 186 // resolved address updates are received by grpclb, filtered updates will be 187 // send to remote LB ClientConn through this resolver. 188 manualResolver *lbManualResolver 189 // The ClientConn to talk to the remote balancer. 190 ccRemoteLB *remoteBalancerCCWrapper 191 // backoff for calling remote balancer. 192 backoff backoff.Strategy 193 194 // Support client side load reporting. Each picker gets a reference to this, 195 // and will update its content. 196 clientStats *rpcStats 197 198 mu sync.Mutex // guards everything following. 199 // The full server list including drops, used to check if the newly received 200 // serverList contains anything new. Each generate picker will also have 201 // reference to this list to do the first layer pick. 202 fullServerList []*lbpb.Server 203 // Backend addresses. It's kept so the addresses are available when 204 // switching between round_robin and pickfirst. 205 backendAddrs []resolver.Address 206 // All backends addresses, with metadata set to nil. This list contains all 207 // backend addresses in the same order and with the same duplicates as in 208 // serverlist. When generating picker, a SubConn slice with the same order 209 // but with only READY SCs will be gerenated. 210 backendAddrsWithoutMetadata []resolver.Address 211 // Roundrobin functionalities. 212 state connectivity.State 213 subConns map[resolver.Address]balancer.SubConn // Used to new/remove SubConn. 214 scStates map[balancer.SubConn]connectivity.State // Used to filter READY SubConns. 215 picker balancer.Picker 216 // Support fallback to resolved backend addresses if there's no response 217 // from remote balancer within fallbackTimeout. 218 remoteBalancerConnected bool 219 serverListReceived bool 220 inFallback bool 221 // resolvedBackendAddrs is resolvedAddrs minus remote balancers. It's set 222 // when resolved address updates are received, and read in the goroutine 223 // handling fallback. 224 resolvedBackendAddrs []resolver.Address 225 connErr error // the last connection error 226} 227 228// regeneratePicker takes a snapshot of the balancer, and generates a picker from 229// it. The picker 230// - always returns ErrTransientFailure if the balancer is in TransientFailure, 231// - does two layer roundrobin pick otherwise. 232// Caller must hold lb.mu. 233func (lb *lbBalancer) regeneratePicker(resetDrop bool) { 234 if lb.state == connectivity.TransientFailure { 235 lb.picker = &errPicker{err: fmt.Errorf("all SubConns are in TransientFailure, last connection error: %v", lb.connErr)} 236 return 237 } 238 239 if lb.state == connectivity.Connecting { 240 lb.picker = &errPicker{err: balancer.ErrNoSubConnAvailable} 241 return 242 } 243 244 var readySCs []balancer.SubConn 245 if lb.usePickFirst { 246 for _, sc := range lb.subConns { 247 readySCs = append(readySCs, sc) 248 break 249 } 250 } else { 251 for _, a := range lb.backendAddrsWithoutMetadata { 252 if sc, ok := lb.subConns[a]; ok { 253 if st, ok := lb.scStates[sc]; ok && st == connectivity.Ready { 254 readySCs = append(readySCs, sc) 255 } 256 } 257 } 258 } 259 260 if len(readySCs) <= 0 { 261 // If there's no ready SubConns, always re-pick. This is to avoid drops 262 // unless at least one SubConn is ready. Otherwise we may drop more 263 // often than want because of drops + re-picks(which become re-drops). 264 // 265 // This doesn't seem to be necessary after the connecting check above. 266 // Kept for safety. 267 lb.picker = &errPicker{err: balancer.ErrNoSubConnAvailable} 268 return 269 } 270 if lb.inFallback { 271 lb.picker = newRRPicker(readySCs) 272 return 273 } 274 if resetDrop { 275 lb.picker = newLBPicker(lb.fullServerList, readySCs, lb.clientStats) 276 return 277 } 278 prevLBPicker, ok := lb.picker.(*lbPicker) 279 if !ok { 280 lb.picker = newLBPicker(lb.fullServerList, readySCs, lb.clientStats) 281 return 282 } 283 prevLBPicker.updateReadySCs(readySCs) 284} 285 286// aggregateSubConnStats calculate the aggregated state of SubConns in 287// lb.SubConns. These SubConns are subconns in use (when switching between 288// fallback and grpclb). lb.scState contains states for all SubConns, including 289// those in cache (SubConns are cached for 10 seconds after remove). 290// 291// The aggregated state is: 292// - If at least one SubConn in Ready, the aggregated state is Ready; 293// - Else if at least one SubConn in Connecting or IDLE, the aggregated state is Connecting; 294// - It's OK to consider IDLE as Connecting. SubConns never stay in IDLE, 295// they start to connect immediately. But there's a race between the overall 296// state is reported, and when the new SubConn state arrives. And SubConns 297// never go back to IDLE. 298// - Else the aggregated state is TransientFailure. 299func (lb *lbBalancer) aggregateSubConnStates() connectivity.State { 300 var numConnecting uint64 301 302 for _, sc := range lb.subConns { 303 if state, ok := lb.scStates[sc]; ok { 304 switch state { 305 case connectivity.Ready: 306 return connectivity.Ready 307 case connectivity.Connecting, connectivity.Idle: 308 numConnecting++ 309 } 310 } 311 } 312 if numConnecting > 0 { 313 return connectivity.Connecting 314 } 315 return connectivity.TransientFailure 316} 317 318func (lb *lbBalancer) UpdateSubConnState(sc balancer.SubConn, scs balancer.SubConnState) { 319 s := scs.ConnectivityState 320 if logger.V(2) { 321 logger.Infof("lbBalancer: handle SubConn state change: %p, %v", sc, s) 322 } 323 lb.mu.Lock() 324 defer lb.mu.Unlock() 325 326 oldS, ok := lb.scStates[sc] 327 if !ok { 328 if logger.V(2) { 329 logger.Infof("lbBalancer: got state changes for an unknown SubConn: %p, %v", sc, s) 330 } 331 return 332 } 333 lb.scStates[sc] = s 334 switch s { 335 case connectivity.Idle: 336 sc.Connect() 337 case connectivity.Shutdown: 338 // When an address was removed by resolver, b called RemoveSubConn but 339 // kept the sc's state in scStates. Remove state for this sc here. 340 delete(lb.scStates, sc) 341 case connectivity.TransientFailure: 342 lb.connErr = scs.ConnectionError 343 } 344 // Force regenerate picker if 345 // - this sc became ready from not-ready 346 // - this sc became not-ready from ready 347 lb.updateStateAndPicker((oldS == connectivity.Ready) != (s == connectivity.Ready), false) 348 349 // Enter fallback when the aggregated state is not Ready and the connection 350 // to remote balancer is lost. 351 if lb.state != connectivity.Ready { 352 if !lb.inFallback && !lb.remoteBalancerConnected { 353 // Enter fallback. 354 lb.refreshSubConns(lb.resolvedBackendAddrs, true, lb.usePickFirst) 355 } 356 } 357} 358 359// updateStateAndPicker re-calculate the aggregated state, and regenerate picker 360// if overall state is changed. 361// 362// If forceRegeneratePicker is true, picker will be regenerated. 363func (lb *lbBalancer) updateStateAndPicker(forceRegeneratePicker bool, resetDrop bool) { 364 oldAggrState := lb.state 365 lb.state = lb.aggregateSubConnStates() 366 // Regenerate picker when one of the following happens: 367 // - caller wants to regenerate 368 // - the aggregated state changed 369 if forceRegeneratePicker || (lb.state != oldAggrState) { 370 lb.regeneratePicker(resetDrop) 371 } 372 373 lb.cc.UpdateState(balancer.State{ConnectivityState: lb.state, Picker: lb.picker}) 374} 375 376// fallbackToBackendsAfter blocks for fallbackTimeout and falls back to use 377// resolved backends (backends received from resolver, not from remote balancer) 378// if no connection to remote balancers was successful. 379func (lb *lbBalancer) fallbackToBackendsAfter(fallbackTimeout time.Duration) { 380 timer := time.NewTimer(fallbackTimeout) 381 defer timer.Stop() 382 select { 383 case <-timer.C: 384 case <-lb.doneCh: 385 return 386 } 387 lb.mu.Lock() 388 if lb.inFallback || lb.serverListReceived { 389 lb.mu.Unlock() 390 return 391 } 392 // Enter fallback. 393 lb.refreshSubConns(lb.resolvedBackendAddrs, true, lb.usePickFirst) 394 lb.mu.Unlock() 395} 396 397func (lb *lbBalancer) handleServiceConfig(gc *grpclbServiceConfig) { 398 lb.mu.Lock() 399 defer lb.mu.Unlock() 400 401 newUsePickFirst := childIsPickFirst(gc) 402 if lb.usePickFirst == newUsePickFirst { 403 return 404 } 405 if logger.V(2) { 406 logger.Infof("lbBalancer: switching mode, new usePickFirst: %+v", newUsePickFirst) 407 } 408 lb.refreshSubConns(lb.backendAddrs, lb.inFallback, newUsePickFirst) 409} 410 411func (lb *lbBalancer) ResolverError(error) { 412 // Ignore resolver errors. GRPCLB is not selected unless the resolver 413 // works at least once. 414} 415 416func (lb *lbBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error { 417 if logger.V(2) { 418 logger.Infof("lbBalancer: UpdateClientConnState: %+v", ccs) 419 } 420 gc, _ := ccs.BalancerConfig.(*grpclbServiceConfig) 421 lb.handleServiceConfig(gc) 422 423 addrs := ccs.ResolverState.Addresses 424 425 var remoteBalancerAddrs, backendAddrs []resolver.Address 426 for _, a := range addrs { 427 if a.Type == resolver.GRPCLB { 428 a.Type = resolver.Backend 429 remoteBalancerAddrs = append(remoteBalancerAddrs, a) 430 } else { 431 backendAddrs = append(backendAddrs, a) 432 } 433 } 434 if sd := grpclbstate.Get(ccs.ResolverState); sd != nil { 435 // Override any balancer addresses provided via 436 // ccs.ResolverState.Addresses. 437 remoteBalancerAddrs = sd.BalancerAddresses 438 } 439 440 if len(backendAddrs)+len(remoteBalancerAddrs) == 0 { 441 // There should be at least one address, either grpclb server or 442 // fallback. Empty address is not valid. 443 return balancer.ErrBadResolverState 444 } 445 446 if len(remoteBalancerAddrs) == 0 { 447 if lb.ccRemoteLB != nil { 448 lb.ccRemoteLB.close() 449 lb.ccRemoteLB = nil 450 } 451 } else if lb.ccRemoteLB == nil { 452 // First time receiving resolved addresses, create a cc to remote 453 // balancers. 454 lb.newRemoteBalancerCCWrapper() 455 // Start the fallback goroutine. 456 go lb.fallbackToBackendsAfter(lb.fallbackTimeout) 457 } 458 459 if lb.ccRemoteLB != nil { 460 // cc to remote balancers uses lb.manualResolver. Send the updated remote 461 // balancer addresses to it through manualResolver. 462 lb.manualResolver.UpdateState(resolver.State{Addresses: remoteBalancerAddrs}) 463 } 464 465 lb.mu.Lock() 466 lb.resolvedBackendAddrs = backendAddrs 467 if len(remoteBalancerAddrs) == 0 || lb.inFallback { 468 // If there's no remote balancer address in ClientConn update, grpclb 469 // enters fallback mode immediately. 470 // 471 // If a new update is received while grpclb is in fallback, update the 472 // list of backends being used to the new fallback backends. 473 lb.refreshSubConns(lb.resolvedBackendAddrs, true, lb.usePickFirst) 474 } 475 lb.mu.Unlock() 476 return nil 477} 478 479func (lb *lbBalancer) Close() { 480 select { 481 case <-lb.doneCh: 482 return 483 default: 484 } 485 close(lb.doneCh) 486 if lb.ccRemoteLB != nil { 487 lb.ccRemoteLB.close() 488 } 489 lb.cc.close() 490} 491