1/* 2 * 3 * Copyright 2020 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package xdsrouting 20 21import ( 22 "fmt" 23 "sync" 24 25 "google.golang.org/grpc/balancer" 26 "google.golang.org/grpc/balancer/base" 27 "google.golang.org/grpc/connectivity" 28 "google.golang.org/grpc/internal/grpclog" 29) 30 31type subBalancerState struct { 32 state balancer.State 33 // stateToAggregate is the connectivity state used only for state 34 // aggregation. It could be different from state.ConnectivityState. For 35 // example when a sub-balancer transitions from TransientFailure to 36 // connecting, state.ConnectivityState is Connecting, but stateToAggregate 37 // is still TransientFailure. 38 stateToAggregate connectivity.State 39} 40 41func (s *subBalancerState) String() string { 42 return fmt.Sprintf("picker:%p,state:%v,stateToAggregate:%v", s.state.Picker, s.state.ConnectivityState, s.stateToAggregate) 43} 44 45type balancerStateAggregator struct { 46 cc balancer.ClientConn 47 logger *grpclog.PrefixLogger 48 49 mu sync.Mutex 50 // routes, one for each matcher. 51 routes []route 52 // If started is false, no updates should be sent to the parent cc. A closed 53 // sub-balancer could still send pickers to this aggregator. This makes sure 54 // that no updates will be forwarded to parent when the whole balancer group 55 // and states aggregator is closed. 56 started bool 57 // All balancer IDs exist as keys in this map, even if balancer group is not 58 // started. 59 // 60 // If an ID is not in map, it's either removed or never added. 61 idToPickerState map[string]*subBalancerState 62} 63 64func newBalancerStateAggregator(cc balancer.ClientConn, logger *grpclog.PrefixLogger) *balancerStateAggregator { 65 return &balancerStateAggregator{ 66 cc: cc, 67 logger: logger, 68 idToPickerState: make(map[string]*subBalancerState), 69 } 70} 71 72// Start starts the aggregator. It can be called after Close to restart the 73// aggretator. 74func (rbsa *balancerStateAggregator) start() { 75 rbsa.mu.Lock() 76 defer rbsa.mu.Unlock() 77 rbsa.started = true 78} 79 80// Close closes the aggregator. When the aggregator is closed, it won't call 81// parent ClientConn to update balancer state. 82func (rbsa *balancerStateAggregator) close() { 83 rbsa.mu.Lock() 84 defer rbsa.mu.Unlock() 85 rbsa.started = false 86 rbsa.clearStates() 87} 88 89// add adds a sub-balancer state with weight. It adds a place holder, and waits 90// for the real sub-balancer to update state. 91// 92// This is called when there's a new action. 93func (rbsa *balancerStateAggregator) add(id string) { 94 rbsa.mu.Lock() 95 defer rbsa.mu.Unlock() 96 rbsa.idToPickerState[id] = &subBalancerState{ 97 // Start everything in CONNECTING, so if one of the sub-balancers 98 // reports TransientFailure, the RPCs will still wait for the other 99 // sub-balancers. 100 state: balancer.State{ 101 ConnectivityState: connectivity.Connecting, 102 Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable), 103 }, 104 stateToAggregate: connectivity.Connecting, 105 } 106} 107 108// remove removes the sub-balancer state. Future updates from this sub-balancer, 109// if any, will be ignored. 110// 111// This is called when an action is removed. 112func (rbsa *balancerStateAggregator) remove(id string) { 113 rbsa.mu.Lock() 114 defer rbsa.mu.Unlock() 115 if _, ok := rbsa.idToPickerState[id]; !ok { 116 return 117 } 118 // Remove id and picker from picker map. This also results in future updates 119 // for this ID to be ignored. 120 delete(rbsa.idToPickerState, id) 121} 122 123// updateRoutes updates the routes. Note that it doesn't trigger an update to 124// the parent ClientConn. The caller should decide when it's necessary, and call 125// buildAndUpdate. 126func (rbsa *balancerStateAggregator) updateRoutes(newRoutes []route) { 127 rbsa.mu.Lock() 128 defer rbsa.mu.Unlock() 129 rbsa.routes = newRoutes 130} 131 132// UpdateState is called to report a balancer state change from sub-balancer. 133// It's usually called by the balancer group. 134// 135// It calls parent ClientConn's UpdateState with the new aggregated state. 136func (rbsa *balancerStateAggregator) UpdateState(id string, state balancer.State) { 137 rbsa.mu.Lock() 138 defer rbsa.mu.Unlock() 139 pickerSt, ok := rbsa.idToPickerState[id] 140 if !ok { 141 // All state starts with an entry in pickStateMap. If ID is not in map, 142 // it's either removed, or never existed. 143 return 144 } 145 if !(pickerSt.state.ConnectivityState == connectivity.TransientFailure && state.ConnectivityState == connectivity.Connecting) { 146 // If old state is TransientFailure, and new state is Connecting, don't 147 // update the state, to prevent the aggregated state from being always 148 // CONNECTING. Otherwise, stateToAggregate is the same as 149 // state.ConnectivityState. 150 pickerSt.stateToAggregate = state.ConnectivityState 151 } 152 pickerSt.state = state 153 154 if !rbsa.started { 155 return 156 } 157 rbsa.cc.UpdateState(rbsa.build()) 158} 159 160// clearState Reset everything to init state (Connecting) but keep the entry in 161// map (to keep the weight). 162// 163// Caller must hold rbsa.mu. 164func (rbsa *balancerStateAggregator) clearStates() { 165 for _, pState := range rbsa.idToPickerState { 166 pState.state = balancer.State{ 167 ConnectivityState: connectivity.Connecting, 168 Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable), 169 } 170 pState.stateToAggregate = connectivity.Connecting 171 } 172} 173 174// buildAndUpdate combines the sub-state from each sub-balancer into one state, 175// and update it to parent ClientConn. 176func (rbsa *balancerStateAggregator) buildAndUpdate() { 177 rbsa.mu.Lock() 178 defer rbsa.mu.Unlock() 179 if !rbsa.started { 180 return 181 } 182 rbsa.cc.UpdateState(rbsa.build()) 183} 184 185// build combines sub-states into one. The picker will do routing pick. 186// 187// Caller must hold rbsa.mu. 188func (rbsa *balancerStateAggregator) build() balancer.State { 189 // TODO: the majority of this function (and UpdateState) is exactly the same 190 // as weighted_target's state aggregator. Try to make a general utility 191 // function/struct to handle the logic. 192 // 193 // One option: make a SubBalancerState that handles Update(State), including 194 // handling the special connecting after ready, as in UpdateState(). Then a 195 // function to calculate the aggregated connectivity state as in this 196 // function. 197 var readyN, connectingN int 198 for _, ps := range rbsa.idToPickerState { 199 switch ps.stateToAggregate { 200 case connectivity.Ready: 201 readyN++ 202 case connectivity.Connecting: 203 connectingN++ 204 } 205 } 206 var aggregatedState connectivity.State 207 switch { 208 case readyN > 0: 209 aggregatedState = connectivity.Ready 210 case connectingN > 0: 211 aggregatedState = connectivity.Connecting 212 default: 213 aggregatedState = connectivity.TransientFailure 214 } 215 216 // The picker's return error might not be consistent with the 217 // aggregatedState. Because for routing, we want to always build picker with 218 // all sub-pickers (not even ready sub-pickers), so even if the overall 219 // state is Ready, pick for certain RPCs can behave like Connecting or 220 // TransientFailure. 221 rbsa.logger.Infof("Child pickers with routes: %s, actions: %+v", rbsa.routes, rbsa.idToPickerState) 222 return balancer.State{ 223 ConnectivityState: aggregatedState, 224 Picker: newPickerGroup(rbsa.routes, rbsa.idToPickerState), 225 } 226} 227