1/* 2 * 3 * Copyright 2016 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package grpc 20 21import ( 22 "context" 23 "net" 24 "sync" 25 26 "google.golang.org/grpc/codes" 27 "google.golang.org/grpc/credentials" 28 "google.golang.org/grpc/grpclog" 29 "google.golang.org/grpc/naming" 30 "google.golang.org/grpc/status" 31) 32 33// Address represents a server the client connects to. 34// 35// Deprecated: please use package balancer. 36type Address struct { 37 // Addr is the server address on which a connection will be established. 38 Addr string 39 // Metadata is the information associated with Addr, which may be used 40 // to make load balancing decision. 41 Metadata interface{} 42} 43 44// BalancerConfig specifies the configurations for Balancer. 45// 46// Deprecated: please use package balancer. May be removed in a future 1.x release. 47type BalancerConfig struct { 48 // DialCreds is the transport credential the Balancer implementation can 49 // use to dial to a remote load balancer server. The Balancer implementations 50 // can ignore this if it does not need to talk to another party securely. 51 DialCreds credentials.TransportCredentials 52 // Dialer is the custom dialer the Balancer implementation can use to dial 53 // to a remote load balancer server. The Balancer implementations 54 // can ignore this if it doesn't need to talk to remote balancer. 55 Dialer func(context.Context, string) (net.Conn, error) 56} 57 58// BalancerGetOptions configures a Get call. 59// 60// Deprecated: please use package balancer. May be removed in a future 1.x release. 61type BalancerGetOptions struct { 62 // BlockingWait specifies whether Get should block when there is no 63 // connected address. 64 BlockingWait bool 65} 66 67// Balancer chooses network addresses for RPCs. 68// 69// Deprecated: please use package balancer. May be removed in a future 1.x release. 70type Balancer interface { 71 // Start does the initialization work to bootstrap a Balancer. For example, 72 // this function may start the name resolution and watch the updates. It will 73 // be called when dialing. 74 Start(target string, config BalancerConfig) error 75 // Up informs the Balancer that gRPC has a connection to the server at 76 // addr. It returns down which is called once the connection to addr gets 77 // lost or closed. 78 // TODO: It is not clear how to construct and take advantage of the meaningful error 79 // parameter for down. Need realistic demands to guide. 80 Up(addr Address) (down func(error)) 81 // Get gets the address of a server for the RPC corresponding to ctx. 82 // i) If it returns a connected address, gRPC internals issues the RPC on the 83 // connection to this address; 84 // ii) If it returns an address on which the connection is under construction 85 // (initiated by Notify(...)) but not connected, gRPC internals 86 // * fails RPC if the RPC is fail-fast and connection is in the TransientFailure or 87 // Shutdown state; 88 // or 89 // * issues RPC on the connection otherwise. 90 // iii) If it returns an address on which the connection does not exist, gRPC 91 // internals treats it as an error and will fail the corresponding RPC. 92 // 93 // Therefore, the following is the recommended rule when writing a custom Balancer. 94 // If opts.BlockingWait is true, it should return a connected address or 95 // block if there is no connected address. It should respect the timeout or 96 // cancellation of ctx when blocking. If opts.BlockingWait is false (for fail-fast 97 // RPCs), it should return an address it has notified via Notify(...) immediately 98 // instead of blocking. 99 // 100 // The function returns put which is called once the rpc has completed or failed. 101 // put can collect and report RPC stats to a remote load balancer. 102 // 103 // This function should only return the errors Balancer cannot recover by itself. 104 // gRPC internals will fail the RPC if an error is returned. 105 Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error) 106 // Notify returns a channel that is used by gRPC internals to watch the addresses 107 // gRPC needs to connect. The addresses might be from a name resolver or remote 108 // load balancer. gRPC internals will compare it with the existing connected 109 // addresses. If the address Balancer notified is not in the existing connected 110 // addresses, gRPC starts to connect the address. If an address in the existing 111 // connected addresses is not in the notification list, the corresponding connection 112 // is shutdown gracefully. Otherwise, there are no operations to take. Note that 113 // the Address slice must be the full list of the Addresses which should be connected. 114 // It is NOT delta. 115 Notify() <-chan []Address 116 // Close shuts down the balancer. 117 Close() error 118} 119 120// RoundRobin returns a Balancer that selects addresses round-robin. It uses r to watch 121// the name resolution updates and updates the addresses available correspondingly. 122// 123// Deprecated: please use package balancer/roundrobin. May be removed in a future 1.x release. 124func RoundRobin(r naming.Resolver) Balancer { 125 return &roundRobin{r: r} 126} 127 128type addrInfo struct { 129 addr Address 130 connected bool 131} 132 133type roundRobin struct { 134 r naming.Resolver 135 w naming.Watcher 136 addrs []*addrInfo // all the addresses the client should potentially connect 137 mu sync.Mutex 138 addrCh chan []Address // the channel to notify gRPC internals the list of addresses the client should connect to. 139 next int // index of the next address to return for Get() 140 waitCh chan struct{} // the channel to block when there is no connected address available 141 done bool // The Balancer is closed. 142} 143 144func (rr *roundRobin) watchAddrUpdates() error { 145 updates, err := rr.w.Next() 146 if err != nil { 147 grpclog.Warningf("grpc: the naming watcher stops working due to %v.", err) 148 return err 149 } 150 rr.mu.Lock() 151 defer rr.mu.Unlock() 152 for _, update := range updates { 153 addr := Address{ 154 Addr: update.Addr, 155 Metadata: update.Metadata, 156 } 157 switch update.Op { 158 case naming.Add: 159 var exist bool 160 for _, v := range rr.addrs { 161 if addr == v.addr { 162 exist = true 163 grpclog.Infoln("grpc: The name resolver wanted to add an existing address: ", addr) 164 break 165 } 166 } 167 if exist { 168 continue 169 } 170 rr.addrs = append(rr.addrs, &addrInfo{addr: addr}) 171 case naming.Delete: 172 for i, v := range rr.addrs { 173 if addr == v.addr { 174 copy(rr.addrs[i:], rr.addrs[i+1:]) 175 rr.addrs = rr.addrs[:len(rr.addrs)-1] 176 break 177 } 178 } 179 default: 180 grpclog.Errorln("Unknown update.Op ", update.Op) 181 } 182 } 183 // Make a copy of rr.addrs and write it onto rr.addrCh so that gRPC internals gets notified. 184 open := make([]Address, len(rr.addrs)) 185 for i, v := range rr.addrs { 186 open[i] = v.addr 187 } 188 if rr.done { 189 return ErrClientConnClosing 190 } 191 select { 192 case <-rr.addrCh: 193 default: 194 } 195 rr.addrCh <- open 196 return nil 197} 198 199func (rr *roundRobin) Start(target string, config BalancerConfig) error { 200 rr.mu.Lock() 201 defer rr.mu.Unlock() 202 if rr.done { 203 return ErrClientConnClosing 204 } 205 if rr.r == nil { 206 // If there is no name resolver installed, it is not needed to 207 // do name resolution. In this case, target is added into rr.addrs 208 // as the only address available and rr.addrCh stays nil. 209 rr.addrs = append(rr.addrs, &addrInfo{addr: Address{Addr: target}}) 210 return nil 211 } 212 w, err := rr.r.Resolve(target) 213 if err != nil { 214 return err 215 } 216 rr.w = w 217 rr.addrCh = make(chan []Address, 1) 218 go func() { 219 for { 220 if err := rr.watchAddrUpdates(); err != nil { 221 return 222 } 223 } 224 }() 225 return nil 226} 227 228// Up sets the connected state of addr and sends notification if there are pending 229// Get() calls. 230func (rr *roundRobin) Up(addr Address) func(error) { 231 rr.mu.Lock() 232 defer rr.mu.Unlock() 233 var cnt int 234 for _, a := range rr.addrs { 235 if a.addr == addr { 236 if a.connected { 237 return nil 238 } 239 a.connected = true 240 } 241 if a.connected { 242 cnt++ 243 } 244 } 245 // addr is only one which is connected. Notify the Get() callers who are blocking. 246 if cnt == 1 && rr.waitCh != nil { 247 close(rr.waitCh) 248 rr.waitCh = nil 249 } 250 return func(err error) { 251 rr.down(addr, err) 252 } 253} 254 255// down unsets the connected state of addr. 256func (rr *roundRobin) down(addr Address, err error) { 257 rr.mu.Lock() 258 defer rr.mu.Unlock() 259 for _, a := range rr.addrs { 260 if addr == a.addr { 261 a.connected = false 262 break 263 } 264 } 265} 266 267// Get returns the next addr in the rotation. 268func (rr *roundRobin) Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error) { 269 var ch chan struct{} 270 rr.mu.Lock() 271 if rr.done { 272 rr.mu.Unlock() 273 err = ErrClientConnClosing 274 return 275 } 276 277 if len(rr.addrs) > 0 { 278 if rr.next >= len(rr.addrs) { 279 rr.next = 0 280 } 281 next := rr.next 282 for { 283 a := rr.addrs[next] 284 next = (next + 1) % len(rr.addrs) 285 if a.connected { 286 addr = a.addr 287 rr.next = next 288 rr.mu.Unlock() 289 return 290 } 291 if next == rr.next { 292 // Has iterated all the possible address but none is connected. 293 break 294 } 295 } 296 } 297 if !opts.BlockingWait { 298 if len(rr.addrs) == 0 { 299 rr.mu.Unlock() 300 err = status.Errorf(codes.Unavailable, "there is no address available") 301 return 302 } 303 // Returns the next addr on rr.addrs for failfast RPCs. 304 addr = rr.addrs[rr.next].addr 305 rr.next++ 306 rr.mu.Unlock() 307 return 308 } 309 // Wait on rr.waitCh for non-failfast RPCs. 310 if rr.waitCh == nil { 311 ch = make(chan struct{}) 312 rr.waitCh = ch 313 } else { 314 ch = rr.waitCh 315 } 316 rr.mu.Unlock() 317 for { 318 select { 319 case <-ctx.Done(): 320 err = ctx.Err() 321 return 322 case <-ch: 323 rr.mu.Lock() 324 if rr.done { 325 rr.mu.Unlock() 326 err = ErrClientConnClosing 327 return 328 } 329 330 if len(rr.addrs) > 0 { 331 if rr.next >= len(rr.addrs) { 332 rr.next = 0 333 } 334 next := rr.next 335 for { 336 a := rr.addrs[next] 337 next = (next + 1) % len(rr.addrs) 338 if a.connected { 339 addr = a.addr 340 rr.next = next 341 rr.mu.Unlock() 342 return 343 } 344 if next == rr.next { 345 // Has iterated all the possible address but none is connected. 346 break 347 } 348 } 349 } 350 // The newly added addr got removed by Down() again. 351 if rr.waitCh == nil { 352 ch = make(chan struct{}) 353 rr.waitCh = ch 354 } else { 355 ch = rr.waitCh 356 } 357 rr.mu.Unlock() 358 } 359 } 360} 361 362func (rr *roundRobin) Notify() <-chan []Address { 363 return rr.addrCh 364} 365 366func (rr *roundRobin) Close() error { 367 rr.mu.Lock() 368 defer rr.mu.Unlock() 369 if rr.done { 370 return errBalancerClosed 371 } 372 rr.done = true 373 if rr.w != nil { 374 rr.w.Close() 375 } 376 if rr.waitCh != nil { 377 close(rr.waitCh) 378 rr.waitCh = nil 379 } 380 if rr.addrCh != nil { 381 close(rr.addrCh) 382 } 383 return nil 384} 385 386// pickFirst is used to test multi-addresses in one addrConn in which all addresses share the same addrConn. 387// It is a wrapper around roundRobin balancer. The logic of all methods works fine because balancer.Get() 388// returns the only address Up by resetTransport(). 389type pickFirst struct { 390 *roundRobin 391} 392