1/*
2 *
3 * Copyright 2019 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package edsbalancer
19
20import (
21	"errors"
22	"fmt"
23	"time"
24
25	"google.golang.org/grpc/balancer"
26	"google.golang.org/grpc/balancer/base"
27	"google.golang.org/grpc/connectivity"
28)
29
// errAllPrioritiesRemoved is the error carried by the error picker that
// handlePriorityChange sends to the parent ClientConn, together with the
// TransientFailure state, when no priority is left (priorityLowest is unset).
var errAllPrioritiesRemoved = errors.New("eds: no locality is provided, all priorities are removed")
31
32// handlePriorityChange handles priority after EDS adds/removes a
33// priority.
34//
35// - If all priorities were deleted, unset priorityInUse, and set parent
36// ClientConn to TransientFailure
37// - If priorityInUse wasn't set, this is either the first EDS resp, or the
38// previous EDS resp deleted everything. Set priorityInUse to 0, and start 0.
39// - If priorityInUse was deleted, send the picker from the new lowest priority
40// to parent ClientConn, and set priorityInUse to the new lowest.
41// - If priorityInUse has a non-Ready state, and also there's a priority lower
42// than priorityInUse (which means a lower priority was added), set the next
43// priority as new priorityInUse, and start the bg.
44func (edsImpl *edsBalancerImpl) handlePriorityChange() {
45	edsImpl.priorityMu.Lock()
46	defer edsImpl.priorityMu.Unlock()
47
48	// Everything was removed by EDS.
49	if !edsImpl.priorityLowest.isSet() {
50		edsImpl.priorityInUse = newPriorityTypeUnset()
51		// Stop the init timer. This can happen if the only priority is removed
52		// shortly after it's added.
53		if timer := edsImpl.priorityInitTimer; timer != nil {
54			timer.Stop()
55			edsImpl.priorityInitTimer = nil
56		}
57		edsImpl.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: base.NewErrPicker(errAllPrioritiesRemoved)})
58		return
59	}
60
61	// priorityInUse wasn't set, use 0.
62	if !edsImpl.priorityInUse.isSet() {
63		edsImpl.logger.Infof("Switching priority from unset to %v", 0)
64		edsImpl.startPriority(newPriorityType(0))
65		return
66	}
67
68	// priorityInUse was deleted, use the new lowest.
69	if _, ok := edsImpl.priorityToLocalities[edsImpl.priorityInUse]; !ok {
70		oldP := edsImpl.priorityInUse
71		edsImpl.priorityInUse = edsImpl.priorityLowest
72		edsImpl.logger.Infof("Switching priority from %v to %v, because former was deleted", oldP, edsImpl.priorityInUse)
73		if s, ok := edsImpl.priorityToState[edsImpl.priorityLowest]; ok {
74			edsImpl.cc.UpdateState(*s)
75		} else {
76			// If state for priorityLowest is not found, this means priorityLowest was
77			// started, but never sent any update. The init timer fired and
78			// triggered the next priority. The old_priorityInUse (that was just
79			// deleted EDS) was picked later.
80			//
81			// We don't have an old state to send to parent, but we also don't
82			// want parent to keep using picker from old_priorityInUse. Send an
83			// update to trigger block picks until a new picker is ready.
84			edsImpl.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable)})
85		}
86		return
87	}
88
89	// priorityInUse is not ready, look for next priority, and use if found.
90	if s, ok := edsImpl.priorityToState[edsImpl.priorityInUse]; ok && s.ConnectivityState != connectivity.Ready {
91		pNext := edsImpl.priorityInUse.nextLower()
92		if _, ok := edsImpl.priorityToLocalities[pNext]; ok {
93			edsImpl.logger.Infof("Switching priority from %v to %v, because latter was added, and former wasn't Ready")
94			edsImpl.startPriority(pNext)
95		}
96	}
97}
98
99// startPriority sets priorityInUse to p, and starts the balancer group for p.
100// It also starts a timer to fall to next priority after timeout.
101//
102// Caller must hold priorityMu, priority must exist, and edsImpl.priorityInUse
103// must be non-nil.
104func (edsImpl *edsBalancerImpl) startPriority(priority priorityType) {
105	edsImpl.priorityInUse = priority
106	p := edsImpl.priorityToLocalities[priority]
107	// NOTE: this will eventually send addresses to sub-balancers. If the
108	// sub-balancer tries to update picker, it will result in a deadlock on
109	// priorityMu in the update is handled synchronously. The deadlock is
110	// currently avoided by handling balancer update in a goroutine (the run
111	// goroutine in the parent eds balancer). When priority balancer is split
112	// into its own, this asynchronous state handling needs to be copied.
113	p.stateAggregator.Start()
114	p.bg.Start()
115	// startPriority can be called when
116	// 1. first EDS resp, start p0
117	// 2. a high priority goes Failure, start next
118	// 3. a high priority init timeout, start next
119	//
120	// In all the cases, the existing init timer is either closed, also already
121	// expired. There's no need to close the old timer.
122	edsImpl.priorityInitTimer = time.AfterFunc(defaultPriorityInitTimeout, func() {
123		edsImpl.priorityMu.Lock()
124		defer edsImpl.priorityMu.Unlock()
125		if !edsImpl.priorityInUse.isSet() || !edsImpl.priorityInUse.equal(priority) {
126			return
127		}
128		edsImpl.priorityInitTimer = nil
129		pNext := priority.nextLower()
130		if _, ok := edsImpl.priorityToLocalities[pNext]; ok {
131			edsImpl.startPriority(pNext)
132		}
133	})
134}
135
136// handlePriorityWithNewState start/close priorities based on the connectivity
137// state. It returns whether the state should be forwarded to parent ClientConn.
138func (edsImpl *edsBalancerImpl) handlePriorityWithNewState(priority priorityType, s balancer.State) bool {
139	edsImpl.priorityMu.Lock()
140	defer edsImpl.priorityMu.Unlock()
141
142	if !edsImpl.priorityInUse.isSet() {
143		edsImpl.logger.Infof("eds: received picker update when no priority is in use (EDS returned an empty list)")
144		return false
145	}
146
147	if edsImpl.priorityInUse.higherThan(priority) {
148		// Lower priorities should all be closed, this is an unexpected update.
149		edsImpl.logger.Infof("eds: received picker update from priority lower then priorityInUse")
150		return false
151	}
152
153	bState, ok := edsImpl.priorityToState[priority]
154	if !ok {
155		bState = &balancer.State{}
156		edsImpl.priorityToState[priority] = bState
157	}
158	oldState := bState.ConnectivityState
159	*bState = s
160
161	switch s.ConnectivityState {
162	case connectivity.Ready:
163		return edsImpl.handlePriorityWithNewStateReady(priority)
164	case connectivity.TransientFailure:
165		return edsImpl.handlePriorityWithNewStateTransientFailure(priority)
166	case connectivity.Connecting:
167		return edsImpl.handlePriorityWithNewStateConnecting(priority, oldState)
168	default:
169		// New state is Idle, should never happen. Don't forward.
170		return false
171	}
172}
173
174// handlePriorityWithNewStateReady handles state Ready and decides whether to
175// forward update or not.
176//
177// An update with state Ready:
178// - If it's from higher priority:
179//   - Forward the update
180//   - Set the priority as priorityInUse
181//   - Close all priorities lower than this one
182// - If it's from priorityInUse:
183//   - Forward and do nothing else
184//
185// Caller must make sure priorityInUse is not higher than priority.
186//
187// Caller must hold priorityMu.
188func (edsImpl *edsBalancerImpl) handlePriorityWithNewStateReady(priority priorityType) bool {
189	// If one priority higher or equal to priorityInUse goes Ready, stop the
190	// init timer. If update is from higher than priorityInUse,
191	// priorityInUse will be closed, and the init timer will become useless.
192	if timer := edsImpl.priorityInitTimer; timer != nil {
193		timer.Stop()
194		edsImpl.priorityInitTimer = nil
195	}
196
197	if edsImpl.priorityInUse.lowerThan(priority) {
198		edsImpl.logger.Infof("Switching priority from %v to %v, because latter became Ready", edsImpl.priorityInUse, priority)
199		edsImpl.priorityInUse = priority
200		for i := priority.nextLower(); !i.lowerThan(edsImpl.priorityLowest); i = i.nextLower() {
201			bgwc := edsImpl.priorityToLocalities[i]
202			bgwc.stateAggregator.Stop()
203			bgwc.bg.Close()
204		}
205		return true
206	}
207	return true
208}
209
210// handlePriorityWithNewStateTransientFailure handles state TransientFailure and
211// decides whether to forward update or not.
212//
213// An update with state Failure:
214// - If it's from a higher priority:
215//   - Do not forward, and do nothing
216// - If it's from priorityInUse:
217//   - If there's no lower:
218//     - Forward and do nothing else
219//   - If there's a lower priority:
220//     - Forward
221//     - Set lower as priorityInUse
222//     - Start lower
223//
224// Caller must make sure priorityInUse is not higher than priority.
225//
226// Caller must hold priorityMu.
227func (edsImpl *edsBalancerImpl) handlePriorityWithNewStateTransientFailure(priority priorityType) bool {
228	if edsImpl.priorityInUse.lowerThan(priority) {
229		return false
230	}
231	// priorityInUse sends a failure. Stop its init timer.
232	if timer := edsImpl.priorityInitTimer; timer != nil {
233		timer.Stop()
234		edsImpl.priorityInitTimer = nil
235	}
236	pNext := priority.nextLower()
237	if _, okNext := edsImpl.priorityToLocalities[pNext]; !okNext {
238		return true
239	}
240	edsImpl.logger.Infof("Switching priority from %v to %v, because former became TransientFailure", priority, pNext)
241	edsImpl.startPriority(pNext)
242	return true
243}
244
245// handlePriorityWithNewStateConnecting handles state Connecting and decides
246// whether to forward update or not.
247//
248// An update with state Connecting:
249// - If it's from a higher priority
250//   - Do nothing
251// - If it's from priorityInUse, the behavior depends on previous state.
252//
253// When new state is Connecting, the behavior depends on previous state. If the
254// previous state was Ready, this is a transition out from Ready to Connecting.
255// Assuming there are multiple backends in the same priority, this mean we are
256// in a bad situation and we should failover to the next priority (Side note:
257// the current connectivity state aggregating algorhtim (e.g. round-robin) is
258// not handling this right, because if many backends all go from Ready to
259// Connecting, the overall situation is more like TransientFailure, not
260// Connecting).
261//
262// If the previous state was Idle, we don't do anything special with failure,
263// and simply forward the update. The init timer should be in process, will
264// handle failover if it timeouts. If the previous state was TransientFailure,
265// we do not forward, because the lower priority is in use.
266//
267// Caller must make sure priorityInUse is not higher than priority.
268//
269// Caller must hold priorityMu.
270func (edsImpl *edsBalancerImpl) handlePriorityWithNewStateConnecting(priority priorityType, oldState connectivity.State) bool {
271	if edsImpl.priorityInUse.lowerThan(priority) {
272		return false
273	}
274
275	switch oldState {
276	case connectivity.Ready:
277		pNext := priority.nextLower()
278		if _, okNext := edsImpl.priorityToLocalities[pNext]; !okNext {
279			return true
280		}
281		edsImpl.logger.Infof("Switching priority from %v to %v, because former became Connecting from Ready", priority, pNext)
282		edsImpl.startPriority(pNext)
283		return true
284	case connectivity.Idle:
285		return true
286	case connectivity.TransientFailure:
287		return false
288	default:
289		// Old state is Connecting or Shutdown. Don't forward.
290		return false
291	}
292}
293
// priorityType is a priority taken from an EDS response, paired with a flag
// recording whether a priority has been assigned at all. The zero value is
// the "unset" priority.
//
// 0 is the highest priority. The bigger the number, the lower the priority.
type priorityType struct {
	set bool
	p   uint32
}

// newPriorityType returns a set priority with value p.
func newPriorityType(p uint32) priorityType {
	return priorityType{set: true, p: p}
}

// newPriorityTypeUnset returns the unset priority.
func newPriorityTypeUnset() priorityType {
	var unset priorityType
	return unset
}

// isSet reports whether p holds a priority.
func (p priorityType) isSet() bool {
	return p.set
}

// equal reports whether p and p2 are the same priority. Two unset priorities
// are equal; a set priority never equals an unset one.
func (p priorityType) equal(p2 priorityType) bool {
	if p.set != p2.set {
		return false
	}
	// Both share the same set flag here: either both unset (equal), or both
	// set, in which case the values decide.
	return !p.set || p.p == p2.p
}

// higherThan reports whether p is a higher priority (smaller number) than
// p2. It panics if either priority is unset.
func (p priorityType) higherThan(p2 priorityType) bool {
	if p.set && p2.set {
		return p.p < p2.p
	}
	// TODO(menghanl): return an appropriate value instead of panic.
	panic("priority unset")
}

// lowerThan reports whether p is a lower priority (bigger number) than p2.
// It panics if either priority is unset.
func (p priorityType) lowerThan(p2 priorityType) bool {
	if p.set && p2.set {
		return p.p > p2.p
	}
	// TODO(menghanl): return an appropriate value instead of panic.
	panic("priority unset")
}

// nextLower returns the priority immediately below p, i.e. p's value plus
// one. It panics if p is unset.
func (p priorityType) nextLower() priorityType {
	if p.set {
		return newPriorityType(p.p + 1)
	}
	panic("priority unset")
}

// String returns the decimal priority value, or "Nil" for an unset priority.
func (p priorityType) String() string {
	if p.set {
		return fmt.Sprint(p.p)
	}
	return "Nil"
}
359