1/* 2 * 3 * Copyright 2017 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package grpc 20 21import ( 22 "fmt" 23 "sync" 24 25 "google.golang.org/grpc/balancer" 26 "google.golang.org/grpc/connectivity" 27 "google.golang.org/grpc/grpclog" 28 "google.golang.org/grpc/resolver" 29) 30 31// scStateUpdate contains the subConn and the new state it changed to. 32type scStateUpdate struct { 33 sc balancer.SubConn 34 state connectivity.State 35} 36 37// scStateUpdateBuffer is an unbounded channel for scStateChangeTuple. 38// TODO make a general purpose buffer that uses interface{}. 39type scStateUpdateBuffer struct { 40 c chan *scStateUpdate 41 mu sync.Mutex 42 backlog []*scStateUpdate 43} 44 45func newSCStateUpdateBuffer() *scStateUpdateBuffer { 46 return &scStateUpdateBuffer{ 47 c: make(chan *scStateUpdate, 1), 48 } 49} 50 51func (b *scStateUpdateBuffer) put(t *scStateUpdate) { 52 b.mu.Lock() 53 defer b.mu.Unlock() 54 if len(b.backlog) == 0 { 55 select { 56 case b.c <- t: 57 return 58 default: 59 } 60 } 61 b.backlog = append(b.backlog, t) 62} 63 64func (b *scStateUpdateBuffer) load() { 65 b.mu.Lock() 66 defer b.mu.Unlock() 67 if len(b.backlog) > 0 { 68 select { 69 case b.c <- b.backlog[0]: 70 b.backlog[0] = nil 71 b.backlog = b.backlog[1:] 72 default: 73 } 74 } 75} 76 77// get returns the channel that the scStateUpdate will be sent to. 78// 79// Upon receiving, the caller should call load to send another 80// scStateChangeTuple onto the channel if there is any. 81func (b *scStateUpdateBuffer) get() <-chan *scStateUpdate { 82 return b.c 83} 84 85// ccBalancerWrapper is a wrapper on top of cc for balancers. 86// It implements balancer.ClientConn interface. 87type ccBalancerWrapper struct { 88 cc *ClientConn 89 balancer balancer.Balancer 90 stateChangeQueue *scStateUpdateBuffer 91 resolverUpdateCh chan *resolver.State 92 done chan struct{} 93 94 mu sync.Mutex 95 subConns map[*acBalancerWrapper]struct{} 96} 97 98func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.BuildOptions) *ccBalancerWrapper { 99 ccb := &ccBalancerWrapper{ 100 cc: cc, 101 stateChangeQueue: newSCStateUpdateBuffer(), 102 resolverUpdateCh: make(chan *resolver.State, 1), 103 done: make(chan struct{}), 104 subConns: make(map[*acBalancerWrapper]struct{}), 105 } 106 go ccb.watcher() 107 ccb.balancer = b.Build(ccb, bopts) 108 return ccb 109} 110 111// watcher balancer functions sequentially, so the balancer can be implemented 112// lock-free. 113func (ccb *ccBalancerWrapper) watcher() { 114 for { 115 select { 116 case t := <-ccb.stateChangeQueue.get(): 117 ccb.stateChangeQueue.load() 118 select { 119 case <-ccb.done: 120 ccb.balancer.Close() 121 return 122 default: 123 } 124 if ub, ok := ccb.balancer.(balancer.V2Balancer); ok { 125 ub.UpdateSubConnState(t.sc, balancer.SubConnState{ConnectivityState: t.state}) 126 } else { 127 ccb.balancer.HandleSubConnStateChange(t.sc, t.state) 128 } 129 case s := <-ccb.resolverUpdateCh: 130 select { 131 case <-ccb.done: 132 ccb.balancer.Close() 133 return 134 default: 135 } 136 if ub, ok := ccb.balancer.(balancer.V2Balancer); ok { 137 ub.UpdateResolverState(*s) 138 } else { 139 ccb.balancer.HandleResolvedAddrs(s.Addresses, nil) 140 } 141 case <-ccb.done: 142 } 143 144 select { 145 case <-ccb.done: 146 ccb.balancer.Close() 147 ccb.mu.Lock() 148 scs := ccb.subConns 149 ccb.subConns = nil 150 ccb.mu.Unlock() 151 for acbw := range scs { 152 ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain) 153 } 154 return 155 default: 156 } 157 } 158} 159 160func (ccb *ccBalancerWrapper) close() { 161 close(ccb.done) 162} 163 164func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State) { 165 // When updating addresses for a SubConn, if the address in use is not in 166 // the new addresses, the old ac will be tearDown() and a new ac will be 167 // created. tearDown() generates a state change with Shutdown state, we 168 // don't want the balancer to receive this state change. So before 169 // tearDown() on the old ac, ac.acbw (acWrapper) will be set to nil, and 170 // this function will be called with (nil, Shutdown). We don't need to call 171 // balancer method in this case. 172 if sc == nil { 173 return 174 } 175 ccb.stateChangeQueue.put(&scStateUpdate{ 176 sc: sc, 177 state: s, 178 }) 179} 180 181func (ccb *ccBalancerWrapper) updateResolverState(s resolver.State) { 182 if ccb.cc.curBalancerName != grpclbName { 183 // Filter any grpclb addresses since we don't have the grpclb balancer. 184 for i := 0; i < len(s.Addresses); { 185 if s.Addresses[i].Type == resolver.GRPCLB { 186 copy(s.Addresses[i:], s.Addresses[i+1:]) 187 s.Addresses = s.Addresses[:len(s.Addresses)-1] 188 continue 189 } 190 i++ 191 } 192 } 193 select { 194 case <-ccb.resolverUpdateCh: 195 default: 196 } 197 ccb.resolverUpdateCh <- &s 198} 199 200func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { 201 if len(addrs) <= 0 { 202 return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list") 203 } 204 ccb.mu.Lock() 205 defer ccb.mu.Unlock() 206 if ccb.subConns == nil { 207 return nil, fmt.Errorf("grpc: ClientConn balancer wrapper was closed") 208 } 209 ac, err := ccb.cc.newAddrConn(addrs, opts) 210 if err != nil { 211 return nil, err 212 } 213 acbw := &acBalancerWrapper{ac: ac} 214 acbw.ac.mu.Lock() 215 ac.acbw = acbw 216 acbw.ac.mu.Unlock() 217 ccb.subConns[acbw] = struct{}{} 218 return acbw, nil 219} 220 221func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) { 222 acbw, ok := sc.(*acBalancerWrapper) 223 if !ok { 224 return 225 } 226 ccb.mu.Lock() 227 defer ccb.mu.Unlock() 228 if ccb.subConns == nil { 229 return 230 } 231 delete(ccb.subConns, acbw) 232 ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain) 233} 234 235func (ccb *ccBalancerWrapper) UpdateBalancerState(s connectivity.State, p balancer.Picker) { 236 ccb.mu.Lock() 237 defer ccb.mu.Unlock() 238 if ccb.subConns == nil { 239 return 240 } 241 // Update picker before updating state. Even though the ordering here does 242 // not matter, it can lead to multiple calls of Pick in the common start-up 243 // case where we wait for ready and then perform an RPC. If the picker is 244 // updated later, we could call the "connecting" picker when the state is 245 // updated, and then call the "ready" picker after the picker gets updated. 246 ccb.cc.blockingpicker.updatePicker(p) 247 ccb.cc.csMgr.updateState(s) 248} 249 250func (ccb *ccBalancerWrapper) ResolveNow(o resolver.ResolveNowOption) { 251 ccb.cc.resolveNow(o) 252} 253 254func (ccb *ccBalancerWrapper) Target() string { 255 return ccb.cc.target 256} 257 258// acBalancerWrapper is a wrapper on top of ac for balancers. 259// It implements balancer.SubConn interface. 260type acBalancerWrapper struct { 261 mu sync.Mutex 262 ac *addrConn 263} 264 265func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) { 266 acbw.mu.Lock() 267 defer acbw.mu.Unlock() 268 if len(addrs) <= 0 { 269 acbw.ac.tearDown(errConnDrain) 270 return 271 } 272 if !acbw.ac.tryUpdateAddrs(addrs) { 273 cc := acbw.ac.cc 274 opts := acbw.ac.scopts 275 acbw.ac.mu.Lock() 276 // Set old ac.acbw to nil so the Shutdown state update will be ignored 277 // by balancer. 278 // 279 // TODO(bar) the state transition could be wrong when tearDown() old ac 280 // and creating new ac, fix the transition. 281 acbw.ac.acbw = nil 282 acbw.ac.mu.Unlock() 283 acState := acbw.ac.getState() 284 acbw.ac.tearDown(errConnDrain) 285 286 if acState == connectivity.Shutdown { 287 return 288 } 289 290 ac, err := cc.newAddrConn(addrs, opts) 291 if err != nil { 292 grpclog.Warningf("acBalancerWrapper: UpdateAddresses: failed to newAddrConn: %v", err) 293 return 294 } 295 acbw.ac = ac 296 ac.mu.Lock() 297 ac.acbw = acbw 298 ac.mu.Unlock() 299 if acState != connectivity.Idle { 300 ac.connect() 301 } 302 } 303} 304 305func (acbw *acBalancerWrapper) Connect() { 306 acbw.mu.Lock() 307 defer acbw.mu.Unlock() 308 acbw.ac.connect() 309} 310 311func (acbw *acBalancerWrapper) getAddrConn() *addrConn { 312 acbw.mu.Lock() 313 defer acbw.mu.Unlock() 314 return acbw.ac 315} 316