/*
 *
 * Copyright 2020 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package clustermanager

import (
	"fmt"
	"sync"

	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/balancer/base"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/internal/grpclog"
)

// subBalancerState records the most recent state reported by one sub-balancer,
// plus the (possibly different) connectivity state used for aggregation.
type subBalancerState struct {
	state balancer.State
	// stateToAggregate is the connectivity state used only for state
	// aggregation. It could be different from state.ConnectivityState. For
	// example when a sub-balancer transitions from TransientFailure to
	// connecting, state.ConnectivityState is Connecting, but stateToAggregate
	// is still TransientFailure.
	stateToAggregate connectivity.State
}

// String implements fmt.Stringer; used when logging the picker-state map.
func (s *subBalancerState) String() string {
	return fmt.Sprintf("picker:%p,state:%v,stateToAggregate:%v", s.state.Picker, s.state.ConnectivityState, s.stateToAggregate)
}

// balancerStateAggregator combines the states of all sub-balancers into a
// single balancer.State and forwards it to the parent ClientConn.
type balancerStateAggregator struct {
	cc     balancer.ClientConn
	logger *grpclog.PrefixLogger

	// mu guards started and idToPickerState.
	mu sync.Mutex
	// If started is false, no updates should be sent to the parent cc. A closed
	// sub-balancer could still send pickers to this aggregator. This makes sure
	// that no updates will be forwarded to parent when the whole balancer group
	// and states aggregator is closed.
	started bool
	// All balancer IDs exist as keys in this map, even if balancer group is not
	// started.
	//
	// If an ID is not in map, it's either removed or never added.
	idToPickerState map[string]*subBalancerState
}

// newBalancerStateAggregator creates an aggregator that reports aggregated
// state to cc. It starts with an empty child map and in the not-started state;
// call start before expecting updates to be forwarded.
func newBalancerStateAggregator(cc balancer.ClientConn, logger *grpclog.PrefixLogger) *balancerStateAggregator {
	return &balancerStateAggregator{
		cc:              cc,
		logger:          logger,
		idToPickerState: make(map[string]*subBalancerState),
	}
}

// start starts the aggregator. It can be called after close to restart the
// aggregator.
func (bsa *balancerStateAggregator) start() {
	bsa.mu.Lock()
	defer bsa.mu.Unlock()
	bsa.started = true
}

// close closes the aggregator. When the aggregator is closed, it won't call
// parent ClientConn to update balancer state.
func (bsa *balancerStateAggregator) close() {
	bsa.mu.Lock()
	defer bsa.mu.Unlock()
	bsa.started = false
	bsa.clearStates()
}

// add adds a sub-balancer state entry as a placeholder, and waits for the real
// sub-balancer to update state.
//
// This is called when there's a new child.
func (bsa *balancerStateAggregator) add(id string) {
	bsa.mu.Lock()
	defer bsa.mu.Unlock()
	bsa.idToPickerState[id] = &subBalancerState{
		// Start everything in CONNECTING, so if one of the sub-balancers
		// reports TransientFailure, the RPCs will still wait for the other
		// sub-balancers.
		state: balancer.State{
			ConnectivityState: connectivity.Connecting,
			Picker:            base.NewErrPicker(balancer.ErrNoSubConnAvailable),
		},
		stateToAggregate: connectivity.Connecting,
	}
}

// remove removes the sub-balancer state. Future updates from this sub-balancer,
// if any, will be ignored.
//
// This is called when a child is removed.
func (bsa *balancerStateAggregator) remove(id string) {
	bsa.mu.Lock()
	defer bsa.mu.Unlock()
	if _, ok := bsa.idToPickerState[id]; !ok {
		return
	}
	// Remove id and picker from picker map. This also results in future updates
	// for this ID to be ignored.
	delete(bsa.idToPickerState, id)
}

// UpdateState is called to report a balancer state change from sub-balancer.
// It's usually called by the balancer group.
//
// It calls parent ClientConn's UpdateState with the new aggregated state.
func (bsa *balancerStateAggregator) UpdateState(id string, state balancer.State) {
	bsa.mu.Lock()
	defer bsa.mu.Unlock()
	pickerSt, ok := bsa.idToPickerState[id]
	if !ok {
		// All state starts with an entry in pickStateMap. If ID is not in map,
		// it's either removed, or never existed.
		return
	}
	if !(pickerSt.state.ConnectivityState == connectivity.TransientFailure && state.ConnectivityState == connectivity.Connecting) {
		// If old state is TransientFailure, and new state is Connecting, don't
		// update the state, to prevent the aggregated state from being always
		// CONNECTING. Otherwise, stateToAggregate is the same as
		// state.ConnectivityState.
		pickerSt.stateToAggregate = state.ConnectivityState
	}
	pickerSt.state = state

	if !bsa.started {
		return
	}
	bsa.cc.UpdateState(bsa.build())
}

// clearStates resets every sub-balancer's state back to the initial state
// (Connecting with an ErrNoSubConnAvailable picker), but keeps each entry in
// the map so the set of known child IDs is preserved across close/start.
//
// Caller must hold bsa.mu.
func (bsa *balancerStateAggregator) clearStates() {
	for _, pState := range bsa.idToPickerState {
		pState.state = balancer.State{
			ConnectivityState: connectivity.Connecting,
			Picker:            base.NewErrPicker(balancer.ErrNoSubConnAvailable),
		}
		pState.stateToAggregate = connectivity.Connecting
	}
}

// buildAndUpdate combines the sub-state from each sub-balancer into one state,
// and updates it to parent ClientConn. It is a no-op if the aggregator has not
// been started (or has been closed).
func (bsa *balancerStateAggregator) buildAndUpdate() {
	bsa.mu.Lock()
	defer bsa.mu.Unlock()
	if !bsa.started {
		return
	}
	bsa.cc.UpdateState(bsa.build())
}

// build combines sub-states into one. The picker will do a child pick.
//
// Caller must hold bsa.mu.
func (bsa *balancerStateAggregator) build() balancer.State {
	// TODO: the majority of this function (and UpdateState) is exactly the same
	// as weighted_target's state aggregator. Try to make a general utility
	// function/struct to handle the logic.
	//
	// One option: make a SubBalancerState that handles Update(State), including
	// handling the special connecting after ready, as in UpdateState(). Then a
	// function to calculate the aggregated connectivity state as in this
	// function.
	//
	// TODO: use balancer.ConnectivityStateEvaluator to calculate the aggregated
	// state.
	var readyN, connectingN, idleN int
	for _, ps := range bsa.idToPickerState {
		switch ps.stateToAggregate {
		case connectivity.Ready:
			readyN++
		case connectivity.Connecting:
			connectingN++
		case connectivity.Idle:
			idleN++
		}
	}
	// Precedence: any Ready child makes the aggregate Ready; else any
	// Connecting child wins; else any Idle child; otherwise TransientFailure
	// (which also covers the empty-map case).
	var aggregatedState connectivity.State
	switch {
	case readyN > 0:
		aggregatedState = connectivity.Ready
	case connectingN > 0:
		aggregatedState = connectivity.Connecting
	case idleN > 0:
		aggregatedState = connectivity.Idle
	default:
		aggregatedState = connectivity.TransientFailure
	}

	// The picker's return error might not be consistent with the
	// aggregatedState. Because for this LB policy, we want to always build
	// picker with all sub-pickers (not only ready sub-pickers), so even if the
	// overall state is Ready, pick for certain RPCs can behave like Connecting
	// or TransientFailure.
	bsa.logger.Infof("Child pickers: %+v", bsa.idToPickerState)
	return balancer.State{
		ConnectivityState: aggregatedState,
		Picker:            newPickerGroup(bsa.idToPickerState),
	}
}