1/*
2 *
3 * Copyright 2019 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package edsbalancer
19
20import (
21	"time"
22
23	"google.golang.org/grpc/balancer"
24	"google.golang.org/grpc/balancer/base"
25	"google.golang.org/grpc/connectivity"
26	"google.golang.org/grpc/grpclog"
27)
28
29// handlePriorityChange handles priority after EDS adds/removes a
30// priority.
31//
32// - If all priorities were deleted, unset priorityInUse, and set parent
33// ClientConn to TransientFailure
34// - If priorityInUse wasn't set, this is either the first EDS resp, or the
35// previous EDS resp deleted everything. Set priorityInUse to 0, and start 0.
36// - If priorityInUse was deleted, send the picker from the new lowest priority
37// to parent ClientConn, and set priorityInUse to the new lowest.
38// - If priorityInUse has a non-Ready state, and also there's a priority lower
39// than priorityInUse (which means a lower priority was added), set the next
40// priority as new priorityInUse, and start the bg.
41func (xdsB *EDSBalancer) handlePriorityChange() {
42	xdsB.priorityMu.Lock()
43	defer xdsB.priorityMu.Unlock()
44
45	// Everything was removed by EDS.
46	if !xdsB.priorityLowest.isSet() {
47		xdsB.priorityInUse = newPriorityTypeUnset()
48		xdsB.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: base.NewErrPickerV2(balancer.ErrTransientFailure)})
49		return
50	}
51
52	// priorityInUse wasn't set, use 0.
53	if !xdsB.priorityInUse.isSet() {
54		xdsB.startPriority(newPriorityType(0))
55		return
56	}
57
58	// priorityInUse was deleted, use the new lowest.
59	if _, ok := xdsB.priorityToLocalities[xdsB.priorityInUse]; !ok {
60		xdsB.priorityInUse = xdsB.priorityLowest
61		if s, ok := xdsB.priorityToState[xdsB.priorityLowest]; ok {
62			xdsB.cc.UpdateState(*s)
63		} else {
64			// If state for priorityLowest is not found, this means priorityLowest was
65			// started, but never sent any update. The init timer fired and
66			// triggered the next priority. The old_priorityInUse (that was just
67			// deleted EDS) was picked later.
68			//
69			// We don't have an old state to send to parent, but we also don't
70			// want parent to keep using picker from old_priorityInUse. Send an
71			// update to trigger block picks until a new picker is ready.
72			xdsB.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: base.NewErrPickerV2(balancer.ErrNoSubConnAvailable)})
73		}
74		return
75	}
76
77	// priorityInUse is not ready, look for next priority, and use if found.
78	if s, ok := xdsB.priorityToState[xdsB.priorityInUse]; ok && s.ConnectivityState != connectivity.Ready {
79		pNext := xdsB.priorityInUse.nextLower()
80		if _, ok := xdsB.priorityToLocalities[pNext]; ok {
81			xdsB.startPriority(pNext)
82		}
83	}
84}
85
86// startPriority sets priorityInUse to p, and starts the balancer group for p.
87// It also starts a timer to fall to next priority after timeout.
88//
89// Caller must hold priorityMu, priority must exist, and xdsB.priorityInUse must
90// be non-nil.
91func (xdsB *EDSBalancer) startPriority(priority priorityType) {
92	xdsB.priorityInUse = priority
93	p := xdsB.priorityToLocalities[priority]
94	// NOTE: this will eventually send addresses to sub-balancers. If the
95	// sub-balancer tries to update picker, it will result in a deadlock on
96	// priorityMu. But it's not an expected behavior for the balancer to
97	// update picker when handling addresses.
98	p.bg.start()
99	// startPriority can be called when
100	// 1. first EDS resp, start p0
101	// 2. a high priority goes Failure, start next
102	// 3. a high priority init timeout, start next
103	//
104	// In all the cases, the existing init timer is either closed, also already
105	// expired. There's no need to close the old timer.
106	xdsB.priorityInitTimer = time.AfterFunc(defaultPriorityInitTimeout, func() {
107		xdsB.priorityMu.Lock()
108		defer xdsB.priorityMu.Unlock()
109		if !xdsB.priorityInUse.equal(priority) {
110			return
111		}
112		xdsB.priorityInitTimer = nil
113		pNext := priority.nextLower()
114		if _, ok := xdsB.priorityToLocalities[pNext]; ok {
115			xdsB.startPriority(pNext)
116		}
117	})
118}
119
120// handlePriorityWithNewState start/close priorities based on the connectivity
121// state. It returns whether the state should be forwarded to parent ClientConn.
122func (xdsB *EDSBalancer) handlePriorityWithNewState(priority priorityType, s balancer.State) bool {
123	xdsB.priorityMu.Lock()
124	defer xdsB.priorityMu.Unlock()
125
126	if !xdsB.priorityInUse.isSet() {
127		grpclog.Infof("eds: received picker update when no priority is in use (EDS returned an empty list)")
128		return false
129	}
130
131	if xdsB.priorityInUse.higherThan(priority) {
132		// Lower priorities should all be closed, this is an unexpected update.
133		grpclog.Infof("eds: received picker update from priority lower then priorityInUse")
134		return false
135	}
136
137	bState, ok := xdsB.priorityToState[priority]
138	if !ok {
139		bState = &balancer.State{}
140		xdsB.priorityToState[priority] = bState
141	}
142	oldState := bState.ConnectivityState
143	*bState = s
144
145	switch s.ConnectivityState {
146	case connectivity.Ready:
147		return xdsB.handlePriorityWithNewStateReady(priority)
148	case connectivity.TransientFailure:
149		return xdsB.handlePriorityWithNewStateTransientFailure(priority)
150	case connectivity.Connecting:
151		return xdsB.handlePriorityWithNewStateConnecting(priority, oldState)
152	default:
153		// New state is Idle, should never happen. Don't forward.
154		return false
155	}
156}
157
158// handlePriorityWithNewStateReady handles state Ready and decides whether to
159// forward update or not.
160//
161// An update with state Ready:
162// - If it's from higher priority:
163//   - Forward the update
164//   - Set the priority as priorityInUse
165//   - Close all priorities lower than this one
166// - If it's from priorityInUse:
167//   - Forward and do nothing else
168//
169// Caller must make sure priorityInUse is not higher than priority.
170//
171// Caller must hold priorityMu.
172func (xdsB *EDSBalancer) handlePriorityWithNewStateReady(priority priorityType) bool {
173	// If one priority higher or equal to priorityInUse goes Ready, stop the
174	// init timer. If update is from higher than priorityInUse,
175	// priorityInUse will be closed, and the init timer will become useless.
176	if timer := xdsB.priorityInitTimer; timer != nil {
177		timer.Stop()
178		xdsB.priorityInitTimer = nil
179	}
180
181	if xdsB.priorityInUse.lowerThan(priority) {
182		xdsB.priorityInUse = priority
183		for i := priority.nextLower(); !i.lowerThan(xdsB.priorityLowest); i = i.nextLower() {
184			xdsB.priorityToLocalities[i].bg.close()
185		}
186		return true
187	}
188	return true
189}
190
191// handlePriorityWithNewStateTransientFailure handles state TransientFailure and
192// decides whether to forward update or not.
193//
194// An update with state Failure:
195// - If it's from a higher priority:
196//   - Do not forward, and do nothing
197// - If it's from priorityInUse:
198//   - If there's no lower:
199//     - Forward and do nothing else
200//   - If there's a lower priority:
201//     - Forward
202//     - Set lower as priorityInUse
203//     - Start lower
204//
205// Caller must make sure priorityInUse is not higher than priority.
206//
207// Caller must hold priorityMu.
208func (xdsB *EDSBalancer) handlePriorityWithNewStateTransientFailure(priority priorityType) bool {
209	if xdsB.priorityInUse.lowerThan(priority) {
210		return false
211	}
212	// priorityInUse sends a failure. Stop its init timer.
213	if timer := xdsB.priorityInitTimer; timer != nil {
214		timer.Stop()
215		xdsB.priorityInitTimer = nil
216	}
217	pNext := priority.nextLower()
218	if _, okNext := xdsB.priorityToLocalities[pNext]; !okNext {
219		return true
220	}
221	xdsB.startPriority(pNext)
222	return true
223}
224
225// handlePriorityWithNewStateConnecting handles state Connecting and decides
226// whether to forward update or not.
227//
228// An update with state Connecting:
229// - If it's from a higher priority
230//   - Do nothing
231// - If it's from priorityInUse, the behavior depends on previous state.
232//
233// When new state is Connecting, the behavior depends on previous state. If the
234// previous state was Ready, this is a transition out from Ready to Connecting.
235// Assuming there are multiple backends in the same priority, this mean we are
236// in a bad situation and we should failover to the next priority (Side note:
237// the current connectivity state aggregating algorhtim (e.g. round-robin) is
238// not handling this right, because if many backends all go from Ready to
239// Connecting, the overall situation is more like TransientFailure, not
240// Connecting).
241//
242// If the previous state was Idle, we don't do anything special with failure,
243// and simply forward the update. The init timer should be in process, will
244// handle failover if it timeouts. If the previous state was TransientFailure,
245// we do not forward, because the lower priority is in use.
246//
247// Caller must make sure priorityInUse is not higher than priority.
248//
249// Caller must hold priorityMu.
250func (xdsB *EDSBalancer) handlePriorityWithNewStateConnecting(priority priorityType, oldState connectivity.State) bool {
251	if xdsB.priorityInUse.lowerThan(priority) {
252		return false
253	}
254
255	switch oldState {
256	case connectivity.Ready:
257		pNext := priority.nextLower()
258		if _, okNext := xdsB.priorityToLocalities[pNext]; !okNext {
259			return true
260		}
261		xdsB.startPriority(pNext)
262		return true
263	case connectivity.Idle:
264		return true
265	case connectivity.TransientFailure:
266		return false
267	default:
268		// Old state is Connecting or Shutdown. Don't forward.
269		return false
270	}
271}
272
// priorityType is a priority from an EDS response that can also be "unset"
// (its zero value).
//
// 0 is the highest priority. The bigger the number, the lower the priority.
type priorityType struct {
	set bool   // whether p holds an actual priority
	p   uint32 // the priority value; 0 when unset
}

// newPriorityType returns a set priority with value p.
func newPriorityType(p uint32) priorityType {
	return priorityType{p: p, set: true}
}

// newPriorityTypeUnset returns the unset priority.
func newPriorityTypeUnset() priorityType {
	var unset priorityType
	return unset
}

// isSet reports whether p holds an actual priority.
func (p priorityType) isSet() bool { return p.set }

// equal reports whether p and p2 are the same priority. It panics if either
// of them is unset.
func (p priorityType) equal(p2 priorityType) bool {
	if !(p.set && p2.set) {
		panic("priority unset")
	}
	// Both are set, so only the values can differ.
	return p.p == p2.p
}

// higherThan reports whether p is a higher priority (smaller number) than p2.
// It panics if either of them is unset.
func (p priorityType) higherThan(p2 priorityType) bool {
	if !(p.set && p2.set) {
		panic("priority unset")
	}
	return p2.p > p.p
}

// lowerThan reports whether p is a lower priority (bigger number) than p2.
// It panics if either of them is unset.
func (p priorityType) lowerThan(p2 priorityType) bool {
	if !(p.set && p2.set) {
		panic("priority unset")
	}
	return p2.p < p.p
}

// nextLower returns the priority immediately below p (value p+1). It panics
// if p is unset.
func (p priorityType) nextLower() priorityType {
	if !p.set {
		panic("priority unset")
	}
	// p is a copy (value receiver), so bumping it does not affect the caller.
	p.p++
	return p
}
326