1/* 2 * 3 * Copyright 2016 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19//go:generate ./regenerate.sh 20 21// Package grpclb defines a grpclb balancer. 22// 23// To install grpclb balancer, import this package as: 24// import _ "google.golang.org/grpc/balancer/grpclb" 25package grpclb 26 27import ( 28 "context" 29 "errors" 30 "strconv" 31 "sync" 32 "time" 33 34 durationpb "github.com/golang/protobuf/ptypes/duration" 35 "google.golang.org/grpc" 36 "google.golang.org/grpc/balancer" 37 lbpb "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1" 38 "google.golang.org/grpc/connectivity" 39 "google.golang.org/grpc/credentials" 40 "google.golang.org/grpc/grpclog" 41 "google.golang.org/grpc/internal" 42 "google.golang.org/grpc/internal/backoff" 43 "google.golang.org/grpc/resolver" 44) 45 46const ( 47 lbTokeyKey = "lb-token" 48 defaultFallbackTimeout = 10 * time.Second 49 grpclbName = "grpclb" 50) 51 52var ( 53 // defaultBackoffConfig configures the backoff strategy that's used when the 54 // init handshake in the RPC is unsuccessful. It's not for the clientconn 55 // reconnect backoff. 56 // 57 // It has the same value as the default grpc.DefaultBackoffConfig. 58 // 59 // TODO: make backoff configurable. 60 defaultBackoffConfig = backoff.Exponential{ 61 MaxDelay: 120 * time.Second, 62 } 63 errServerTerminatedConnection = errors.New("grpclb: failed to recv server list: server terminated connection") 64) 65 66func convertDuration(d *durationpb.Duration) time.Duration { 67 if d == nil { 68 return 0 69 } 70 return time.Duration(d.Seconds)*time.Second + time.Duration(d.Nanos)*time.Nanosecond 71} 72 73// Client API for LoadBalancer service. 74// Mostly copied from generated pb.go file. 75// To avoid circular dependency. 76type loadBalancerClient struct { 77 cc *grpc.ClientConn 78} 79 80func (c *loadBalancerClient) BalanceLoad(ctx context.Context, opts ...grpc.CallOption) (*balanceLoadClientStream, error) { 81 desc := &grpc.StreamDesc{ 82 StreamName: "BalanceLoad", 83 ServerStreams: true, 84 ClientStreams: true, 85 } 86 stream, err := c.cc.NewStream(ctx, desc, "/grpc.lb.v1.LoadBalancer/BalanceLoad", opts...) 87 if err != nil { 88 return nil, err 89 } 90 x := &balanceLoadClientStream{stream} 91 return x, nil 92} 93 94type balanceLoadClientStream struct { 95 grpc.ClientStream 96} 97 98func (x *balanceLoadClientStream) Send(m *lbpb.LoadBalanceRequest) error { 99 return x.ClientStream.SendMsg(m) 100} 101 102func (x *balanceLoadClientStream) Recv() (*lbpb.LoadBalanceResponse, error) { 103 m := new(lbpb.LoadBalanceResponse) 104 if err := x.ClientStream.RecvMsg(m); err != nil { 105 return nil, err 106 } 107 return m, nil 108} 109 110func init() { 111 balancer.Register(newLBBuilder()) 112} 113 114// newLBBuilder creates a builder for grpclb. 115func newLBBuilder() balancer.Builder { 116 return newLBBuilderWithFallbackTimeout(defaultFallbackTimeout) 117} 118 119// newLBBuilderWithFallbackTimeout creates a grpclb builder with the given 120// fallbackTimeout. If no response is received from the remote balancer within 121// fallbackTimeout, the backend addresses from the resolved address list will be 122// used. 123// 124// Only call this function when a non-default fallback timeout is needed. 125func newLBBuilderWithFallbackTimeout(fallbackTimeout time.Duration) balancer.Builder { 126 return &lbBuilder{ 127 fallbackTimeout: fallbackTimeout, 128 } 129} 130 131type lbBuilder struct { 132 fallbackTimeout time.Duration 133} 134 135func (b *lbBuilder) Name() string { 136 return grpclbName 137} 138 139func (b *lbBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer { 140 // This generates a manual resolver builder with a random scheme. This 141 // scheme will be used to dial to remote LB, so we can send filtered address 142 // updates to remote LB ClientConn using this manual resolver. 143 scheme := "grpclb_internal_" + strconv.FormatInt(time.Now().UnixNano(), 36) 144 r := &lbManualResolver{scheme: scheme, ccb: cc} 145 146 lb := &lbBalancer{ 147 cc: newLBCacheClientConn(cc), 148 target: opt.Target.Endpoint, 149 opt: opt, 150 fallbackTimeout: b.fallbackTimeout, 151 doneCh: make(chan struct{}), 152 153 manualResolver: r, 154 subConns: make(map[resolver.Address]balancer.SubConn), 155 scStates: make(map[balancer.SubConn]connectivity.State), 156 picker: &errPicker{err: balancer.ErrNoSubConnAvailable}, 157 clientStats: newRPCStats(), 158 backoff: defaultBackoffConfig, // TODO: make backoff configurable. 159 } 160 161 var err error 162 if opt.CredsBundle != nil { 163 lb.grpclbClientConnCreds, err = opt.CredsBundle.NewWithMode(internal.CredsBundleModeBalancer) 164 if err != nil { 165 grpclog.Warningf("lbBalancer: client connection creds NewWithMode failed: %v", err) 166 } 167 lb.grpclbBackendCreds, err = opt.CredsBundle.NewWithMode(internal.CredsBundleModeBackendFromBalancer) 168 if err != nil { 169 grpclog.Warningf("lbBalancer: backend creds NewWithMode failed: %v", err) 170 } 171 } 172 173 return lb 174} 175 176type lbBalancer struct { 177 cc *lbCacheClientConn 178 target string 179 opt balancer.BuildOptions 180 181 usePickFirst bool 182 183 // grpclbClientConnCreds is the creds bundle to be used to connect to grpclb 184 // servers. If it's nil, use the TransportCredentials from BuildOptions 185 // instead. 186 grpclbClientConnCreds credentials.Bundle 187 // grpclbBackendCreds is the creds bundle to be used for addresses that are 188 // returned by grpclb server. If it's nil, don't set anything when creating 189 // SubConns. 190 grpclbBackendCreds credentials.Bundle 191 192 fallbackTimeout time.Duration 193 doneCh chan struct{} 194 195 // manualResolver is used in the remote LB ClientConn inside grpclb. When 196 // resolved address updates are received by grpclb, filtered updates will be 197 // send to remote LB ClientConn through this resolver. 198 manualResolver *lbManualResolver 199 // The ClientConn to talk to the remote balancer. 200 ccRemoteLB *grpc.ClientConn 201 // backoff for calling remote balancer. 202 backoff backoff.Strategy 203 204 // Support client side load reporting. Each picker gets a reference to this, 205 // and will update its content. 206 clientStats *rpcStats 207 208 mu sync.Mutex // guards everything following. 209 // The full server list including drops, used to check if the newly received 210 // serverList contains anything new. Each generate picker will also have 211 // reference to this list to do the first layer pick. 212 fullServerList []*lbpb.Server 213 // Backend addresses. It's kept so the addresses are available when 214 // switching between round_robin and pickfirst. 215 backendAddrs []resolver.Address 216 // All backends addresses, with metadata set to nil. This list contains all 217 // backend addresses in the same order and with the same duplicates as in 218 // serverlist. When generating picker, a SubConn slice with the same order 219 // but with only READY SCs will be gerenated. 220 backendAddrsWithoutMetadata []resolver.Address 221 // Roundrobin functionalities. 222 state connectivity.State 223 subConns map[resolver.Address]balancer.SubConn // Used to new/remove SubConn. 224 scStates map[balancer.SubConn]connectivity.State // Used to filter READY SubConns. 225 picker balancer.Picker 226 // Support fallback to resolved backend addresses if there's no response 227 // from remote balancer within fallbackTimeout. 228 remoteBalancerConnected bool 229 serverListReceived bool 230 inFallback bool 231 // resolvedBackendAddrs is resolvedAddrs minus remote balancers. It's set 232 // when resolved address updates are received, and read in the goroutine 233 // handling fallback. 234 resolvedBackendAddrs []resolver.Address 235} 236 237// regeneratePicker takes a snapshot of the balancer, and generates a picker from 238// it. The picker 239// - always returns ErrTransientFailure if the balancer is in TransientFailure, 240// - does two layer roundrobin pick otherwise. 241// Caller must hold lb.mu. 242func (lb *lbBalancer) regeneratePicker(resetDrop bool) { 243 if lb.state == connectivity.TransientFailure { 244 lb.picker = &errPicker{err: balancer.ErrTransientFailure} 245 return 246 } 247 248 if lb.state == connectivity.Connecting { 249 lb.picker = &errPicker{err: balancer.ErrNoSubConnAvailable} 250 return 251 } 252 253 var readySCs []balancer.SubConn 254 if lb.usePickFirst { 255 for _, sc := range lb.subConns { 256 readySCs = append(readySCs, sc) 257 break 258 } 259 } else { 260 for _, a := range lb.backendAddrsWithoutMetadata { 261 if sc, ok := lb.subConns[a]; ok { 262 if st, ok := lb.scStates[sc]; ok && st == connectivity.Ready { 263 readySCs = append(readySCs, sc) 264 } 265 } 266 } 267 } 268 269 if len(readySCs) <= 0 { 270 // If there's no ready SubConns, always re-pick. This is to avoid drops 271 // unless at least one SubConn is ready. Otherwise we may drop more 272 // often than want because of drops + re-picks(which become re-drops). 273 // 274 // This doesn't seem to be necessary after the connecting check above. 275 // Kept for safety. 276 lb.picker = &errPicker{err: balancer.ErrNoSubConnAvailable} 277 return 278 } 279 if lb.inFallback { 280 lb.picker = newRRPicker(readySCs) 281 return 282 } 283 if resetDrop { 284 lb.picker = newLBPicker(lb.fullServerList, readySCs, lb.clientStats) 285 return 286 } 287 prevLBPicker, ok := lb.picker.(*lbPicker) 288 if !ok { 289 lb.picker = newLBPicker(lb.fullServerList, readySCs, lb.clientStats) 290 return 291 } 292 prevLBPicker.updateReadySCs(readySCs) 293} 294 295// aggregateSubConnStats calculate the aggregated state of SubConns in 296// lb.SubConns. These SubConns are subconns in use (when switching between 297// fallback and grpclb). lb.scState contains states for all SubConns, including 298// those in cache (SubConns are cached for 10 seconds after remove). 299// 300// The aggregated state is: 301// - If at least one SubConn in Ready, the aggregated state is Ready; 302// - Else if at least one SubConn in Connecting, the aggregated state is Connecting; 303// - Else the aggregated state is TransientFailure. 304func (lb *lbBalancer) aggregateSubConnStates() connectivity.State { 305 var numConnecting uint64 306 307 for _, sc := range lb.subConns { 308 if state, ok := lb.scStates[sc]; ok { 309 switch state { 310 case connectivity.Ready: 311 return connectivity.Ready 312 case connectivity.Connecting: 313 numConnecting++ 314 } 315 } 316 } 317 if numConnecting > 0 { 318 return connectivity.Connecting 319 } 320 return connectivity.TransientFailure 321} 322 323func (lb *lbBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) { 324 panic("not used") 325} 326 327func (lb *lbBalancer) UpdateSubConnState(sc balancer.SubConn, scs balancer.SubConnState) { 328 s := scs.ConnectivityState 329 if grpclog.V(2) { 330 grpclog.Infof("lbBalancer: handle SubConn state change: %p, %v", sc, s) 331 } 332 lb.mu.Lock() 333 defer lb.mu.Unlock() 334 335 oldS, ok := lb.scStates[sc] 336 if !ok { 337 if grpclog.V(2) { 338 grpclog.Infof("lbBalancer: got state changes for an unknown SubConn: %p, %v", sc, s) 339 } 340 return 341 } 342 lb.scStates[sc] = s 343 switch s { 344 case connectivity.Idle: 345 sc.Connect() 346 case connectivity.Shutdown: 347 // When an address was removed by resolver, b called RemoveSubConn but 348 // kept the sc's state in scStates. Remove state for this sc here. 349 delete(lb.scStates, sc) 350 } 351 // Force regenerate picker if 352 // - this sc became ready from not-ready 353 // - this sc became not-ready from ready 354 lb.updateStateAndPicker((oldS == connectivity.Ready) != (s == connectivity.Ready), false) 355 356 // Enter fallback when the aggregated state is not Ready and the connection 357 // to remote balancer is lost. 358 if lb.state != connectivity.Ready { 359 if !lb.inFallback && !lb.remoteBalancerConnected { 360 // Enter fallback. 361 lb.refreshSubConns(lb.resolvedBackendAddrs, true, lb.usePickFirst) 362 } 363 } 364} 365 366// updateStateAndPicker re-calculate the aggregated state, and regenerate picker 367// if overall state is changed. 368// 369// If forceRegeneratePicker is true, picker will be regenerated. 370func (lb *lbBalancer) updateStateAndPicker(forceRegeneratePicker bool, resetDrop bool) { 371 oldAggrState := lb.state 372 lb.state = lb.aggregateSubConnStates() 373 // Regenerate picker when one of the following happens: 374 // - caller wants to regenerate 375 // - the aggregated state changed 376 if forceRegeneratePicker || (lb.state != oldAggrState) { 377 lb.regeneratePicker(resetDrop) 378 } 379 380 lb.cc.UpdateBalancerState(lb.state, lb.picker) 381} 382 383// fallbackToBackendsAfter blocks for fallbackTimeout and falls back to use 384// resolved backends (backends received from resolver, not from remote balancer) 385// if no connection to remote balancers was successful. 386func (lb *lbBalancer) fallbackToBackendsAfter(fallbackTimeout time.Duration) { 387 timer := time.NewTimer(fallbackTimeout) 388 defer timer.Stop() 389 select { 390 case <-timer.C: 391 case <-lb.doneCh: 392 return 393 } 394 lb.mu.Lock() 395 if lb.inFallback || lb.serverListReceived { 396 lb.mu.Unlock() 397 return 398 } 399 // Enter fallback. 400 lb.refreshSubConns(lb.resolvedBackendAddrs, true, lb.usePickFirst) 401 lb.mu.Unlock() 402} 403 404// HandleResolvedAddrs sends the updated remoteLB addresses to remoteLB 405// clientConn. The remoteLB clientConn will handle creating/removing remoteLB 406// connections. 407func (lb *lbBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) { 408 panic("not used") 409} 410 411func (lb *lbBalancer) handleServiceConfig(gc *grpclbServiceConfig) { 412 lb.mu.Lock() 413 defer lb.mu.Unlock() 414 415 newUsePickFirst := childIsPickFirst(gc) 416 if lb.usePickFirst == newUsePickFirst { 417 return 418 } 419 if grpclog.V(2) { 420 grpclog.Infof("lbBalancer: switching mode, new usePickFirst: %+v", newUsePickFirst) 421 } 422 lb.refreshSubConns(lb.backendAddrs, lb.inFallback, newUsePickFirst) 423} 424 425func (lb *lbBalancer) UpdateClientConnState(ccs balancer.ClientConnState) { 426 if grpclog.V(2) { 427 grpclog.Infof("lbBalancer: UpdateClientConnState: %+v", ccs) 428 } 429 gc, _ := ccs.BalancerConfig.(*grpclbServiceConfig) 430 lb.handleServiceConfig(gc) 431 432 addrs := ccs.ResolverState.Addresses 433 if len(addrs) <= 0 { 434 return 435 } 436 437 var remoteBalancerAddrs, backendAddrs []resolver.Address 438 for _, a := range addrs { 439 if a.Type == resolver.GRPCLB { 440 a.Type = resolver.Backend 441 remoteBalancerAddrs = append(remoteBalancerAddrs, a) 442 } else { 443 backendAddrs = append(backendAddrs, a) 444 } 445 } 446 447 if lb.ccRemoteLB == nil { 448 if len(remoteBalancerAddrs) <= 0 { 449 grpclog.Errorf("grpclb: no remote balancer address is available, should never happen") 450 return 451 } 452 // First time receiving resolved addresses, create a cc to remote 453 // balancers. 454 lb.dialRemoteLB(remoteBalancerAddrs[0].ServerName) 455 // Start the fallback goroutine. 456 go lb.fallbackToBackendsAfter(lb.fallbackTimeout) 457 } 458 459 // cc to remote balancers uses lb.manualResolver. Send the updated remote 460 // balancer addresses to it through manualResolver. 461 lb.manualResolver.UpdateState(resolver.State{Addresses: remoteBalancerAddrs}) 462 463 lb.mu.Lock() 464 lb.resolvedBackendAddrs = backendAddrs 465 if lb.inFallback { 466 // This means we received a new list of resolved backends, and we are 467 // still in fallback mode. Need to update the list of backends we are 468 // using to the new list of backends. 469 lb.refreshSubConns(lb.resolvedBackendAddrs, true, lb.usePickFirst) 470 } 471 lb.mu.Unlock() 472} 473 474func (lb *lbBalancer) Close() { 475 select { 476 case <-lb.doneCh: 477 return 478 default: 479 } 480 close(lb.doneCh) 481 if lb.ccRemoteLB != nil { 482 lb.ccRemoteLB.Close() 483 } 484 lb.cc.close() 485} 486