1/* 2 * 3 * Copyright 2019 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18package edsbalancer 19 20import ( 21 "errors" 22 "fmt" 23 "time" 24 25 "google.golang.org/grpc/balancer" 26 "google.golang.org/grpc/balancer/base" 27 "google.golang.org/grpc/connectivity" 28) 29 30var errAllPrioritiesRemoved = errors.New("eds: no locality is provided, all priorities are removed") 31 32// handlePriorityChange handles priority after EDS adds/removes a 33// priority. 34// 35// - If all priorities were deleted, unset priorityInUse, and set parent 36// ClientConn to TransientFailure 37// - If priorityInUse wasn't set, this is either the first EDS resp, or the 38// previous EDS resp deleted everything. Set priorityInUse to 0, and start 0. 39// - If priorityInUse was deleted, send the picker from the new lowest priority 40// to parent ClientConn, and set priorityInUse to the new lowest. 41// - If priorityInUse has a non-Ready state, and also there's a priority lower 42// than priorityInUse (which means a lower priority was added), set the next 43// priority as new priorityInUse, and start the bg. 44func (edsImpl *edsBalancerImpl) handlePriorityChange() { 45 edsImpl.priorityMu.Lock() 46 defer edsImpl.priorityMu.Unlock() 47 48 // Everything was removed by EDS. 49 if !edsImpl.priorityLowest.isSet() { 50 edsImpl.priorityInUse = newPriorityTypeUnset() 51 // Stop the init timer. This can happen if the only priority is removed 52 // shortly after it's added. 53 if timer := edsImpl.priorityInitTimer; timer != nil { 54 timer.Stop() 55 edsImpl.priorityInitTimer = nil 56 } 57 edsImpl.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: base.NewErrPicker(errAllPrioritiesRemoved)}) 58 return 59 } 60 61 // priorityInUse wasn't set, use 0. 62 if !edsImpl.priorityInUse.isSet() { 63 edsImpl.logger.Infof("Switching priority from unset to %v", 0) 64 edsImpl.startPriority(newPriorityType(0)) 65 return 66 } 67 68 // priorityInUse was deleted, use the new lowest. 69 if _, ok := edsImpl.priorityToLocalities[edsImpl.priorityInUse]; !ok { 70 oldP := edsImpl.priorityInUse 71 edsImpl.priorityInUse = edsImpl.priorityLowest 72 edsImpl.logger.Infof("Switching priority from %v to %v, because former was deleted", oldP, edsImpl.priorityInUse) 73 if s, ok := edsImpl.priorityToState[edsImpl.priorityLowest]; ok { 74 edsImpl.cc.UpdateState(*s) 75 } else { 76 // If state for priorityLowest is not found, this means priorityLowest was 77 // started, but never sent any update. The init timer fired and 78 // triggered the next priority. The old_priorityInUse (that was just 79 // deleted EDS) was picked later. 80 // 81 // We don't have an old state to send to parent, but we also don't 82 // want parent to keep using picker from old_priorityInUse. Send an 83 // update to trigger block picks until a new picker is ready. 84 edsImpl.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable)}) 85 } 86 return 87 } 88 89 // priorityInUse is not ready, look for next priority, and use if found. 90 if s, ok := edsImpl.priorityToState[edsImpl.priorityInUse]; ok && s.ConnectivityState != connectivity.Ready { 91 pNext := edsImpl.priorityInUse.nextLower() 92 if _, ok := edsImpl.priorityToLocalities[pNext]; ok { 93 edsImpl.logger.Infof("Switching priority from %v to %v, because latter was added, and former wasn't Ready") 94 edsImpl.startPriority(pNext) 95 } 96 } 97} 98 99// startPriority sets priorityInUse to p, and starts the balancer group for p. 100// It also starts a timer to fall to next priority after timeout. 101// 102// Caller must hold priorityMu, priority must exist, and edsImpl.priorityInUse 103// must be non-nil. 104func (edsImpl *edsBalancerImpl) startPriority(priority priorityType) { 105 edsImpl.priorityInUse = priority 106 p := edsImpl.priorityToLocalities[priority] 107 // NOTE: this will eventually send addresses to sub-balancers. If the 108 // sub-balancer tries to update picker, it will result in a deadlock on 109 // priorityMu in the update is handled synchronously. The deadlock is 110 // currently avoided by handling balancer update in a goroutine (the run 111 // goroutine in the parent eds balancer). When priority balancer is split 112 // into its own, this asynchronous state handling needs to be copied. 113 p.stateAggregator.Start() 114 p.bg.Start() 115 // startPriority can be called when 116 // 1. first EDS resp, start p0 117 // 2. a high priority goes Failure, start next 118 // 3. a high priority init timeout, start next 119 // 120 // In all the cases, the existing init timer is either closed, also already 121 // expired. There's no need to close the old timer. 122 edsImpl.priorityInitTimer = time.AfterFunc(defaultPriorityInitTimeout, func() { 123 edsImpl.priorityMu.Lock() 124 defer edsImpl.priorityMu.Unlock() 125 if !edsImpl.priorityInUse.isSet() || !edsImpl.priorityInUse.equal(priority) { 126 return 127 } 128 edsImpl.priorityInitTimer = nil 129 pNext := priority.nextLower() 130 if _, ok := edsImpl.priorityToLocalities[pNext]; ok { 131 edsImpl.startPriority(pNext) 132 } 133 }) 134} 135 136// handlePriorityWithNewState start/close priorities based on the connectivity 137// state. It returns whether the state should be forwarded to parent ClientConn. 138func (edsImpl *edsBalancerImpl) handlePriorityWithNewState(priority priorityType, s balancer.State) bool { 139 edsImpl.priorityMu.Lock() 140 defer edsImpl.priorityMu.Unlock() 141 142 if !edsImpl.priorityInUse.isSet() { 143 edsImpl.logger.Infof("eds: received picker update when no priority is in use (EDS returned an empty list)") 144 return false 145 } 146 147 if edsImpl.priorityInUse.higherThan(priority) { 148 // Lower priorities should all be closed, this is an unexpected update. 149 edsImpl.logger.Infof("eds: received picker update from priority lower then priorityInUse") 150 return false 151 } 152 153 bState, ok := edsImpl.priorityToState[priority] 154 if !ok { 155 bState = &balancer.State{} 156 edsImpl.priorityToState[priority] = bState 157 } 158 oldState := bState.ConnectivityState 159 *bState = s 160 161 switch s.ConnectivityState { 162 case connectivity.Ready: 163 return edsImpl.handlePriorityWithNewStateReady(priority) 164 case connectivity.TransientFailure: 165 return edsImpl.handlePriorityWithNewStateTransientFailure(priority) 166 case connectivity.Connecting: 167 return edsImpl.handlePriorityWithNewStateConnecting(priority, oldState) 168 default: 169 // New state is Idle, should never happen. Don't forward. 170 return false 171 } 172} 173 174// handlePriorityWithNewStateReady handles state Ready and decides whether to 175// forward update or not. 176// 177// An update with state Ready: 178// - If it's from higher priority: 179// - Forward the update 180// - Set the priority as priorityInUse 181// - Close all priorities lower than this one 182// - If it's from priorityInUse: 183// - Forward and do nothing else 184// 185// Caller must make sure priorityInUse is not higher than priority. 186// 187// Caller must hold priorityMu. 188func (edsImpl *edsBalancerImpl) handlePriorityWithNewStateReady(priority priorityType) bool { 189 // If one priority higher or equal to priorityInUse goes Ready, stop the 190 // init timer. If update is from higher than priorityInUse, 191 // priorityInUse will be closed, and the init timer will become useless. 192 if timer := edsImpl.priorityInitTimer; timer != nil { 193 timer.Stop() 194 edsImpl.priorityInitTimer = nil 195 } 196 197 if edsImpl.priorityInUse.lowerThan(priority) { 198 edsImpl.logger.Infof("Switching priority from %v to %v, because latter became Ready", edsImpl.priorityInUse, priority) 199 edsImpl.priorityInUse = priority 200 for i := priority.nextLower(); !i.lowerThan(edsImpl.priorityLowest); i = i.nextLower() { 201 bgwc := edsImpl.priorityToLocalities[i] 202 bgwc.stateAggregator.Stop() 203 bgwc.bg.Close() 204 } 205 return true 206 } 207 return true 208} 209 210// handlePriorityWithNewStateTransientFailure handles state TransientFailure and 211// decides whether to forward update or not. 212// 213// An update with state Failure: 214// - If it's from a higher priority: 215// - Do not forward, and do nothing 216// - If it's from priorityInUse: 217// - If there's no lower: 218// - Forward and do nothing else 219// - If there's a lower priority: 220// - Forward 221// - Set lower as priorityInUse 222// - Start lower 223// 224// Caller must make sure priorityInUse is not higher than priority. 225// 226// Caller must hold priorityMu. 227func (edsImpl *edsBalancerImpl) handlePriorityWithNewStateTransientFailure(priority priorityType) bool { 228 if edsImpl.priorityInUse.lowerThan(priority) { 229 return false 230 } 231 // priorityInUse sends a failure. Stop its init timer. 232 if timer := edsImpl.priorityInitTimer; timer != nil { 233 timer.Stop() 234 edsImpl.priorityInitTimer = nil 235 } 236 pNext := priority.nextLower() 237 if _, okNext := edsImpl.priorityToLocalities[pNext]; !okNext { 238 return true 239 } 240 edsImpl.logger.Infof("Switching priority from %v to %v, because former became TransientFailure", priority, pNext) 241 edsImpl.startPriority(pNext) 242 return true 243} 244 245// handlePriorityWithNewStateConnecting handles state Connecting and decides 246// whether to forward update or not. 247// 248// An update with state Connecting: 249// - If it's from a higher priority 250// - Do nothing 251// - If it's from priorityInUse, the behavior depends on previous state. 252// 253// When new state is Connecting, the behavior depends on previous state. If the 254// previous state was Ready, this is a transition out from Ready to Connecting. 255// Assuming there are multiple backends in the same priority, this mean we are 256// in a bad situation and we should failover to the next priority (Side note: 257// the current connectivity state aggregating algorhtim (e.g. round-robin) is 258// not handling this right, because if many backends all go from Ready to 259// Connecting, the overall situation is more like TransientFailure, not 260// Connecting). 261// 262// If the previous state was Idle, we don't do anything special with failure, 263// and simply forward the update. The init timer should be in process, will 264// handle failover if it timeouts. If the previous state was TransientFailure, 265// we do not forward, because the lower priority is in use. 266// 267// Caller must make sure priorityInUse is not higher than priority. 268// 269// Caller must hold priorityMu. 270func (edsImpl *edsBalancerImpl) handlePriorityWithNewStateConnecting(priority priorityType, oldState connectivity.State) bool { 271 if edsImpl.priorityInUse.lowerThan(priority) { 272 return false 273 } 274 275 switch oldState { 276 case connectivity.Ready: 277 pNext := priority.nextLower() 278 if _, okNext := edsImpl.priorityToLocalities[pNext]; !okNext { 279 return true 280 } 281 edsImpl.logger.Infof("Switching priority from %v to %v, because former became Connecting from Ready", priority, pNext) 282 edsImpl.startPriority(pNext) 283 return true 284 case connectivity.Idle: 285 return true 286 case connectivity.TransientFailure: 287 return false 288 default: 289 // Old state is Connecting or Shutdown. Don't forward. 290 return false 291 } 292} 293 294// priorityType represents the priority from EDS response. 295// 296// 0 is the highest priority. The bigger the number, the lower the priority. 297type priorityType struct { 298 set bool 299 p uint32 300} 301 302func newPriorityType(p uint32) priorityType { 303 return priorityType{ 304 set: true, 305 p: p, 306 } 307} 308 309func newPriorityTypeUnset() priorityType { 310 return priorityType{} 311} 312 313func (p priorityType) isSet() bool { 314 return p.set 315} 316 317func (p priorityType) equal(p2 priorityType) bool { 318 if !p.isSet() && !p2.isSet() { 319 return true 320 } 321 if !p.isSet() || !p2.isSet() { 322 return false 323 } 324 return p == p2 325} 326 327func (p priorityType) higherThan(p2 priorityType) bool { 328 if !p.isSet() || !p2.isSet() { 329 // TODO(menghanl): return an appropriate value instead of panic. 330 panic("priority unset") 331 } 332 return p.p < p2.p 333} 334 335func (p priorityType) lowerThan(p2 priorityType) bool { 336 if !p.isSet() || !p2.isSet() { 337 // TODO(menghanl): return an appropriate value instead of panic. 338 panic("priority unset") 339 } 340 return p.p > p2.p 341} 342 343func (p priorityType) nextLower() priorityType { 344 if !p.isSet() { 345 panic("priority unset") 346 } 347 return priorityType{ 348 set: true, 349 p: p.p + 1, 350 } 351} 352 353func (p priorityType) String() string { 354 if !p.set { 355 return "Nil" 356 } 357 return fmt.Sprint(p.p) 358} 359