1/* 2 * 3 * Copyright 2017 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package grpc 20 21import ( 22 "fmt" 23 "sync" 24 25 "google.golang.org/grpc/balancer" 26 "google.golang.org/grpc/connectivity" 27 "google.golang.org/grpc/internal/buffer" 28 "google.golang.org/grpc/internal/channelz" 29 "google.golang.org/grpc/internal/grpcsync" 30 "google.golang.org/grpc/resolver" 31) 32 33// scStateUpdate contains the subConn and the new state it changed to. 34type scStateUpdate struct { 35 sc balancer.SubConn 36 state connectivity.State 37 err error 38} 39 40// ccBalancerWrapper is a wrapper on top of cc for balancers. 41// It implements balancer.ClientConn interface. 42type ccBalancerWrapper struct { 43 cc *ClientConn 44 balancerMu sync.Mutex // synchronizes calls to the balancer 45 balancer balancer.Balancer 46 scBuffer *buffer.Unbounded 47 done *grpcsync.Event 48 49 mu sync.Mutex 50 subConns map[*acBalancerWrapper]struct{} 51} 52 53func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.BuildOptions) *ccBalancerWrapper { 54 ccb := &ccBalancerWrapper{ 55 cc: cc, 56 scBuffer: buffer.NewUnbounded(), 57 done: grpcsync.NewEvent(), 58 subConns: make(map[*acBalancerWrapper]struct{}), 59 } 60 go ccb.watcher() 61 ccb.balancer = b.Build(ccb, bopts) 62 return ccb 63} 64 65// watcher balancer functions sequentially, so the balancer can be implemented 66// lock-free. 67func (ccb *ccBalancerWrapper) watcher() { 68 for { 69 select { 70 case t := <-ccb.scBuffer.Get(): 71 ccb.scBuffer.Load() 72 if ccb.done.HasFired() { 73 break 74 } 75 ccb.balancerMu.Lock() 76 su := t.(*scStateUpdate) 77 ccb.balancer.UpdateSubConnState(su.sc, balancer.SubConnState{ConnectivityState: su.state, ConnectionError: su.err}) 78 ccb.balancerMu.Unlock() 79 case <-ccb.done.Done(): 80 } 81 82 if ccb.done.HasFired() { 83 ccb.balancer.Close() 84 ccb.mu.Lock() 85 scs := ccb.subConns 86 ccb.subConns = nil 87 ccb.mu.Unlock() 88 for acbw := range scs { 89 ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain) 90 } 91 ccb.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: nil}) 92 return 93 } 94 } 95} 96 97func (ccb *ccBalancerWrapper) close() { 98 ccb.done.Fire() 99} 100 101func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) { 102 // When updating addresses for a SubConn, if the address in use is not in 103 // the new addresses, the old ac will be tearDown() and a new ac will be 104 // created. tearDown() generates a state change with Shutdown state, we 105 // don't want the balancer to receive this state change. So before 106 // tearDown() on the old ac, ac.acbw (acWrapper) will be set to nil, and 107 // this function will be called with (nil, Shutdown). We don't need to call 108 // balancer method in this case. 109 if sc == nil { 110 return 111 } 112 ccb.scBuffer.Put(&scStateUpdate{ 113 sc: sc, 114 state: s, 115 err: err, 116 }) 117} 118 119func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error { 120 ccb.balancerMu.Lock() 121 defer ccb.balancerMu.Unlock() 122 return ccb.balancer.UpdateClientConnState(*ccs) 123} 124 125func (ccb *ccBalancerWrapper) resolverError(err error) { 126 ccb.balancerMu.Lock() 127 ccb.balancer.ResolverError(err) 128 ccb.balancerMu.Unlock() 129} 130 131func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { 132 if len(addrs) <= 0 { 133 return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list") 134 } 135 ccb.mu.Lock() 136 defer ccb.mu.Unlock() 137 if ccb.subConns == nil { 138 return nil, fmt.Errorf("grpc: ClientConn balancer wrapper was closed") 139 } 140 ac, err := ccb.cc.newAddrConn(addrs, opts) 141 if err != nil { 142 return nil, err 143 } 144 acbw := &acBalancerWrapper{ac: ac} 145 acbw.ac.mu.Lock() 146 ac.acbw = acbw 147 acbw.ac.mu.Unlock() 148 ccb.subConns[acbw] = struct{}{} 149 return acbw, nil 150} 151 152func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) { 153 acbw, ok := sc.(*acBalancerWrapper) 154 if !ok { 155 return 156 } 157 ccb.mu.Lock() 158 defer ccb.mu.Unlock() 159 if ccb.subConns == nil { 160 return 161 } 162 delete(ccb.subConns, acbw) 163 ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain) 164} 165 166func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) { 167 ccb.mu.Lock() 168 defer ccb.mu.Unlock() 169 if ccb.subConns == nil { 170 return 171 } 172 // Update picker before updating state. Even though the ordering here does 173 // not matter, it can lead to multiple calls of Pick in the common start-up 174 // case where we wait for ready and then perform an RPC. If the picker is 175 // updated later, we could call the "connecting" picker when the state is 176 // updated, and then call the "ready" picker after the picker gets updated. 177 ccb.cc.blockingpicker.updatePicker(s.Picker) 178 ccb.cc.csMgr.updateState(s.ConnectivityState) 179} 180 181func (ccb *ccBalancerWrapper) ResolveNow(o resolver.ResolveNowOptions) { 182 ccb.cc.resolveNow(o) 183} 184 185func (ccb *ccBalancerWrapper) Target() string { 186 return ccb.cc.target 187} 188 189// acBalancerWrapper is a wrapper on top of ac for balancers. 190// It implements balancer.SubConn interface. 191type acBalancerWrapper struct { 192 mu sync.Mutex 193 ac *addrConn 194} 195 196func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) { 197 acbw.mu.Lock() 198 defer acbw.mu.Unlock() 199 if len(addrs) <= 0 { 200 acbw.ac.tearDown(errConnDrain) 201 return 202 } 203 if !acbw.ac.tryUpdateAddrs(addrs) { 204 cc := acbw.ac.cc 205 opts := acbw.ac.scopts 206 acbw.ac.mu.Lock() 207 // Set old ac.acbw to nil so the Shutdown state update will be ignored 208 // by balancer. 209 // 210 // TODO(bar) the state transition could be wrong when tearDown() old ac 211 // and creating new ac, fix the transition. 212 acbw.ac.acbw = nil 213 acbw.ac.mu.Unlock() 214 acState := acbw.ac.getState() 215 acbw.ac.tearDown(errConnDrain) 216 217 if acState == connectivity.Shutdown { 218 return 219 } 220 221 ac, err := cc.newAddrConn(addrs, opts) 222 if err != nil { 223 channelz.Warningf(logger, acbw.ac.channelzID, "acBalancerWrapper: UpdateAddresses: failed to newAddrConn: %v", err) 224 return 225 } 226 acbw.ac = ac 227 ac.mu.Lock() 228 ac.acbw = acbw 229 ac.mu.Unlock() 230 if acState != connectivity.Idle { 231 ac.connect() 232 } 233 } 234} 235 236func (acbw *acBalancerWrapper) Connect() { 237 acbw.mu.Lock() 238 defer acbw.mu.Unlock() 239 acbw.ac.connect() 240} 241 242func (acbw *acBalancerWrapper) getAddrConn() *addrConn { 243 acbw.mu.Lock() 244 defer acbw.mu.Unlock() 245 return acbw.ac 246} 247