1// Copyright 2018 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package balancer 16 17import ( 18 "fmt" 19 "strconv" 20 "sync" 21 "time" 22 23 "go.etcd.io/etcd/clientv3/balancer/picker" 24 25 "go.uber.org/zap" 26 "google.golang.org/grpc/balancer" 27 "google.golang.org/grpc/connectivity" 28 "google.golang.org/grpc/resolver" 29 _ "google.golang.org/grpc/resolver/dns" // register DNS resolver 30 _ "google.golang.org/grpc/resolver/passthrough" // register passthrough resolver 31) 32 33// RegisterBuilder creates and registers a builder. Since this function calls balancer.Register, it 34// must be invoked at initialization time. 35func RegisterBuilder(cfg Config) { 36 bb := &builder{cfg} 37 balancer.Register(bb) 38 39 bb.cfg.Logger.Debug( 40 "registered balancer", 41 zap.String("policy", bb.cfg.Policy.String()), 42 zap.String("name", bb.cfg.Name), 43 ) 44} 45 46type builder struct { 47 cfg Config 48} 49 50// Build is called initially when creating "ccBalancerWrapper". 51// "grpc.Dial" is called to this client connection. 52// Then, resolved addresses will be handled via "HandleResolvedAddrs". 53func (b *builder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer { 54 bb := &baseBalancer{ 55 id: strconv.FormatInt(time.Now().UnixNano(), 36), 56 policy: b.cfg.Policy, 57 name: b.cfg.Name, 58 lg: b.cfg.Logger, 59 60 addrToSc: make(map[resolver.Address]balancer.SubConn), 61 scToAddr: make(map[balancer.SubConn]resolver.Address), 62 scToSt: make(map[balancer.SubConn]connectivity.State), 63 64 currentConn: nil, 65 csEvltr: &connectivityStateEvaluator{}, 66 67 // initialize picker always returns "ErrNoSubConnAvailable" 68 Picker: picker.NewErr(balancer.ErrNoSubConnAvailable), 69 } 70 if bb.lg == nil { 71 bb.lg = zap.NewNop() 72 } 73 74 // TODO: support multiple connections 75 bb.mu.Lock() 76 bb.currentConn = cc 77 bb.mu.Unlock() 78 79 bb.lg.Info( 80 "built balancer", 81 zap.String("balancer-id", bb.id), 82 zap.String("policy", bb.policy.String()), 83 zap.String("resolver-target", cc.Target()), 84 ) 85 return bb 86} 87 88// Name implements "grpc/balancer.Builder" interface. 89func (b *builder) Name() string { return b.cfg.Name } 90 91// Balancer defines client balancer interface. 92type Balancer interface { 93 // Balancer is called on specified client connection. Client initiates gRPC 94 // connection with "grpc.Dial(addr, grpc.WithBalancerName)", and then those resolved 95 // addresses are passed to "grpc/balancer.Balancer.HandleResolvedAddrs". 96 // For each resolved address, balancer calls "balancer.ClientConn.NewSubConn". 97 // "grpc/balancer.Balancer.HandleSubConnStateChange" is called when connectivity state 98 // changes, thus requires failover logic in this method. 99 balancer.Balancer 100 101 // Picker calls "Pick" for every client request. 102 picker.Picker 103} 104 105type baseBalancer struct { 106 id string 107 policy picker.Policy 108 name string 109 lg *zap.Logger 110 111 mu sync.RWMutex 112 113 addrToSc map[resolver.Address]balancer.SubConn 114 scToAddr map[balancer.SubConn]resolver.Address 115 scToSt map[balancer.SubConn]connectivity.State 116 117 currentConn balancer.ClientConn 118 currentState connectivity.State 119 csEvltr *connectivityStateEvaluator 120 121 picker.Picker 122} 123 124// HandleResolvedAddrs implements "grpc/balancer.Balancer" interface. 125// gRPC sends initial or updated resolved addresses from "Build". 126func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) { 127 if err != nil { 128 bb.lg.Warn("HandleResolvedAddrs called with error", zap.String("balancer-id", bb.id), zap.Error(err)) 129 return 130 } 131 bb.lg.Info("resolved", zap.String("balancer-id", bb.id), zap.Strings("addresses", addrsToStrings(addrs))) 132 133 bb.mu.Lock() 134 defer bb.mu.Unlock() 135 136 resolved := make(map[resolver.Address]struct{}) 137 for _, addr := range addrs { 138 resolved[addr] = struct{}{} 139 if _, ok := bb.addrToSc[addr]; !ok { 140 sc, err := bb.currentConn.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{}) 141 if err != nil { 142 bb.lg.Warn("NewSubConn failed", zap.String("balancer-id", bb.id), zap.Error(err), zap.String("address", addr.Addr)) 143 continue 144 } 145 bb.addrToSc[addr] = sc 146 bb.scToAddr[sc] = addr 147 bb.scToSt[sc] = connectivity.Idle 148 sc.Connect() 149 } 150 } 151 152 for addr, sc := range bb.addrToSc { 153 if _, ok := resolved[addr]; !ok { 154 // was removed by resolver or failed to create subconn 155 bb.currentConn.RemoveSubConn(sc) 156 delete(bb.addrToSc, addr) 157 158 bb.lg.Info( 159 "removed subconn", 160 zap.String("balancer-id", bb.id), 161 zap.String("address", addr.Addr), 162 zap.String("subconn", scToString(sc)), 163 ) 164 165 // Keep the state of this sc in bb.scToSt until sc's state becomes Shutdown. 166 // The entry will be deleted in HandleSubConnStateChange. 167 // (DO NOT) delete(bb.scToAddr, sc) 168 // (DO NOT) delete(bb.scToSt, sc) 169 } 170 } 171} 172 173// HandleSubConnStateChange implements "grpc/balancer.Balancer" interface. 174func (bb *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) { 175 bb.mu.Lock() 176 defer bb.mu.Unlock() 177 178 old, ok := bb.scToSt[sc] 179 if !ok { 180 bb.lg.Warn( 181 "state change for an unknown subconn", 182 zap.String("balancer-id", bb.id), 183 zap.String("subconn", scToString(sc)), 184 zap.String("state", s.String()), 185 ) 186 return 187 } 188 189 bb.lg.Info( 190 "state changed", 191 zap.String("balancer-id", bb.id), 192 zap.Bool("connected", s == connectivity.Ready), 193 zap.String("subconn", scToString(sc)), 194 zap.String("address", bb.scToAddr[sc].Addr), 195 zap.String("old-state", old.String()), 196 zap.String("new-state", s.String()), 197 ) 198 199 bb.scToSt[sc] = s 200 switch s { 201 case connectivity.Idle: 202 sc.Connect() 203 case connectivity.Shutdown: 204 // When an address was removed by resolver, b called RemoveSubConn but 205 // kept the sc's state in scToSt. Remove state for this sc here. 206 delete(bb.scToAddr, sc) 207 delete(bb.scToSt, sc) 208 } 209 210 oldAggrState := bb.currentState 211 bb.currentState = bb.csEvltr.recordTransition(old, s) 212 213 // Regenerate picker when one of the following happens: 214 // - this sc became ready from not-ready 215 // - this sc became not-ready from ready 216 // - the aggregated state of balancer became TransientFailure from non-TransientFailure 217 // - the aggregated state of balancer became non-TransientFailure from TransientFailure 218 if (s == connectivity.Ready) != (old == connectivity.Ready) || 219 (bb.currentState == connectivity.TransientFailure) != (oldAggrState == connectivity.TransientFailure) { 220 bb.regeneratePicker() 221 } 222 223 bb.currentConn.UpdateBalancerState(bb.currentState, bb.Picker) 224 return 225} 226 227func (bb *baseBalancer) regeneratePicker() { 228 if bb.currentState == connectivity.TransientFailure { 229 bb.lg.Info( 230 "generated transient error picker", 231 zap.String("balancer-id", bb.id), 232 zap.String("policy", bb.policy.String()), 233 ) 234 bb.Picker = picker.NewErr(balancer.ErrTransientFailure) 235 return 236 } 237 238 // only pass ready subconns to picker 239 scs := make([]balancer.SubConn, 0) 240 addrToSc := make(map[resolver.Address]balancer.SubConn) 241 scToAddr := make(map[balancer.SubConn]resolver.Address) 242 for addr, sc := range bb.addrToSc { 243 if st, ok := bb.scToSt[sc]; ok && st == connectivity.Ready { 244 scs = append(scs, sc) 245 addrToSc[addr] = sc 246 scToAddr[sc] = addr 247 } 248 } 249 250 switch bb.policy { 251 case picker.RoundrobinBalanced: 252 bb.Picker = picker.NewRoundrobinBalanced(bb.lg, scs, addrToSc, scToAddr) 253 254 default: 255 panic(fmt.Errorf("invalid balancer picker policy (%d)", bb.policy)) 256 } 257 258 bb.lg.Info( 259 "generated picker", 260 zap.String("balancer-id", bb.id), 261 zap.String("policy", bb.policy.String()), 262 zap.Strings("subconn-ready", scsToStrings(addrToSc)), 263 zap.Int("subconn-size", len(addrToSc)), 264 ) 265} 266 267// Close implements "grpc/balancer.Balancer" interface. 268// Close is a nop because base balancer doesn't have internal state to clean up, 269// and it doesn't need to call RemoveSubConn for the SubConns. 270func (bb *baseBalancer) Close() { 271 // TODO 272} 273