1/* 2 * 3 * Copyright 2019 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18package edsbalancer 19 20import ( 21 "time" 22 23 "google.golang.org/grpc/balancer" 24 "google.golang.org/grpc/balancer/base" 25 "google.golang.org/grpc/connectivity" 26 "google.golang.org/grpc/grpclog" 27) 28 29// handlePriorityChange handles priority after EDS adds/removes a 30// priority. 31// 32// - If all priorities were deleted, unset priorityInUse, and set parent 33// ClientConn to TransientFailure 34// - If priorityInUse wasn't set, this is either the first EDS resp, or the 35// previous EDS resp deleted everything. Set priorityInUse to 0, and start 0. 36// - If priorityInUse was deleted, send the picker from the new lowest priority 37// to parent ClientConn, and set priorityInUse to the new lowest. 38// - If priorityInUse has a non-Ready state, and also there's a priority lower 39// than priorityInUse (which means a lower priority was added), set the next 40// priority as new priorityInUse, and start the bg. 41func (xdsB *EDSBalancer) handlePriorityChange() { 42 xdsB.priorityMu.Lock() 43 defer xdsB.priorityMu.Unlock() 44 45 // Everything was removed by EDS. 46 if !xdsB.priorityLowest.isSet() { 47 xdsB.priorityInUse = newPriorityTypeUnset() 48 xdsB.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: base.NewErrPickerV2(balancer.ErrTransientFailure)}) 49 return 50 } 51 52 // priorityInUse wasn't set, use 0. 53 if !xdsB.priorityInUse.isSet() { 54 xdsB.startPriority(newPriorityType(0)) 55 return 56 } 57 58 // priorityInUse was deleted, use the new lowest. 59 if _, ok := xdsB.priorityToLocalities[xdsB.priorityInUse]; !ok { 60 xdsB.priorityInUse = xdsB.priorityLowest 61 if s, ok := xdsB.priorityToState[xdsB.priorityLowest]; ok { 62 xdsB.cc.UpdateState(*s) 63 } else { 64 // If state for priorityLowest is not found, this means priorityLowest was 65 // started, but never sent any update. The init timer fired and 66 // triggered the next priority. The old_priorityInUse (that was just 67 // deleted EDS) was picked later. 68 // 69 // We don't have an old state to send to parent, but we also don't 70 // want parent to keep using picker from old_priorityInUse. Send an 71 // update to trigger block picks until a new picker is ready. 72 xdsB.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: base.NewErrPickerV2(balancer.ErrNoSubConnAvailable)}) 73 } 74 return 75 } 76 77 // priorityInUse is not ready, look for next priority, and use if found. 78 if s, ok := xdsB.priorityToState[xdsB.priorityInUse]; ok && s.ConnectivityState != connectivity.Ready { 79 pNext := xdsB.priorityInUse.nextLower() 80 if _, ok := xdsB.priorityToLocalities[pNext]; ok { 81 xdsB.startPriority(pNext) 82 } 83 } 84} 85 86// startPriority sets priorityInUse to p, and starts the balancer group for p. 87// It also starts a timer to fall to next priority after timeout. 88// 89// Caller must hold priorityMu, priority must exist, and xdsB.priorityInUse must 90// be non-nil. 91func (xdsB *EDSBalancer) startPriority(priority priorityType) { 92 xdsB.priorityInUse = priority 93 p := xdsB.priorityToLocalities[priority] 94 // NOTE: this will eventually send addresses to sub-balancers. If the 95 // sub-balancer tries to update picker, it will result in a deadlock on 96 // priorityMu. But it's not an expected behavior for the balancer to 97 // update picker when handling addresses. 98 p.bg.start() 99 // startPriority can be called when 100 // 1. first EDS resp, start p0 101 // 2. a high priority goes Failure, start next 102 // 3. a high priority init timeout, start next 103 // 104 // In all the cases, the existing init timer is either closed, also already 105 // expired. There's no need to close the old timer. 106 xdsB.priorityInitTimer = time.AfterFunc(defaultPriorityInitTimeout, func() { 107 xdsB.priorityMu.Lock() 108 defer xdsB.priorityMu.Unlock() 109 if !xdsB.priorityInUse.equal(priority) { 110 return 111 } 112 xdsB.priorityInitTimer = nil 113 pNext := priority.nextLower() 114 if _, ok := xdsB.priorityToLocalities[pNext]; ok { 115 xdsB.startPriority(pNext) 116 } 117 }) 118} 119 120// handlePriorityWithNewState start/close priorities based on the connectivity 121// state. It returns whether the state should be forwarded to parent ClientConn. 122func (xdsB *EDSBalancer) handlePriorityWithNewState(priority priorityType, s balancer.State) bool { 123 xdsB.priorityMu.Lock() 124 defer xdsB.priorityMu.Unlock() 125 126 if !xdsB.priorityInUse.isSet() { 127 grpclog.Infof("eds: received picker update when no priority is in use (EDS returned an empty list)") 128 return false 129 } 130 131 if xdsB.priorityInUse.higherThan(priority) { 132 // Lower priorities should all be closed, this is an unexpected update. 133 grpclog.Infof("eds: received picker update from priority lower then priorityInUse") 134 return false 135 } 136 137 bState, ok := xdsB.priorityToState[priority] 138 if !ok { 139 bState = &balancer.State{} 140 xdsB.priorityToState[priority] = bState 141 } 142 oldState := bState.ConnectivityState 143 *bState = s 144 145 switch s.ConnectivityState { 146 case connectivity.Ready: 147 return xdsB.handlePriorityWithNewStateReady(priority) 148 case connectivity.TransientFailure: 149 return xdsB.handlePriorityWithNewStateTransientFailure(priority) 150 case connectivity.Connecting: 151 return xdsB.handlePriorityWithNewStateConnecting(priority, oldState) 152 default: 153 // New state is Idle, should never happen. Don't forward. 154 return false 155 } 156} 157 158// handlePriorityWithNewStateReady handles state Ready and decides whether to 159// forward update or not. 160// 161// An update with state Ready: 162// - If it's from higher priority: 163// - Forward the update 164// - Set the priority as priorityInUse 165// - Close all priorities lower than this one 166// - If it's from priorityInUse: 167// - Forward and do nothing else 168// 169// Caller must make sure priorityInUse is not higher than priority. 170// 171// Caller must hold priorityMu. 172func (xdsB *EDSBalancer) handlePriorityWithNewStateReady(priority priorityType) bool { 173 // If one priority higher or equal to priorityInUse goes Ready, stop the 174 // init timer. If update is from higher than priorityInUse, 175 // priorityInUse will be closed, and the init timer will become useless. 176 if timer := xdsB.priorityInitTimer; timer != nil { 177 timer.Stop() 178 xdsB.priorityInitTimer = nil 179 } 180 181 if xdsB.priorityInUse.lowerThan(priority) { 182 xdsB.priorityInUse = priority 183 for i := priority.nextLower(); !i.lowerThan(xdsB.priorityLowest); i = i.nextLower() { 184 xdsB.priorityToLocalities[i].bg.close() 185 } 186 return true 187 } 188 return true 189} 190 191// handlePriorityWithNewStateTransientFailure handles state TransientFailure and 192// decides whether to forward update or not. 193// 194// An update with state Failure: 195// - If it's from a higher priority: 196// - Do not forward, and do nothing 197// - If it's from priorityInUse: 198// - If there's no lower: 199// - Forward and do nothing else 200// - If there's a lower priority: 201// - Forward 202// - Set lower as priorityInUse 203// - Start lower 204// 205// Caller must make sure priorityInUse is not higher than priority. 206// 207// Caller must hold priorityMu. 208func (xdsB *EDSBalancer) handlePriorityWithNewStateTransientFailure(priority priorityType) bool { 209 if xdsB.priorityInUse.lowerThan(priority) { 210 return false 211 } 212 // priorityInUse sends a failure. Stop its init timer. 213 if timer := xdsB.priorityInitTimer; timer != nil { 214 timer.Stop() 215 xdsB.priorityInitTimer = nil 216 } 217 pNext := priority.nextLower() 218 if _, okNext := xdsB.priorityToLocalities[pNext]; !okNext { 219 return true 220 } 221 xdsB.startPriority(pNext) 222 return true 223} 224 225// handlePriorityWithNewStateConnecting handles state Connecting and decides 226// whether to forward update or not. 227// 228// An update with state Connecting: 229// - If it's from a higher priority 230// - Do nothing 231// - If it's from priorityInUse, the behavior depends on previous state. 232// 233// When new state is Connecting, the behavior depends on previous state. If the 234// previous state was Ready, this is a transition out from Ready to Connecting. 235// Assuming there are multiple backends in the same priority, this mean we are 236// in a bad situation and we should failover to the next priority (Side note: 237// the current connectivity state aggregating algorhtim (e.g. round-robin) is 238// not handling this right, because if many backends all go from Ready to 239// Connecting, the overall situation is more like TransientFailure, not 240// Connecting). 241// 242// If the previous state was Idle, we don't do anything special with failure, 243// and simply forward the update. The init timer should be in process, will 244// handle failover if it timeouts. If the previous state was TransientFailure, 245// we do not forward, because the lower priority is in use. 246// 247// Caller must make sure priorityInUse is not higher than priority. 248// 249// Caller must hold priorityMu. 250func (xdsB *EDSBalancer) handlePriorityWithNewStateConnecting(priority priorityType, oldState connectivity.State) bool { 251 if xdsB.priorityInUse.lowerThan(priority) { 252 return false 253 } 254 255 switch oldState { 256 case connectivity.Ready: 257 pNext := priority.nextLower() 258 if _, okNext := xdsB.priorityToLocalities[pNext]; !okNext { 259 return true 260 } 261 xdsB.startPriority(pNext) 262 return true 263 case connectivity.Idle: 264 return true 265 case connectivity.TransientFailure: 266 return false 267 default: 268 // Old state is Connecting or Shutdown. Don't forward. 269 return false 270 } 271} 272 273// priorityType represents the priority from EDS response. 274// 275// 0 is the highest priority. The bigger the number, the lower the priority. 276type priorityType struct { 277 set bool 278 p uint32 279} 280 281func newPriorityType(p uint32) priorityType { 282 return priorityType{ 283 set: true, 284 p: p, 285 } 286} 287 288func newPriorityTypeUnset() priorityType { 289 return priorityType{} 290} 291 292func (p priorityType) isSet() bool { 293 return p.set 294} 295 296func (p priorityType) equal(p2 priorityType) bool { 297 if !p.isSet() || !p2.isSet() { 298 panic("priority unset") 299 } 300 return p == p2 301} 302 303func (p priorityType) higherThan(p2 priorityType) bool { 304 if !p.isSet() || !p2.isSet() { 305 panic("priority unset") 306 } 307 return p.p < p2.p 308} 309 310func (p priorityType) lowerThan(p2 priorityType) bool { 311 if !p.isSet() || !p2.isSet() { 312 panic("priority unset") 313 } 314 return p.p > p2.p 315} 316 317func (p priorityType) nextLower() priorityType { 318 if !p.isSet() { 319 panic("priority unset") 320 } 321 return priorityType{ 322 set: true, 323 p: p.p + 1, 324 } 325} 326