1/* 2 * 3 * Copyright 2017 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package grpc 20 21import ( 22 "fmt" 23 "sync" 24 25 "google.golang.org/grpc/balancer" 26 "google.golang.org/grpc/connectivity" 27 "google.golang.org/grpc/internal/buffer" 28 "google.golang.org/grpc/internal/channelz" 29 "google.golang.org/grpc/internal/grpcsync" 30 "google.golang.org/grpc/resolver" 31) 32 33// scStateUpdate contains the subConn and the new state it changed to. 34type scStateUpdate struct { 35 sc balancer.SubConn 36 state connectivity.State 37 err error 38} 39 40// ccBalancerWrapper is a wrapper on top of cc for balancers. 41// It implements balancer.ClientConn interface. 42type ccBalancerWrapper struct { 43 cc *ClientConn 44 balancerMu sync.Mutex // synchronizes calls to the balancer 45 balancer balancer.Balancer 46 updateCh *buffer.Unbounded 47 closed *grpcsync.Event 48 done *grpcsync.Event 49 50 mu sync.Mutex 51 subConns map[*acBalancerWrapper]struct{} 52} 53 54func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.BuildOptions) *ccBalancerWrapper { 55 ccb := &ccBalancerWrapper{ 56 cc: cc, 57 updateCh: buffer.NewUnbounded(), 58 closed: grpcsync.NewEvent(), 59 done: grpcsync.NewEvent(), 60 subConns: make(map[*acBalancerWrapper]struct{}), 61 } 62 go ccb.watcher() 63 ccb.balancer = b.Build(ccb, bopts) 64 return ccb 65} 66 67// watcher balancer functions sequentially, so the balancer can be implemented 68// lock-free. 69func (ccb *ccBalancerWrapper) watcher() { 70 for { 71 select { 72 case t := <-ccb.updateCh.Get(): 73 ccb.updateCh.Load() 74 if ccb.closed.HasFired() { 75 break 76 } 77 switch u := t.(type) { 78 case *scStateUpdate: 79 ccb.balancerMu.Lock() 80 ccb.balancer.UpdateSubConnState(u.sc, balancer.SubConnState{ConnectivityState: u.state, ConnectionError: u.err}) 81 ccb.balancerMu.Unlock() 82 case *acBalancerWrapper: 83 ccb.mu.Lock() 84 if ccb.subConns != nil { 85 delete(ccb.subConns, u) 86 ccb.cc.removeAddrConn(u.getAddrConn(), errConnDrain) 87 } 88 ccb.mu.Unlock() 89 default: 90 logger.Errorf("ccBalancerWrapper.watcher: unknown update %+v, type %T", t, t) 91 } 92 case <-ccb.closed.Done(): 93 } 94 95 if ccb.closed.HasFired() { 96 ccb.balancerMu.Lock() 97 ccb.balancer.Close() 98 ccb.balancerMu.Unlock() 99 ccb.mu.Lock() 100 scs := ccb.subConns 101 ccb.subConns = nil 102 ccb.mu.Unlock() 103 ccb.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: nil}) 104 ccb.done.Fire() 105 // Fire done before removing the addr conns. We can safely unblock 106 // ccb.close and allow the removeAddrConns to happen 107 // asynchronously. 108 for acbw := range scs { 109 ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain) 110 } 111 return 112 } 113 } 114} 115 116func (ccb *ccBalancerWrapper) close() { 117 ccb.closed.Fire() 118 <-ccb.done.Done() 119} 120 121func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) { 122 // When updating addresses for a SubConn, if the address in use is not in 123 // the new addresses, the old ac will be tearDown() and a new ac will be 124 // created. tearDown() generates a state change with Shutdown state, we 125 // don't want the balancer to receive this state change. So before 126 // tearDown() on the old ac, ac.acbw (acWrapper) will be set to nil, and 127 // this function will be called with (nil, Shutdown). We don't need to call 128 // balancer method in this case. 129 if sc == nil { 130 return 131 } 132 ccb.updateCh.Put(&scStateUpdate{ 133 sc: sc, 134 state: s, 135 err: err, 136 }) 137} 138 139func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error { 140 ccb.balancerMu.Lock() 141 defer ccb.balancerMu.Unlock() 142 return ccb.balancer.UpdateClientConnState(*ccs) 143} 144 145func (ccb *ccBalancerWrapper) resolverError(err error) { 146 ccb.balancerMu.Lock() 147 ccb.balancer.ResolverError(err) 148 ccb.balancerMu.Unlock() 149} 150 151func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { 152 if len(addrs) <= 0 { 153 return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list") 154 } 155 ccb.mu.Lock() 156 defer ccb.mu.Unlock() 157 if ccb.subConns == nil { 158 return nil, fmt.Errorf("grpc: ClientConn balancer wrapper was closed") 159 } 160 ac, err := ccb.cc.newAddrConn(addrs, opts) 161 if err != nil { 162 return nil, err 163 } 164 acbw := &acBalancerWrapper{ac: ac} 165 acbw.ac.mu.Lock() 166 ac.acbw = acbw 167 acbw.ac.mu.Unlock() 168 ccb.subConns[acbw] = struct{}{} 169 return acbw, nil 170} 171 172func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) { 173 // The RemoveSubConn() is handled in the run() goroutine, to avoid deadlock 174 // during switchBalancer() if the old balancer calls RemoveSubConn() in its 175 // Close(). 176 ccb.updateCh.Put(sc) 177} 178 179func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) { 180 acbw, ok := sc.(*acBalancerWrapper) 181 if !ok { 182 return 183 } 184 acbw.UpdateAddresses(addrs) 185} 186 187func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) { 188 ccb.mu.Lock() 189 defer ccb.mu.Unlock() 190 if ccb.subConns == nil { 191 return 192 } 193 // Update picker before updating state. Even though the ordering here does 194 // not matter, it can lead to multiple calls of Pick in the common start-up 195 // case where we wait for ready and then perform an RPC. If the picker is 196 // updated later, we could call the "connecting" picker when the state is 197 // updated, and then call the "ready" picker after the picker gets updated. 198 ccb.cc.blockingpicker.updatePicker(s.Picker) 199 ccb.cc.csMgr.updateState(s.ConnectivityState) 200} 201 202func (ccb *ccBalancerWrapper) ResolveNow(o resolver.ResolveNowOptions) { 203 ccb.cc.resolveNow(o) 204} 205 206func (ccb *ccBalancerWrapper) Target() string { 207 return ccb.cc.target 208} 209 210// acBalancerWrapper is a wrapper on top of ac for balancers. 211// It implements balancer.SubConn interface. 212type acBalancerWrapper struct { 213 mu sync.Mutex 214 ac *addrConn 215} 216 217func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) { 218 acbw.mu.Lock() 219 defer acbw.mu.Unlock() 220 if len(addrs) <= 0 { 221 acbw.ac.cc.removeAddrConn(acbw.ac, errConnDrain) 222 return 223 } 224 if !acbw.ac.tryUpdateAddrs(addrs) { 225 cc := acbw.ac.cc 226 opts := acbw.ac.scopts 227 acbw.ac.mu.Lock() 228 // Set old ac.acbw to nil so the Shutdown state update will be ignored 229 // by balancer. 230 // 231 // TODO(bar) the state transition could be wrong when tearDown() old ac 232 // and creating new ac, fix the transition. 233 acbw.ac.acbw = nil 234 acbw.ac.mu.Unlock() 235 acState := acbw.ac.getState() 236 acbw.ac.cc.removeAddrConn(acbw.ac, errConnDrain) 237 238 if acState == connectivity.Shutdown { 239 return 240 } 241 242 ac, err := cc.newAddrConn(addrs, opts) 243 if err != nil { 244 channelz.Warningf(logger, acbw.ac.channelzID, "acBalancerWrapper: UpdateAddresses: failed to newAddrConn: %v", err) 245 return 246 } 247 acbw.ac = ac 248 ac.mu.Lock() 249 ac.acbw = acbw 250 ac.mu.Unlock() 251 if acState != connectivity.Idle { 252 ac.connect() 253 } 254 } 255} 256 257func (acbw *acBalancerWrapper) Connect() { 258 acbw.mu.Lock() 259 defer acbw.mu.Unlock() 260 acbw.ac.connect() 261} 262 263func (acbw *acBalancerWrapper) getAddrConn() *addrConn { 264 acbw.mu.Lock() 265 defer acbw.mu.Unlock() 266 return acbw.ac 267} 268