1/*
2 *
3 * Copyright 2020 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package xdsrouting
20
21import (
22	"fmt"
23	"sync"
24
25	"google.golang.org/grpc/balancer"
26	"google.golang.org/grpc/balancer/base"
27	"google.golang.org/grpc/connectivity"
28	"google.golang.org/grpc/internal/grpclog"
29)
30
31type subBalancerState struct {
32	state balancer.State
33	// stateToAggregate is the connectivity state used only for state
34	// aggregation. It could be different from state.ConnectivityState. For
35	// example when a sub-balancer transitions from TransientFailure to
36	// connecting, state.ConnectivityState is Connecting, but stateToAggregate
37	// is still TransientFailure.
38	stateToAggregate connectivity.State
39}
40
41func (s *subBalancerState) String() string {
42	return fmt.Sprintf("picker:%p,state:%v,stateToAggregate:%v", s.state.Picker, s.state.ConnectivityState, s.stateToAggregate)
43}
44
45type balancerStateAggregator struct {
46	cc     balancer.ClientConn
47	logger *grpclog.PrefixLogger
48
49	mu sync.Mutex
50	// routes, one for each matcher.
51	routes []route
52	// If started is false, no updates should be sent to the parent cc. A closed
53	// sub-balancer could still send pickers to this aggregator. This makes sure
54	// that no updates will be forwarded to parent when the whole balancer group
55	// and states aggregator is closed.
56	started bool
57	// All balancer IDs exist as keys in this map, even if balancer group is not
58	// started.
59	//
60	// If an ID is not in map, it's either removed or never added.
61	idToPickerState map[string]*subBalancerState
62}
63
64func newBalancerStateAggregator(cc balancer.ClientConn, logger *grpclog.PrefixLogger) *balancerStateAggregator {
65	return &balancerStateAggregator{
66		cc:              cc,
67		logger:          logger,
68		idToPickerState: make(map[string]*subBalancerState),
69	}
70}
71
72// Start starts the aggregator. It can be called after Close to restart the
73// aggretator.
74func (rbsa *balancerStateAggregator) start() {
75	rbsa.mu.Lock()
76	defer rbsa.mu.Unlock()
77	rbsa.started = true
78}
79
80// Close closes the aggregator. When the aggregator is closed, it won't call
81// parent ClientConn to update balancer state.
82func (rbsa *balancerStateAggregator) close() {
83	rbsa.mu.Lock()
84	defer rbsa.mu.Unlock()
85	rbsa.started = false
86	rbsa.clearStates()
87}
88
89// add adds a sub-balancer state with weight. It adds a place holder, and waits
90// for the real sub-balancer to update state.
91//
92// This is called when there's a new action.
93func (rbsa *balancerStateAggregator) add(id string) {
94	rbsa.mu.Lock()
95	defer rbsa.mu.Unlock()
96	rbsa.idToPickerState[id] = &subBalancerState{
97		// Start everything in CONNECTING, so if one of the sub-balancers
98		// reports TransientFailure, the RPCs will still wait for the other
99		// sub-balancers.
100		state: balancer.State{
101			ConnectivityState: connectivity.Connecting,
102			Picker:            base.NewErrPicker(balancer.ErrNoSubConnAvailable),
103		},
104		stateToAggregate: connectivity.Connecting,
105	}
106}
107
108// remove removes the sub-balancer state. Future updates from this sub-balancer,
109// if any, will be ignored.
110//
111// This is called when an action is removed.
112func (rbsa *balancerStateAggregator) remove(id string) {
113	rbsa.mu.Lock()
114	defer rbsa.mu.Unlock()
115	if _, ok := rbsa.idToPickerState[id]; !ok {
116		return
117	}
118	// Remove id and picker from picker map. This also results in future updates
119	// for this ID to be ignored.
120	delete(rbsa.idToPickerState, id)
121}
122
123// updateRoutes updates the routes. Note that it doesn't trigger an update to
124// the parent ClientConn. The caller should decide when it's necessary, and call
125// buildAndUpdate.
126func (rbsa *balancerStateAggregator) updateRoutes(newRoutes []route) {
127	rbsa.mu.Lock()
128	defer rbsa.mu.Unlock()
129	rbsa.routes = newRoutes
130}
131
132// UpdateState is called to report a balancer state change from sub-balancer.
133// It's usually called by the balancer group.
134//
135// It calls parent ClientConn's UpdateState with the new aggregated state.
136func (rbsa *balancerStateAggregator) UpdateState(id string, state balancer.State) {
137	rbsa.mu.Lock()
138	defer rbsa.mu.Unlock()
139	pickerSt, ok := rbsa.idToPickerState[id]
140	if !ok {
141		// All state starts with an entry in pickStateMap. If ID is not in map,
142		// it's either removed, or never existed.
143		return
144	}
145	if !(pickerSt.state.ConnectivityState == connectivity.TransientFailure && state.ConnectivityState == connectivity.Connecting) {
146		// If old state is TransientFailure, and new state is Connecting, don't
147		// update the state, to prevent the aggregated state from being always
148		// CONNECTING. Otherwise, stateToAggregate is the same as
149		// state.ConnectivityState.
150		pickerSt.stateToAggregate = state.ConnectivityState
151	}
152	pickerSt.state = state
153
154	if !rbsa.started {
155		return
156	}
157	rbsa.cc.UpdateState(rbsa.build())
158}
159
160// clearState Reset everything to init state (Connecting) but keep the entry in
161// map (to keep the weight).
162//
163// Caller must hold rbsa.mu.
164func (rbsa *balancerStateAggregator) clearStates() {
165	for _, pState := range rbsa.idToPickerState {
166		pState.state = balancer.State{
167			ConnectivityState: connectivity.Connecting,
168			Picker:            base.NewErrPicker(balancer.ErrNoSubConnAvailable),
169		}
170		pState.stateToAggregate = connectivity.Connecting
171	}
172}
173
174// buildAndUpdate combines the sub-state from each sub-balancer into one state,
175// and update it to parent ClientConn.
176func (rbsa *balancerStateAggregator) buildAndUpdate() {
177	rbsa.mu.Lock()
178	defer rbsa.mu.Unlock()
179	if !rbsa.started {
180		return
181	}
182	rbsa.cc.UpdateState(rbsa.build())
183}
184
185// build combines sub-states into one. The picker will do routing pick.
186//
187// Caller must hold rbsa.mu.
188func (rbsa *balancerStateAggregator) build() balancer.State {
189	// TODO: the majority of this function (and UpdateState) is exactly the same
190	// as weighted_target's state aggregator. Try to make a general utility
191	// function/struct to handle the logic.
192	//
193	// One option: make a SubBalancerState that handles Update(State), including
194	// handling the special connecting after ready, as in UpdateState(). Then a
195	// function to calculate the aggregated connectivity state as in this
196	// function.
197	var readyN, connectingN int
198	for _, ps := range rbsa.idToPickerState {
199		switch ps.stateToAggregate {
200		case connectivity.Ready:
201			readyN++
202		case connectivity.Connecting:
203			connectingN++
204		}
205	}
206	var aggregatedState connectivity.State
207	switch {
208	case readyN > 0:
209		aggregatedState = connectivity.Ready
210	case connectingN > 0:
211		aggregatedState = connectivity.Connecting
212	default:
213		aggregatedState = connectivity.TransientFailure
214	}
215
216	// The picker's return error might not be consistent with the
217	// aggregatedState. Because for routing, we want to always build picker with
218	// all sub-pickers (not even ready sub-pickers), so even if the overall
219	// state is Ready, pick for certain RPCs can behave like Connecting or
220	// TransientFailure.
221	rbsa.logger.Infof("Child pickers with routes: %s, actions: %+v", rbsa.routes, rbsa.idToPickerState)
222	return balancer.State{
223		ConnectivityState: aggregatedState,
224		Picker:            newPickerGroup(rbsa.routes, rbsa.idToPickerState),
225	}
226}
227