1/*
2 *
3 * Copyright 2020 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package clustermanager
20
21import (
22	"fmt"
23	"sync"
24
25	"google.golang.org/grpc/balancer"
26	"google.golang.org/grpc/balancer/base"
27	"google.golang.org/grpc/connectivity"
28	"google.golang.org/grpc/internal/grpclog"
29)
30
// subBalancerState holds the most recent balancer.State reported by a
// sub-balancer, together with the connectivity state used for aggregation.
type subBalancerState struct {
	// state is the latest state reported by the sub-balancer.
	state balancer.State
	// stateToAggregate is the connectivity state used only for state
	// aggregation. It could be different from state.ConnectivityState. For
	// example when a sub-balancer transitions from TransientFailure to
	// connecting, state.ConnectivityState is Connecting, but stateToAggregate
	// is still TransientFailure.
	stateToAggregate connectivity.State
}
40
41func (s *subBalancerState) String() string {
42	return fmt.Sprintf("picker:%p,state:%v,stateToAggregate:%v", s.state.Picker, s.state.ConnectivityState, s.stateToAggregate)
43}
44
// balancerStateAggregator collects the states of all sub-balancers and
// combines them into a single balancer.State that is forwarded to the parent
// ClientConn.
type balancerStateAggregator struct {
	cc     balancer.ClientConn
	logger *grpclog.PrefixLogger

	// mu guards started and idToPickerState.
	mu sync.Mutex
	// If started is false, no updates should be sent to the parent cc. A closed
	// sub-balancer could still send pickers to this aggregator. This makes sure
	// that no updates will be forwarded to parent when the whole balancer group
	// and states aggregator is closed.
	started bool
	// All balancer IDs exist as keys in this map, even if balancer group is not
	// started.
	//
	// If an ID is not in map, it's either removed or never added.
	idToPickerState map[string]*subBalancerState
}
61
62func newBalancerStateAggregator(cc balancer.ClientConn, logger *grpclog.PrefixLogger) *balancerStateAggregator {
63	return &balancerStateAggregator{
64		cc:              cc,
65		logger:          logger,
66		idToPickerState: make(map[string]*subBalancerState),
67	}
68}
69
70// Start starts the aggregator. It can be called after Close to restart the
71// aggretator.
72func (bsa *balancerStateAggregator) start() {
73	bsa.mu.Lock()
74	defer bsa.mu.Unlock()
75	bsa.started = true
76}
77
78// Close closes the aggregator. When the aggregator is closed, it won't call
79// parent ClientConn to update balancer state.
80func (bsa *balancerStateAggregator) close() {
81	bsa.mu.Lock()
82	defer bsa.mu.Unlock()
83	bsa.started = false
84	bsa.clearStates()
85}
86
87// add adds a sub-balancer state with weight. It adds a place holder, and waits
88// for the real sub-balancer to update state.
89//
90// This is called when there's a new child.
91func (bsa *balancerStateAggregator) add(id string) {
92	bsa.mu.Lock()
93	defer bsa.mu.Unlock()
94	bsa.idToPickerState[id] = &subBalancerState{
95		// Start everything in CONNECTING, so if one of the sub-balancers
96		// reports TransientFailure, the RPCs will still wait for the other
97		// sub-balancers.
98		state: balancer.State{
99			ConnectivityState: connectivity.Connecting,
100			Picker:            base.NewErrPicker(balancer.ErrNoSubConnAvailable),
101		},
102		stateToAggregate: connectivity.Connecting,
103	}
104}
105
106// remove removes the sub-balancer state. Future updates from this sub-balancer,
107// if any, will be ignored.
108//
109// This is called when a child is removed.
110func (bsa *balancerStateAggregator) remove(id string) {
111	bsa.mu.Lock()
112	defer bsa.mu.Unlock()
113	if _, ok := bsa.idToPickerState[id]; !ok {
114		return
115	}
116	// Remove id and picker from picker map. This also results in future updates
117	// for this ID to be ignored.
118	delete(bsa.idToPickerState, id)
119}
120
121// UpdateState is called to report a balancer state change from sub-balancer.
122// It's usually called by the balancer group.
123//
124// It calls parent ClientConn's UpdateState with the new aggregated state.
125func (bsa *balancerStateAggregator) UpdateState(id string, state balancer.State) {
126	bsa.mu.Lock()
127	defer bsa.mu.Unlock()
128	pickerSt, ok := bsa.idToPickerState[id]
129	if !ok {
130		// All state starts with an entry in pickStateMap. If ID is not in map,
131		// it's either removed, or never existed.
132		return
133	}
134	if !(pickerSt.state.ConnectivityState == connectivity.TransientFailure && state.ConnectivityState == connectivity.Connecting) {
135		// If old state is TransientFailure, and new state is Connecting, don't
136		// update the state, to prevent the aggregated state from being always
137		// CONNECTING. Otherwise, stateToAggregate is the same as
138		// state.ConnectivityState.
139		pickerSt.stateToAggregate = state.ConnectivityState
140	}
141	pickerSt.state = state
142
143	if !bsa.started {
144		return
145	}
146	bsa.cc.UpdateState(bsa.build())
147}
148
149// clearState Reset everything to init state (Connecting) but keep the entry in
150// map (to keep the weight).
151//
152// Caller must hold bsa.mu.
153func (bsa *balancerStateAggregator) clearStates() {
154	for _, pState := range bsa.idToPickerState {
155		pState.state = balancer.State{
156			ConnectivityState: connectivity.Connecting,
157			Picker:            base.NewErrPicker(balancer.ErrNoSubConnAvailable),
158		}
159		pState.stateToAggregate = connectivity.Connecting
160	}
161}
162
163// buildAndUpdate combines the sub-state from each sub-balancer into one state,
164// and update it to parent ClientConn.
165func (bsa *balancerStateAggregator) buildAndUpdate() {
166	bsa.mu.Lock()
167	defer bsa.mu.Unlock()
168	if !bsa.started {
169		return
170	}
171	bsa.cc.UpdateState(bsa.build())
172}
173
174// build combines sub-states into one. The picker will do a child pick.
175//
176// Caller must hold bsa.mu.
177func (bsa *balancerStateAggregator) build() balancer.State {
178	// TODO: the majority of this function (and UpdateState) is exactly the same
179	// as weighted_target's state aggregator. Try to make a general utility
180	// function/struct to handle the logic.
181	//
182	// One option: make a SubBalancerState that handles Update(State), including
183	// handling the special connecting after ready, as in UpdateState(). Then a
184	// function to calculate the aggregated connectivity state as in this
185	// function.
186	var readyN, connectingN int
187	for _, ps := range bsa.idToPickerState {
188		switch ps.stateToAggregate {
189		case connectivity.Ready:
190			readyN++
191		case connectivity.Connecting:
192			connectingN++
193		}
194	}
195	var aggregatedState connectivity.State
196	switch {
197	case readyN > 0:
198		aggregatedState = connectivity.Ready
199	case connectingN > 0:
200		aggregatedState = connectivity.Connecting
201	default:
202		aggregatedState = connectivity.TransientFailure
203	}
204
205	// The picker's return error might not be consistent with the
206	// aggregatedState. Because for this LB policy, we want to always build
207	// picker with all sub-pickers (not only ready sub-pickers), so even if the
208	// overall state is Ready, pick for certain RPCs can behave like Connecting
209	// or TransientFailure.
210	bsa.logger.Infof("Child pickers: %+v", bsa.idToPickerState)
211	return balancer.State{
212		ConnectivityState: aggregatedState,
213		Picker:            newPickerGroup(bsa.idToPickerState),
214	}
215}
216