1/* 2 * Copyright 2019 gRPC authors. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17// Package balancergroup implements a utility struct to bind multiple balancers 18// into one balancer. 19package balancergroup 20 21import ( 22 "fmt" 23 "sync" 24 "time" 25 26 orcapb "github.com/cncf/udpa/go/udpa/data/orca/v1" 27 "google.golang.org/grpc/xds/internal/client/load" 28 29 "google.golang.org/grpc/balancer" 30 "google.golang.org/grpc/connectivity" 31 "google.golang.org/grpc/internal/cache" 32 "google.golang.org/grpc/internal/grpclog" 33 "google.golang.org/grpc/resolver" 34) 35 36// subBalancerWrapper is used to keep the configurations that will be used to start 37// the underlying balancer. It can be called to start/stop the underlying 38// balancer. 39// 40// When the config changes, it will pass the update to the underlying balancer 41// if it exists. 42// 43// TODO: move to a separate file? 44type subBalancerWrapper struct { 45 // subBalancerWrapper is passed to the sub-balancer as a ClientConn 46 // wrapper, only to keep the state and picker. When sub-balancer is 47 // restarted while in cache, the picker needs to be resent. 48 // 49 // It also contains the sub-balancer ID, so the parent balancer group can 50 // keep track of SubConn/pickers and the sub-balancers they belong to. Some 51 // of the actions are forwarded to the parent ClientConn with no change. 52 // Some are forward to balancer group with the sub-balancer ID. 53 balancer.ClientConn 54 id string 55 group *BalancerGroup 56 57 mu sync.Mutex 58 state balancer.State 59 60 // The static part of sub-balancer. Keeps balancerBuilders and addresses. 61 // To be used when restarting sub-balancer. 62 builder balancer.Builder 63 // Options to be passed to sub-balancer at the time of creation. 64 buildOpts balancer.BuildOptions 65 // ccState is a cache of the addresses/balancer config, so when the balancer 66 // is restarted after close, it will get the previous update. It's a pointer 67 // and is set to nil at init, so when the balancer is built for the first 68 // time (not a restart), it won't receive an empty update. Note that this 69 // isn't reset to nil when the underlying balancer is closed. 70 ccState *balancer.ClientConnState 71 // The dynamic part of sub-balancer. Only used when balancer group is 72 // started. Gets cleared when sub-balancer is closed. 73 balancer balancer.Balancer 74} 75 76// UpdateState overrides balancer.ClientConn, to keep state and picker. 77func (sbc *subBalancerWrapper) UpdateState(state balancer.State) { 78 sbc.mu.Lock() 79 sbc.state = state 80 sbc.group.updateBalancerState(sbc.id, state) 81 sbc.mu.Unlock() 82} 83 84// NewSubConn overrides balancer.ClientConn, so balancer group can keep track of 85// the relation between subconns and sub-balancers. 86func (sbc *subBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { 87 return sbc.group.newSubConn(sbc, addrs, opts) 88} 89 90func (sbc *subBalancerWrapper) updateBalancerStateWithCachedPicker() { 91 sbc.mu.Lock() 92 if sbc.state.Picker != nil { 93 sbc.group.updateBalancerState(sbc.id, sbc.state) 94 } 95 sbc.mu.Unlock() 96} 97 98func (sbc *subBalancerWrapper) startBalancer() { 99 b := sbc.builder.Build(sbc, sbc.buildOpts) 100 sbc.group.logger.Infof("Created child policy %p of type %v", b, sbc.builder.Name()) 101 sbc.balancer = b 102 if sbc.ccState != nil { 103 b.UpdateClientConnState(*sbc.ccState) 104 } 105} 106 107func (sbc *subBalancerWrapper) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { 108 b := sbc.balancer 109 if b == nil { 110 // This sub-balancer was closed. This can happen when EDS removes a 111 // locality. The balancer for this locality was already closed, and the 112 // SubConns are being deleted. But SubConn state change can still 113 // happen. 114 return 115 } 116 b.UpdateSubConnState(sc, state) 117} 118 119func (sbc *subBalancerWrapper) updateClientConnState(s balancer.ClientConnState) error { 120 sbc.ccState = &s 121 b := sbc.balancer 122 if b == nil { 123 // This sub-balancer was closed. This should never happen because 124 // sub-balancers are closed when the locality is removed from EDS, or 125 // the balancer group is closed. There should be no further address 126 // updates when either of this happened. 127 // 128 // This will be a common case with priority support, because a 129 // sub-balancer (and the whole balancer group) could be closed because 130 // it's the lower priority, but it can still get address updates. 131 return nil 132 } 133 return b.UpdateClientConnState(s) 134} 135 136func (sbc *subBalancerWrapper) resolverError(err error) { 137 b := sbc.balancer 138 if b == nil { 139 // This sub-balancer was closed. This should never happen because 140 // sub-balancers are closed when the locality is removed from EDS, or 141 // the balancer group is closed. There should be no further address 142 // updates when either of this happened. 143 // 144 // This will be a common case with priority support, because a 145 // sub-balancer (and the whole balancer group) could be closed because 146 // it's the lower priority, but it can still get address updates. 147 return 148 } 149 b.ResolverError(err) 150} 151 152func (sbc *subBalancerWrapper) stopBalancer() { 153 sbc.balancer.Close() 154 sbc.balancer = nil 155} 156 157// BalancerGroup takes a list of balancers, and make them into one balancer. 158// 159// Note that this struct doesn't implement balancer.Balancer, because it's not 160// intended to be used directly as a balancer. It's expected to be used as a 161// sub-balancer manager by a high level balancer. 162// 163// Updates from ClientConn are forwarded to sub-balancers 164// - service config update 165// - Not implemented 166// - address update 167// - subConn state change 168// - find the corresponding balancer and forward 169// 170// Actions from sub-balances are forwarded to parent ClientConn 171// - new/remove SubConn 172// - picker update and health states change 173// - sub-pickers are sent to an aggregator provided by the parent, which 174// will group them into a group-picker. The aggregated connectivity state is 175// also handled by the aggregator. 176// - resolveNow 177// 178// Sub-balancers are only built when the balancer group is started. If the 179// balancer group is closed, the sub-balancers are also closed. And it's 180// guaranteed that no updates will be sent to parent ClientConn from a closed 181// balancer group. 182type BalancerGroup struct { 183 cc balancer.ClientConn 184 buildOpts balancer.BuildOptions 185 logger *grpclog.PrefixLogger 186 loadStore load.PerClusterReporter 187 188 // stateAggregator is where the state/picker updates will be sent to. It's 189 // provided by the parent balancer, to build a picker with all the 190 // sub-pickers. 191 stateAggregator BalancerStateAggregator 192 193 // outgoingMu guards all operations in the direction: 194 // ClientConn-->Sub-balancer. Including start, stop, resolver updates and 195 // SubConn state changes. 196 // 197 // The corresponding boolean outgoingStarted is used to stop further updates 198 // to sub-balancers after they are closed. 199 outgoingMu sync.Mutex 200 outgoingStarted bool 201 idToBalancerConfig map[string]*subBalancerWrapper 202 // Cache for sub-balancers when they are removed. 203 balancerCache *cache.TimeoutCache 204 205 // incomingMu is to make sure this balancer group doesn't send updates to cc 206 // after it's closed. 207 // 208 // We don't share the mutex to avoid deadlocks (e.g. a call to sub-balancer 209 // may call back to balancer group inline. It causes deaclock if they 210 // require the same mutex). 211 // 212 // We should never need to hold multiple locks at the same time in this 213 // struct. The case where two locks are held can only happen when the 214 // underlying balancer calls back into balancer group inline. So there's an 215 // implicit lock acquisition order that outgoingMu is locked before 216 // incomingMu. 217 218 // incomingMu guards all operations in the direction: 219 // Sub-balancer-->ClientConn. Including NewSubConn, RemoveSubConn. It also 220 // guards the map from SubConn to balancer ID, so updateSubConnState needs 221 // to hold it shortly to find the sub-balancer to forward the update. 222 // 223 // UpdateState is called by the balancer state aggretator, and it will 224 // decide when and whether to call. 225 // 226 // The corresponding boolean incomingStarted is used to stop further updates 227 // from sub-balancers after they are closed. 228 incomingMu sync.Mutex 229 incomingStarted bool // This boolean only guards calls back to ClientConn. 230 scToSubBalancer map[balancer.SubConn]*subBalancerWrapper 231} 232 233// DefaultSubBalancerCloseTimeout is defined as a variable instead of const for 234// testing. 235// 236// TODO: make it a parameter for New(). 237var DefaultSubBalancerCloseTimeout = 15 * time.Minute 238 239// New creates a new BalancerGroup. Note that the BalancerGroup 240// needs to be started to work. 241// 242// TODO(easwars): Pass an options struct instead of N args. 243func New(cc balancer.ClientConn, bOpts balancer.BuildOptions, stateAggregator BalancerStateAggregator, loadStore load.PerClusterReporter, logger *grpclog.PrefixLogger) *BalancerGroup { 244 return &BalancerGroup{ 245 cc: cc, 246 buildOpts: bOpts, 247 logger: logger, 248 loadStore: loadStore, 249 250 stateAggregator: stateAggregator, 251 252 idToBalancerConfig: make(map[string]*subBalancerWrapper), 253 balancerCache: cache.NewTimeoutCache(DefaultSubBalancerCloseTimeout), 254 scToSubBalancer: make(map[balancer.SubConn]*subBalancerWrapper), 255 } 256} 257 258// Start starts the balancer group, including building all the sub-balancers, 259// and send the existing addresses to them. 260// 261// A BalancerGroup can be closed and started later. When a BalancerGroup is 262// closed, it can still receive address updates, which will be applied when 263// restarted. 264func (bg *BalancerGroup) Start() { 265 bg.incomingMu.Lock() 266 bg.incomingStarted = true 267 bg.incomingMu.Unlock() 268 269 bg.outgoingMu.Lock() 270 if bg.outgoingStarted { 271 bg.outgoingMu.Unlock() 272 return 273 } 274 275 for _, config := range bg.idToBalancerConfig { 276 config.startBalancer() 277 } 278 bg.outgoingStarted = true 279 bg.outgoingMu.Unlock() 280} 281 282// Add adds a balancer built by builder to the group, with given id. 283func (bg *BalancerGroup) Add(id string, builder balancer.Builder) { 284 // Store data in static map, and then check to see if bg is started. 285 bg.outgoingMu.Lock() 286 var sbc *subBalancerWrapper 287 // If outgoingStarted is true, search in the cache. Otherwise, cache is 288 // guaranteed to be empty, searching is unnecessary. 289 if bg.outgoingStarted { 290 if old, ok := bg.balancerCache.Remove(id); ok { 291 sbc, _ = old.(*subBalancerWrapper) 292 if sbc != nil && sbc.builder != builder { 293 // If the sub-balancer in cache was built with a different 294 // balancer builder, don't use it, cleanup this old-balancer, 295 // and behave as sub-balancer is not found in cache. 296 // 297 // NOTE that this will also drop the cached addresses for this 298 // sub-balancer, which seems to be reasonable. 299 sbc.stopBalancer() 300 // cleanupSubConns must be done before the new balancer starts, 301 // otherwise new SubConns created by the new balancer might be 302 // removed by mistake. 303 bg.cleanupSubConns(sbc) 304 sbc = nil 305 } 306 } 307 } 308 if sbc == nil { 309 sbc = &subBalancerWrapper{ 310 ClientConn: bg.cc, 311 id: id, 312 group: bg, 313 builder: builder, 314 buildOpts: bg.buildOpts, 315 } 316 if bg.outgoingStarted { 317 // Only start the balancer if bg is started. Otherwise, we only keep the 318 // static data. 319 sbc.startBalancer() 320 } 321 } else { 322 // When brining back a sub-balancer from cache, re-send the cached 323 // picker and state. 324 sbc.updateBalancerStateWithCachedPicker() 325 } 326 bg.idToBalancerConfig[id] = sbc 327 bg.outgoingMu.Unlock() 328} 329 330// Remove removes the balancer with id from the group. 331// 332// But doesn't close the balancer. The balancer is kept in a cache, and will be 333// closed after timeout. Cleanup work (closing sub-balancer and removing 334// subconns) will be done after timeout. 335func (bg *BalancerGroup) Remove(id string) { 336 bg.outgoingMu.Lock() 337 if sbToRemove, ok := bg.idToBalancerConfig[id]; ok { 338 if bg.outgoingStarted { 339 bg.balancerCache.Add(id, sbToRemove, func() { 340 // After timeout, when sub-balancer is removed from cache, need 341 // to close the underlying sub-balancer, and remove all its 342 // subconns. 343 bg.outgoingMu.Lock() 344 if bg.outgoingStarted { 345 sbToRemove.stopBalancer() 346 } 347 bg.outgoingMu.Unlock() 348 bg.cleanupSubConns(sbToRemove) 349 }) 350 } 351 delete(bg.idToBalancerConfig, id) 352 } else { 353 bg.logger.Infof("balancer group: trying to remove a non-existing locality from balancer group: %v", id) 354 } 355 bg.outgoingMu.Unlock() 356} 357 358// bg.remove(id) doesn't do cleanup for the sub-balancer. This function does 359// cleanup after the timeout. 360func (bg *BalancerGroup) cleanupSubConns(config *subBalancerWrapper) { 361 bg.incomingMu.Lock() 362 // Remove SubConns. This is only done after the balancer is 363 // actually closed. 364 // 365 // NOTE: if NewSubConn is called by this (closed) balancer later, the 366 // SubConn will be leaked. This shouldn't happen if the balancer 367 // implementation is correct. To make sure this never happens, we need to 368 // add another layer (balancer manager) between balancer group and the 369 // sub-balancers. 370 for sc, b := range bg.scToSubBalancer { 371 if b == config { 372 bg.cc.RemoveSubConn(sc) 373 delete(bg.scToSubBalancer, sc) 374 } 375 } 376 bg.incomingMu.Unlock() 377} 378 379// Following are actions from the parent grpc.ClientConn, forward to sub-balancers. 380 381// UpdateSubConnState handles the state for the subconn. It finds the 382// corresponding balancer and forwards the update. 383func (bg *BalancerGroup) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { 384 bg.incomingMu.Lock() 385 config, ok := bg.scToSubBalancer[sc] 386 if !ok { 387 bg.incomingMu.Unlock() 388 return 389 } 390 if state.ConnectivityState == connectivity.Shutdown { 391 // Only delete sc from the map when state changed to Shutdown. 392 delete(bg.scToSubBalancer, sc) 393 } 394 bg.incomingMu.Unlock() 395 396 bg.outgoingMu.Lock() 397 config.updateSubConnState(sc, state) 398 bg.outgoingMu.Unlock() 399} 400 401// UpdateClientConnState handles ClientState (including balancer config and 402// addresses) from resolver. It finds the balancer and forwards the update. 403func (bg *BalancerGroup) UpdateClientConnState(id string, s balancer.ClientConnState) error { 404 bg.outgoingMu.Lock() 405 defer bg.outgoingMu.Unlock() 406 if config, ok := bg.idToBalancerConfig[id]; ok { 407 return config.updateClientConnState(s) 408 } 409 return nil 410} 411 412// ResolverError forwards resolver errors to all sub-balancers. 413func (bg *BalancerGroup) ResolverError(err error) { 414 bg.outgoingMu.Lock() 415 for _, config := range bg.idToBalancerConfig { 416 config.resolverError(err) 417 } 418 bg.outgoingMu.Unlock() 419} 420 421// Following are actions from sub-balancers, forward to ClientConn. 422 423// newSubConn: forward to ClientConn, and also create a map from sc to balancer, 424// so state update will find the right balancer. 425// 426// One note about removing SubConn: only forward to ClientConn, but not delete 427// from map. Delete sc from the map only when state changes to Shutdown. Since 428// it's just forwarding the action, there's no need for a removeSubConn() 429// wrapper function. 430func (bg *BalancerGroup) newSubConn(config *subBalancerWrapper, addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { 431 // NOTE: if balancer with id was already removed, this should also return 432 // error. But since we call balancer.stopBalancer when removing the balancer, this 433 // shouldn't happen. 434 bg.incomingMu.Lock() 435 if !bg.incomingStarted { 436 bg.incomingMu.Unlock() 437 return nil, fmt.Errorf("NewSubConn is called after balancer group is closed") 438 } 439 sc, err := bg.cc.NewSubConn(addrs, opts) 440 if err != nil { 441 bg.incomingMu.Unlock() 442 return nil, err 443 } 444 bg.scToSubBalancer[sc] = config 445 bg.incomingMu.Unlock() 446 return sc, nil 447} 448 449// updateBalancerState: forward the new state to balancer state aggregator. The 450// aggregator will create an aggregated picker and an aggregated connectivity 451// state, then forward to ClientConn. 452func (bg *BalancerGroup) updateBalancerState(id string, state balancer.State) { 453 bg.logger.Infof("Balancer state update from locality %v, new state: %+v", id, state) 454 if bg.loadStore != nil { 455 // Only wrap the picker to do load reporting if loadStore was set. 456 state.Picker = newLoadReportPicker(state.Picker, id, bg.loadStore) 457 } 458 459 // Send new state to the aggregator, without holding the incomingMu. 460 // incomingMu is to protect all calls to the parent ClientConn, this update 461 // doesn't necessary trigger a call to ClientConn, and should already be 462 // protected by aggregator's mutex if necessary. 463 if bg.stateAggregator != nil { 464 bg.stateAggregator.UpdateState(id, state) 465 } 466} 467 468// Close closes the balancer. It stops sub-balancers, and removes the subconns. 469// The BalancerGroup can be restarted later. 470func (bg *BalancerGroup) Close() { 471 bg.incomingMu.Lock() 472 if bg.incomingStarted { 473 bg.incomingStarted = false 474 // Also remove all SubConns. 475 for sc := range bg.scToSubBalancer { 476 bg.cc.RemoveSubConn(sc) 477 delete(bg.scToSubBalancer, sc) 478 } 479 } 480 bg.incomingMu.Unlock() 481 482 // Clear(true) runs clear function to close sub-balancers in cache. It 483 // must be called out of outgoing mutex. 484 bg.balancerCache.Clear(true) 485 486 bg.outgoingMu.Lock() 487 if bg.outgoingStarted { 488 bg.outgoingStarted = false 489 for _, config := range bg.idToBalancerConfig { 490 config.stopBalancer() 491 } 492 } 493 bg.outgoingMu.Unlock() 494} 495 496const ( 497 serverLoadCPUName = "cpu_utilization" 498 serverLoadMemoryName = "mem_utilization" 499) 500 501type loadReportPicker struct { 502 p balancer.Picker 503 504 locality string 505 loadStore load.PerClusterReporter 506} 507 508func newLoadReportPicker(p balancer.Picker, id string, loadStore load.PerClusterReporter) *loadReportPicker { 509 return &loadReportPicker{ 510 p: p, 511 locality: id, 512 loadStore: loadStore, 513 } 514} 515 516func (lrp *loadReportPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { 517 res, err := lrp.p.Pick(info) 518 if err != nil { 519 return res, err 520 } 521 522 lrp.loadStore.CallStarted(lrp.locality) 523 oldDone := res.Done 524 res.Done = func(info balancer.DoneInfo) { 525 if oldDone != nil { 526 oldDone(info) 527 } 528 lrp.loadStore.CallFinished(lrp.locality, info.Err) 529 530 load, ok := info.ServerLoad.(*orcapb.OrcaLoadReport) 531 if !ok { 532 return 533 } 534 lrp.loadStore.CallServerLoad(lrp.locality, serverLoadCPUName, load.CpuUtilization) 535 lrp.loadStore.CallServerLoad(lrp.locality, serverLoadMemoryName, load.MemUtilization) 536 for n, d := range load.RequestCost { 537 lrp.loadStore.CallServerLoad(lrp.locality, n, d) 538 } 539 for n, d := range load.Utilization { 540 lrp.loadStore.CallServerLoad(lrp.locality, n, d) 541 } 542 } 543 return res, err 544} 545