1/* 2 * 3 * Copyright 2017 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package grpc 20 21import ( 22 "fmt" 23 "sync" 24 25 "google.golang.org/grpc/balancer" 26 "google.golang.org/grpc/connectivity" 27 "google.golang.org/grpc/internal/buffer" 28 "google.golang.org/grpc/internal/channelz" 29 "google.golang.org/grpc/internal/grpcsync" 30 "google.golang.org/grpc/resolver" 31) 32 33// scStateUpdate contains the subConn and the new state it changed to. 34type scStateUpdate struct { 35 sc balancer.SubConn 36 state connectivity.State 37 err error 38} 39 40// ccBalancerWrapper is a wrapper on top of cc for balancers. 41// It implements balancer.ClientConn interface. 42type ccBalancerWrapper struct { 43 cc *ClientConn 44 balancerMu sync.Mutex // synchronizes calls to the balancer 45 balancer balancer.Balancer 46 scBuffer *buffer.Unbounded 47 done *grpcsync.Event 48 49 mu sync.Mutex 50 subConns map[*acBalancerWrapper]struct{} 51} 52 53func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.BuildOptions) *ccBalancerWrapper { 54 ccb := &ccBalancerWrapper{ 55 cc: cc, 56 scBuffer: buffer.NewUnbounded(), 57 done: grpcsync.NewEvent(), 58 subConns: make(map[*acBalancerWrapper]struct{}), 59 } 60 go ccb.watcher() 61 ccb.balancer = b.Build(ccb, bopts) 62 return ccb 63} 64 65// watcher balancer functions sequentially, so the balancer can be implemented 66// lock-free. 67func (ccb *ccBalancerWrapper) watcher() { 68 for { 69 select { 70 case t := <-ccb.scBuffer.Get(): 71 ccb.scBuffer.Load() 72 if ccb.done.HasFired() { 73 break 74 } 75 ccb.balancerMu.Lock() 76 su := t.(*scStateUpdate) 77 if ub, ok := ccb.balancer.(balancer.V2Balancer); ok { 78 ub.UpdateSubConnState(su.sc, balancer.SubConnState{ConnectivityState: su.state, ConnectionError: su.err}) 79 } else { 80 ccb.balancer.HandleSubConnStateChange(su.sc, su.state) 81 } 82 ccb.balancerMu.Unlock() 83 case <-ccb.done.Done(): 84 } 85 86 if ccb.done.HasFired() { 87 ccb.balancer.Close() 88 ccb.mu.Lock() 89 scs := ccb.subConns 90 ccb.subConns = nil 91 ccb.mu.Unlock() 92 for acbw := range scs { 93 ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain) 94 } 95 ccb.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: nil}) 96 return 97 } 98 } 99} 100 101func (ccb *ccBalancerWrapper) close() { 102 ccb.done.Fire() 103} 104 105func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) { 106 // When updating addresses for a SubConn, if the address in use is not in 107 // the new addresses, the old ac will be tearDown() and a new ac will be 108 // created. tearDown() generates a state change with Shutdown state, we 109 // don't want the balancer to receive this state change. So before 110 // tearDown() on the old ac, ac.acbw (acWrapper) will be set to nil, and 111 // this function will be called with (nil, Shutdown). We don't need to call 112 // balancer method in this case. 113 if sc == nil { 114 return 115 } 116 ccb.scBuffer.Put(&scStateUpdate{ 117 sc: sc, 118 state: s, 119 err: err, 120 }) 121} 122 123func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error { 124 ccb.balancerMu.Lock() 125 defer ccb.balancerMu.Unlock() 126 if ub, ok := ccb.balancer.(balancer.V2Balancer); ok { 127 return ub.UpdateClientConnState(*ccs) 128 } 129 ccb.balancer.HandleResolvedAddrs(ccs.ResolverState.Addresses, nil) 130 return nil 131} 132 133func (ccb *ccBalancerWrapper) resolverError(err error) { 134 if ub, ok := ccb.balancer.(balancer.V2Balancer); ok { 135 ccb.balancerMu.Lock() 136 ub.ResolverError(err) 137 ccb.balancerMu.Unlock() 138 } 139} 140 141func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { 142 if len(addrs) <= 0 { 143 return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list") 144 } 145 ccb.mu.Lock() 146 defer ccb.mu.Unlock() 147 if ccb.subConns == nil { 148 return nil, fmt.Errorf("grpc: ClientConn balancer wrapper was closed") 149 } 150 ac, err := ccb.cc.newAddrConn(addrs, opts) 151 if err != nil { 152 return nil, err 153 } 154 acbw := &acBalancerWrapper{ac: ac} 155 acbw.ac.mu.Lock() 156 ac.acbw = acbw 157 acbw.ac.mu.Unlock() 158 ccb.subConns[acbw] = struct{}{} 159 return acbw, nil 160} 161 162func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) { 163 acbw, ok := sc.(*acBalancerWrapper) 164 if !ok { 165 return 166 } 167 ccb.mu.Lock() 168 defer ccb.mu.Unlock() 169 if ccb.subConns == nil { 170 return 171 } 172 delete(ccb.subConns, acbw) 173 ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain) 174} 175 176func (ccb *ccBalancerWrapper) UpdateBalancerState(s connectivity.State, p balancer.Picker) { 177 ccb.mu.Lock() 178 defer ccb.mu.Unlock() 179 if ccb.subConns == nil { 180 return 181 } 182 // Update picker before updating state. Even though the ordering here does 183 // not matter, it can lead to multiple calls of Pick in the common start-up 184 // case where we wait for ready and then perform an RPC. If the picker is 185 // updated later, we could call the "connecting" picker when the state is 186 // updated, and then call the "ready" picker after the picker gets updated. 187 ccb.cc.blockingpicker.updatePicker(p) 188 ccb.cc.csMgr.updateState(s) 189} 190 191func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) { 192 ccb.mu.Lock() 193 defer ccb.mu.Unlock() 194 if ccb.subConns == nil { 195 return 196 } 197 // Update picker before updating state. Even though the ordering here does 198 // not matter, it can lead to multiple calls of Pick in the common start-up 199 // case where we wait for ready and then perform an RPC. If the picker is 200 // updated later, we could call the "connecting" picker when the state is 201 // updated, and then call the "ready" picker after the picker gets updated. 202 ccb.cc.blockingpicker.updatePickerV2(s.Picker) 203 ccb.cc.csMgr.updateState(s.ConnectivityState) 204} 205 206func (ccb *ccBalancerWrapper) ResolveNow(o resolver.ResolveNowOptions) { 207 ccb.cc.resolveNow(o) 208} 209 210func (ccb *ccBalancerWrapper) Target() string { 211 return ccb.cc.target 212} 213 214// acBalancerWrapper is a wrapper on top of ac for balancers. 215// It implements balancer.SubConn interface. 216type acBalancerWrapper struct { 217 mu sync.Mutex 218 ac *addrConn 219} 220 221func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) { 222 acbw.mu.Lock() 223 defer acbw.mu.Unlock() 224 if len(addrs) <= 0 { 225 acbw.ac.tearDown(errConnDrain) 226 return 227 } 228 if !acbw.ac.tryUpdateAddrs(addrs) { 229 cc := acbw.ac.cc 230 opts := acbw.ac.scopts 231 acbw.ac.mu.Lock() 232 // Set old ac.acbw to nil so the Shutdown state update will be ignored 233 // by balancer. 234 // 235 // TODO(bar) the state transition could be wrong when tearDown() old ac 236 // and creating new ac, fix the transition. 237 acbw.ac.acbw = nil 238 acbw.ac.mu.Unlock() 239 acState := acbw.ac.getState() 240 acbw.ac.tearDown(errConnDrain) 241 242 if acState == connectivity.Shutdown { 243 return 244 } 245 246 ac, err := cc.newAddrConn(addrs, opts) 247 if err != nil { 248 channelz.Warningf(acbw.ac.channelzID, "acBalancerWrapper: UpdateAddresses: failed to newAddrConn: %v", err) 249 return 250 } 251 acbw.ac = ac 252 ac.mu.Lock() 253 ac.acbw = acbw 254 ac.mu.Unlock() 255 if acState != connectivity.Idle { 256 ac.connect() 257 } 258 } 259} 260 261func (acbw *acBalancerWrapper) Connect() { 262 acbw.mu.Lock() 263 defer acbw.mu.Unlock() 264 acbw.ac.connect() 265} 266 267func (acbw *acBalancerWrapper) getAddrConn() *addrConn { 268 acbw.mu.Lock() 269 defer acbw.mu.Unlock() 270 return acbw.ac 271} 272