/*
 * Copyright 2019 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package edsbalancer

import (
	"encoding/json"
	"reflect"
	"sync"
	"time"

	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/balancer/roundrobin"
	"google.golang.org/grpc/balancer/weightedroundrobin"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/grpclog"
	"google.golang.org/grpc/resolver"
	"google.golang.org/grpc/status"
	"google.golang.org/grpc/xds/internal"
	"google.golang.org/grpc/xds/internal/balancer/lrs"
	xdsclient "google.golang.org/grpc/xds/internal/client"
)

// defaultPriorityInitTimeout is how long a priority is given to connect
// before the next (lower) priority is started.
//
// TODO: make this an environment variable?
var defaultPriorityInitTimeout = 10 * time.Second

// localityConfig is the cached configuration for one locality: the weight
// and addresses last pushed to the balancer group, kept so later EDS
// responses can be diffed against it.
type localityConfig struct {
	weight uint32             // locality weight from the EDS response
	addrs  []resolver.Address // resolved endpoint addresses for this locality
}

// balancerGroupWithConfig contains the localities with the same priority. It
// manages all localities using a balancerGroup.
type balancerGroupWithConfig struct {
	bg      *balancerGroup
	configs map[internal.Locality]*localityConfig // per-locality cached config, keyed by locality ID
}

// edsBalancerImpl does load balancing based on the EDS responses. Note that it
// doesn't implement the balancer interface. It's intended to be used by a high
// level balancer implementation.
//
// The localities are picked as weighted round robin. A configurable child
// policy is used to manage endpoints in each locality.
59type edsBalancerImpl struct { 60 cc balancer.ClientConn 61 62 subBalancerBuilder balancer.Builder 63 loadStore lrs.Store 64 priorityToLocalities map[priorityType]*balancerGroupWithConfig 65 66 // There's no need to hold any mutexes at the same time. The order to take 67 // mutex should be: priorityMu > subConnMu, but this is implicit via 68 // balancers (starting balancer with next priority while holding priorityMu, 69 // and the balancer may create new SubConn). 70 71 priorityMu sync.Mutex 72 // priorities are pointers, and will be nil when EDS returns empty result. 73 priorityInUse priorityType 74 priorityLowest priorityType 75 priorityToState map[priorityType]*balancer.State 76 // The timer to give a priority 10 seconds to connect. And if the priority 77 // doesn't go into Ready/Failure, start the next priority. 78 // 79 // One timer is enough because there can be at most one priority in init 80 // state. 81 priorityInitTimer *time.Timer 82 83 subConnMu sync.Mutex 84 subConnToPriority map[balancer.SubConn]priorityType 85 86 pickerMu sync.Mutex 87 drops []*dropper 88 innerState balancer.State // The state of the picker without drop support. 89} 90 91// newEDSBalancerImpl create a new edsBalancerImpl. 92func newEDSBalancerImpl(cc balancer.ClientConn, loadStore lrs.Store) *edsBalancerImpl { 93 edsImpl := &edsBalancerImpl{ 94 cc: cc, 95 subBalancerBuilder: balancer.Get(roundrobin.Name), 96 97 priorityToLocalities: make(map[priorityType]*balancerGroupWithConfig), 98 priorityToState: make(map[priorityType]*balancer.State), 99 subConnToPriority: make(map[balancer.SubConn]priorityType), 100 loadStore: loadStore, 101 } 102 // Don't start balancer group here. Start it when handling the first EDS 103 // response. Otherwise the balancer group will be started with round-robin, 104 // and if users specify a different sub-balancer, all balancers in balancer 105 // group will be closed and recreated when sub-balancer update happens. 
106 return edsImpl 107} 108 109// HandleChildPolicy updates the child balancers handling endpoints. Child 110// policy is roundrobin by default. If the specified balancer is not installed, 111// the old child balancer will be used. 112// 113// HandleChildPolicy and HandleEDSResponse must be called by the same goroutine. 114func (edsImpl *edsBalancerImpl) HandleChildPolicy(name string, config json.RawMessage) { 115 if edsImpl.subBalancerBuilder.Name() == name { 116 return 117 } 118 newSubBalancerBuilder := balancer.Get(name) 119 if newSubBalancerBuilder == nil { 120 grpclog.Infof("edsBalancerImpl: failed to find balancer with name %q, keep using %q", name, edsImpl.subBalancerBuilder.Name()) 121 return 122 } 123 edsImpl.subBalancerBuilder = newSubBalancerBuilder 124 for _, bgwc := range edsImpl.priorityToLocalities { 125 if bgwc == nil { 126 continue 127 } 128 for id, config := range bgwc.configs { 129 // TODO: (eds) add support to balancer group to support smoothly 130 // switching sub-balancers (keep old balancer around until new 131 // balancer becomes ready). 132 bgwc.bg.remove(id) 133 bgwc.bg.add(id, config.weight, edsImpl.subBalancerBuilder) 134 bgwc.bg.handleResolvedAddrs(id, config.addrs) 135 } 136 } 137} 138 139// updateDrops compares new drop policies with the old. If they are different, 140// it updates the drop policies and send ClientConn an updated picker. 141func (edsImpl *edsBalancerImpl) updateDrops(dropPolicies []xdsclient.OverloadDropConfig) { 142 var ( 143 newDrops []*dropper 144 dropsChanged bool 145 ) 146 for i, dropPolicy := range dropPolicies { 147 var ( 148 numerator = dropPolicy.Numerator 149 denominator = dropPolicy.Denominator 150 ) 151 newDrops = append(newDrops, newDropper(numerator, denominator, dropPolicy.Category)) 152 153 // The following reading edsImpl.drops doesn't need mutex because it can only 154 // be updated by the code following. 
155 if dropsChanged { 156 continue 157 } 158 if i >= len(edsImpl.drops) { 159 dropsChanged = true 160 continue 161 } 162 if oldDrop := edsImpl.drops[i]; numerator != oldDrop.numerator || denominator != oldDrop.denominator { 163 dropsChanged = true 164 } 165 } 166 if dropsChanged { 167 edsImpl.pickerMu.Lock() 168 edsImpl.drops = newDrops 169 if edsImpl.innerState.Picker != nil { 170 // Update picker with old inner picker, new drops. 171 edsImpl.cc.UpdateState(balancer.State{ 172 ConnectivityState: edsImpl.innerState.ConnectivityState, 173 Picker: newDropPicker(edsImpl.innerState.Picker, newDrops, edsImpl.loadStore)}, 174 ) 175 } 176 edsImpl.pickerMu.Unlock() 177 } 178} 179 180// HandleEDSResponse handles the EDS response and creates/deletes localities and 181// SubConns. It also handles drops. 182// 183// HandleChildPolicy and HandleEDSResponse must be called by the same goroutine. 184func (edsImpl *edsBalancerImpl) HandleEDSResponse(edsResp *xdsclient.EDSUpdate) { 185 // TODO: Unhandled fields from EDS response: 186 // - edsResp.GetPolicy().GetOverprovisioningFactor() 187 // - locality.GetPriority() 188 // - lbEndpoint.GetMetadata(): contains BNS name, send to sub-balancers 189 // - as service config or as resolved address 190 // - if socketAddress is not ip:port 191 // - socketAddress.GetNamedPort(), socketAddress.GetResolverName() 192 // - resolve endpoint's name with another resolver 193 194 edsImpl.updateDrops(edsResp.Drops) 195 196 // Filter out all localities with weight 0. 197 // 198 // Locality weighted load balancer can be enabled by setting an option in 199 // CDS, and the weight of each locality. Currently, without the guarantee 200 // that CDS is always sent, we assume locality weighted load balance is 201 // always enabled, and ignore all weight 0 localities. 202 // 203 // In the future, we should look at the config in CDS response and decide 204 // whether locality weight matters. 
205 newLocalitiesWithPriority := make(map[priorityType][]xdsclient.Locality) 206 for _, locality := range edsResp.Localities { 207 if locality.Weight == 0 { 208 continue 209 } 210 priority := newPriorityType(locality.Priority) 211 newLocalitiesWithPriority[priority] = append(newLocalitiesWithPriority[priority], locality) 212 } 213 214 var ( 215 priorityLowest priorityType 216 priorityChanged bool 217 ) 218 219 for priority, newLocalities := range newLocalitiesWithPriority { 220 if !priorityLowest.isSet() || priorityLowest.higherThan(priority) { 221 priorityLowest = priority 222 } 223 224 bgwc, ok := edsImpl.priorityToLocalities[priority] 225 if !ok { 226 // Create balancer group if it's never created (this is the first 227 // time this priority is received). We don't start it here. It may 228 // be started when necessary (e.g. when higher is down, or if it's a 229 // new lowest priority). 230 bgwc = &balancerGroupWithConfig{ 231 bg: newBalancerGroup( 232 edsImpl.ccWrapperWithPriority(priority), edsImpl.loadStore, 233 ), 234 configs: make(map[internal.Locality]*localityConfig), 235 } 236 edsImpl.priorityToLocalities[priority] = bgwc 237 priorityChanged = true 238 } 239 edsImpl.handleEDSResponsePerPriority(bgwc, newLocalities) 240 } 241 edsImpl.priorityLowest = priorityLowest 242 243 // Delete priorities that are removed in the latest response, and also close 244 // the balancer group. 245 for p, bgwc := range edsImpl.priorityToLocalities { 246 if _, ok := newLocalitiesWithPriority[p]; !ok { 247 delete(edsImpl.priorityToLocalities, p) 248 bgwc.bg.close() 249 delete(edsImpl.priorityToState, p) 250 priorityChanged = true 251 } 252 } 253 254 // If priority was added/removed, it may affect the balancer group to use. 255 // E.g. priorityInUse was removed, or all priorities are down, and a new 256 // lower priority was added. 
257 if priorityChanged { 258 edsImpl.handlePriorityChange() 259 } 260} 261 262func (edsImpl *edsBalancerImpl) handleEDSResponsePerPriority(bgwc *balancerGroupWithConfig, newLocalities []xdsclient.Locality) { 263 // newLocalitiesSet contains all names of localities in the new EDS response 264 // for the same priority. It's used to delete localities that are removed in 265 // the new EDS response. 266 newLocalitiesSet := make(map[internal.Locality]struct{}) 267 for _, locality := range newLocalities { 268 // One balancer for each locality. 269 270 lid := locality.ID 271 newLocalitiesSet[lid] = struct{}{} 272 273 newWeight := locality.Weight 274 var newAddrs []resolver.Address 275 for _, lbEndpoint := range locality.Endpoints { 276 // Filter out all "unhealthy" endpoints (unknown and 277 // healthy are both considered to be healthy: 278 // https://www.envoyproxy.io/docs/envoy/latest/api-v2/api/v2/core/health_check.proto#envoy-api-enum-core-healthstatus). 279 if lbEndpoint.HealthStatus != xdsclient.EndpointHealthStatusHealthy && 280 lbEndpoint.HealthStatus != xdsclient.EndpointHealthStatusUnknown { 281 continue 282 } 283 284 address := resolver.Address{ 285 Addr: lbEndpoint.Address, 286 } 287 if edsImpl.subBalancerBuilder.Name() == weightedroundrobin.Name && lbEndpoint.Weight != 0 { 288 address.Metadata = &weightedroundrobin.AddrInfo{ 289 Weight: lbEndpoint.Weight, 290 } 291 } 292 newAddrs = append(newAddrs, address) 293 } 294 var weightChanged, addrsChanged bool 295 config, ok := bgwc.configs[lid] 296 if !ok { 297 // A new balancer, add it to balancer group and balancer map. 298 bgwc.bg.add(lid, newWeight, edsImpl.subBalancerBuilder) 299 config = &localityConfig{ 300 weight: newWeight, 301 } 302 bgwc.configs[lid] = config 303 304 // weightChanged is false for new locality, because there's no need 305 // to update weight in bg. 306 addrsChanged = true 307 } else { 308 // Compare weight and addrs. 
309 if config.weight != newWeight { 310 weightChanged = true 311 } 312 if !reflect.DeepEqual(config.addrs, newAddrs) { 313 addrsChanged = true 314 } 315 } 316 317 if weightChanged { 318 config.weight = newWeight 319 bgwc.bg.changeWeight(lid, newWeight) 320 } 321 322 if addrsChanged { 323 config.addrs = newAddrs 324 bgwc.bg.handleResolvedAddrs(lid, newAddrs) 325 } 326 } 327 328 // Delete localities that are removed in the latest response. 329 for lid := range bgwc.configs { 330 if _, ok := newLocalitiesSet[lid]; !ok { 331 bgwc.bg.remove(lid) 332 delete(bgwc.configs, lid) 333 } 334 } 335} 336 337// HandleSubConnStateChange handles the state change and update pickers accordingly. 338func (edsImpl *edsBalancerImpl) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) { 339 edsImpl.subConnMu.Lock() 340 var bgwc *balancerGroupWithConfig 341 if p, ok := edsImpl.subConnToPriority[sc]; ok { 342 if s == connectivity.Shutdown { 343 // Only delete sc from the map when state changed to Shutdown. 344 delete(edsImpl.subConnToPriority, sc) 345 } 346 bgwc = edsImpl.priorityToLocalities[p] 347 } 348 edsImpl.subConnMu.Unlock() 349 if bgwc == nil { 350 grpclog.Infof("edsBalancerImpl: priority not found for sc state change") 351 return 352 } 353 if bg := bgwc.bg; bg != nil { 354 bg.handleSubConnStateChange(sc, s) 355 } 356} 357 358// updateState first handles priority, and then wraps picker in a drop picker 359// before forwarding the update. 360func (edsImpl *edsBalancerImpl) updateState(priority priorityType, s balancer.State) { 361 _, ok := edsImpl.priorityToLocalities[priority] 362 if !ok { 363 grpclog.Infof("eds: received picker update from unknown priority") 364 return 365 } 366 367 if edsImpl.handlePriorityWithNewState(priority, s) { 368 edsImpl.pickerMu.Lock() 369 defer edsImpl.pickerMu.Unlock() 370 edsImpl.innerState = s 371 // Don't reset drops when it's a state change. 
372 edsImpl.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: newDropPicker(s.Picker, edsImpl.drops, edsImpl.loadStore)}) 373 } 374} 375 376func (edsImpl *edsBalancerImpl) ccWrapperWithPriority(priority priorityType) *edsBalancerWrapperCC { 377 return &edsBalancerWrapperCC{ 378 ClientConn: edsImpl.cc, 379 priority: priority, 380 parent: edsImpl, 381 } 382} 383 384// edsBalancerWrapperCC implements the balancer.ClientConn API and get passed to 385// each balancer group. It contains the locality priority. 386type edsBalancerWrapperCC struct { 387 balancer.ClientConn 388 priority priorityType 389 parent *edsBalancerImpl 390} 391 392func (ebwcc *edsBalancerWrapperCC) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { 393 return ebwcc.parent.newSubConn(ebwcc.priority, addrs, opts) 394} 395func (ebwcc *edsBalancerWrapperCC) UpdateBalancerState(state connectivity.State, picker balancer.Picker) { 396 grpclog.Fatalln("not implemented") 397} 398func (ebwcc *edsBalancerWrapperCC) UpdateState(state balancer.State) { 399 ebwcc.parent.updateState(ebwcc.priority, state) 400} 401 402func (edsImpl *edsBalancerImpl) newSubConn(priority priorityType, addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { 403 sc, err := edsImpl.cc.NewSubConn(addrs, opts) 404 if err != nil { 405 return nil, err 406 } 407 edsImpl.subConnMu.Lock() 408 edsImpl.subConnToPriority[sc] = priority 409 edsImpl.subConnMu.Unlock() 410 return sc, nil 411} 412 413// Close closes the balancer. 
414func (edsImpl *edsBalancerImpl) Close() { 415 for _, bgwc := range edsImpl.priorityToLocalities { 416 if bg := bgwc.bg; bg != nil { 417 bg.close() 418 } 419 } 420} 421 422type dropPicker struct { 423 drops []*dropper 424 p balancer.V2Picker 425 loadStore lrs.Store 426} 427 428func newDropPicker(p balancer.V2Picker, drops []*dropper, loadStore lrs.Store) *dropPicker { 429 return &dropPicker{ 430 drops: drops, 431 p: p, 432 loadStore: loadStore, 433 } 434} 435 436func (d *dropPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { 437 var ( 438 drop bool 439 category string 440 ) 441 for _, dp := range d.drops { 442 if dp.drop() { 443 drop = true 444 category = dp.category 445 break 446 } 447 } 448 if drop { 449 if d.loadStore != nil { 450 d.loadStore.CallDropped(category) 451 } 452 return balancer.PickResult{}, status.Errorf(codes.Unavailable, "RPC is dropped") 453 } 454 // TODO: (eds) don't drop unless the inner picker is READY. Similar to 455 // https://github.com/grpc/grpc-go/issues/2622. 456 return d.p.Pick(info) 457} 458