// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package balancer

import (
	"context"
	"errors"
	"io/ioutil"
	"net/url"
	"strings"
	"sync"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/grpclog"
	healthpb "google.golang.org/grpc/health/grpc_health_v1"
	"google.golang.org/grpc/status"
)

// lg is the package-level logger. All three writers handed to
// grpclog.NewLoggerV2 are ioutil.Discard.
// TODO: replace with something better
var lg = grpclog.NewLoggerV2(ioutil.Discard, ioutil.Discard, ioutil.Discard)

const (
	// minHealthRetryDuration is the lower bound that NewGRPC17Health applies
	// to the health check timeout it is given.
	minHealthRetryDuration = 3 * time.Second
	// unknownService is the status message grpcHealthCheck compares against
	// to recognize a server without the health service (etcd < v3.3.0).
	unknownService = "unknown service grpc.health.v1.Health"
)

// ErrNoAddrAvailable is returned by Get() when the balancer does not have
// any active connection to endpoints at the time.
// This error is returned only when opts.BlockingWait is false.
var ErrNoAddrAvailable = status.Error(codes.Unavailable, "there is no address available")

// NotifyMsg selects how notifyAddrs publishes addresses on the notify channel.
type NotifyMsg int

const (
	// NotifyReset makes notifyAddrs publish the current live addresses.
	NotifyReset NotifyMsg = iota
	// NotifyNext makes notifyAddrs publish an empty address list first and
	// then the current live addresses.
	NotifyNext
)

// GRPC17Health does the bare minimum to expose multiple eps
// to the grpc reconnection code path
type GRPC17Health struct {
	// addrs are the client's endpoint addresses for grpc
	addrs []grpc.Address

	// eps holds the raw endpoints from the client
	eps []string

	// notifyCh notifies grpc of the set of addresses for connecting
	notifyCh chan []grpc.Address

	// readyc closes once the first connection is up
	readyc chan struct{}
	// readyOnce guards the single close of readyc.
	readyOnce sync.Once

	// healthCheck checks an endpoint's health.
	healthCheck func(ep string) (bool, error)
	// healthCheckTimeout is how long a host:port stays in the unhealthy set
	// before cleanupUnhealthy drops it; it is also the period of the
	// updateUnhealthy loop and the minimum wait in mayPin after a failure.
	healthCheckTimeout time.Duration

	// unhealthyMu protects unhealthyHostPorts.
	unhealthyMu sync.RWMutex
	// unhealthyHostPorts maps a host:port to the time it was marked unhealthy.
	unhealthyHostPorts map[string]time.Time

	// mu protects all fields below.
	mu sync.RWMutex

	// upc closes when pinAddr transitions from empty to non-empty or the balancer closes.
	upc chan struct{}

	// downc closes when grpc calls down() on pinAddr
	downc chan struct{}

	// stopc is closed to signal updateNotifyLoop should stop.
	stopc chan struct{}
	// stopOnce guards the single close of stopc.
	stopOnce sync.Once
	// wg tracks the updateUnhealthy goroutine started by NewGRPC17Health.
	wg sync.WaitGroup

	// donec closes when all goroutines are exited
	donec chan struct{}

	// updateAddrsC notifies updateNotifyLoop to update addrs.
	updateAddrsC chan NotifyMsg

	// grpc issues TLS cert checks using the string passed into dial so
	// that string must be the host. To recover the full scheme://host URL,
	// have a map from hosts to the original endpoint.
	hostPort2ep map[string]string

	// pinAddr is the currently pinned address; set to the empty string on
	// initialization and shutdown.
	pinAddr string

	// closed is set to true by Close.
	closed bool
}

// DialFunc defines gRPC dial function.
109type DialFunc func(ep string, dopts ...grpc.DialOption) (*grpc.ClientConn, error) 110 111// NewGRPC17Health returns a new health balancer with gRPC v1.7. 112func NewGRPC17Health( 113 eps []string, 114 timeout time.Duration, 115 dialFunc DialFunc, 116) *GRPC17Health { 117 notifyCh := make(chan []grpc.Address) 118 addrs := eps2addrs(eps) 119 hb := &GRPC17Health{ 120 addrs: addrs, 121 eps: eps, 122 notifyCh: notifyCh, 123 readyc: make(chan struct{}), 124 healthCheck: func(ep string) (bool, error) { return grpcHealthCheck(ep, dialFunc) }, 125 unhealthyHostPorts: make(map[string]time.Time), 126 upc: make(chan struct{}), 127 stopc: make(chan struct{}), 128 downc: make(chan struct{}), 129 donec: make(chan struct{}), 130 updateAddrsC: make(chan NotifyMsg), 131 hostPort2ep: getHostPort2ep(eps), 132 } 133 if timeout < minHealthRetryDuration { 134 timeout = minHealthRetryDuration 135 } 136 hb.healthCheckTimeout = timeout 137 138 close(hb.downc) 139 go hb.updateNotifyLoop() 140 hb.wg.Add(1) 141 go func() { 142 defer hb.wg.Done() 143 hb.updateUnhealthy() 144 }() 145 return hb 146} 147 148func (b *GRPC17Health) Start(target string, config grpc.BalancerConfig) error { return nil } 149 150func (b *GRPC17Health) ConnectNotify() <-chan struct{} { 151 b.mu.Lock() 152 defer b.mu.Unlock() 153 return b.upc 154} 155 156func (b *GRPC17Health) UpdateAddrsC() chan NotifyMsg { return b.updateAddrsC } 157func (b *GRPC17Health) StopC() chan struct{} { return b.stopc } 158 159func (b *GRPC17Health) Ready() <-chan struct{} { return b.readyc } 160 161func (b *GRPC17Health) Endpoint(hostPort string) string { 162 b.mu.RLock() 163 defer b.mu.RUnlock() 164 return b.hostPort2ep[hostPort] 165} 166 167func (b *GRPC17Health) Pinned() string { 168 b.mu.RLock() 169 defer b.mu.RUnlock() 170 return b.pinAddr 171} 172 173func (b *GRPC17Health) HostPortError(hostPort string, err error) { 174 if b.Endpoint(hostPort) == "" { 175 lg.Infof("clientv3/balancer: %q is stale (skip marking as unhealthy on %q)", 
hostPort, err.Error()) 176 return 177 } 178 179 b.unhealthyMu.Lock() 180 b.unhealthyHostPorts[hostPort] = time.Now() 181 b.unhealthyMu.Unlock() 182 lg.Infof("clientv3/balancer: %q is marked unhealthy (%q)", hostPort, err.Error()) 183} 184 185func (b *GRPC17Health) removeUnhealthy(hostPort, msg string) { 186 if b.Endpoint(hostPort) == "" { 187 lg.Infof("clientv3/balancer: %q was not in unhealthy (%q)", hostPort, msg) 188 return 189 } 190 191 b.unhealthyMu.Lock() 192 delete(b.unhealthyHostPorts, hostPort) 193 b.unhealthyMu.Unlock() 194 lg.Infof("clientv3/balancer: %q is removed from unhealthy (%q)", hostPort, msg) 195} 196 197func (b *GRPC17Health) countUnhealthy() (count int) { 198 b.unhealthyMu.RLock() 199 count = len(b.unhealthyHostPorts) 200 b.unhealthyMu.RUnlock() 201 return count 202} 203 204func (b *GRPC17Health) isUnhealthy(hostPort string) (unhealthy bool) { 205 b.unhealthyMu.RLock() 206 _, unhealthy = b.unhealthyHostPorts[hostPort] 207 b.unhealthyMu.RUnlock() 208 return unhealthy 209} 210 211func (b *GRPC17Health) cleanupUnhealthy() { 212 b.unhealthyMu.Lock() 213 for k, v := range b.unhealthyHostPorts { 214 if time.Since(v) > b.healthCheckTimeout { 215 delete(b.unhealthyHostPorts, k) 216 lg.Infof("clientv3/balancer: removed %q from unhealthy after %v", k, b.healthCheckTimeout) 217 } 218 } 219 b.unhealthyMu.Unlock() 220} 221 222func (b *GRPC17Health) liveAddrs() ([]grpc.Address, map[string]struct{}) { 223 unhealthyCnt := b.countUnhealthy() 224 225 b.mu.RLock() 226 defer b.mu.RUnlock() 227 228 hbAddrs := b.addrs 229 if len(b.addrs) == 1 || unhealthyCnt == 0 || unhealthyCnt == len(b.addrs) { 230 liveHostPorts := make(map[string]struct{}, len(b.hostPort2ep)) 231 for k := range b.hostPort2ep { 232 liveHostPorts[k] = struct{}{} 233 } 234 return hbAddrs, liveHostPorts 235 } 236 237 addrs := make([]grpc.Address, 0, len(b.addrs)-unhealthyCnt) 238 liveHostPorts := make(map[string]struct{}, len(addrs)) 239 for _, addr := range b.addrs { 240 if 
!b.isUnhealthy(addr.Addr) { 241 addrs = append(addrs, addr) 242 liveHostPorts[addr.Addr] = struct{}{} 243 } 244 } 245 return addrs, liveHostPorts 246} 247 248func (b *GRPC17Health) updateUnhealthy() { 249 for { 250 select { 251 case <-time.After(b.healthCheckTimeout): 252 b.cleanupUnhealthy() 253 pinned := b.Pinned() 254 if pinned == "" || b.isUnhealthy(pinned) { 255 select { 256 case b.updateAddrsC <- NotifyNext: 257 case <-b.stopc: 258 return 259 } 260 } 261 case <-b.stopc: 262 return 263 } 264 } 265} 266 267// NeedUpdate returns true if all connections are down or 268// addresses do not include current pinned address. 269func (b *GRPC17Health) NeedUpdate() bool { 270 // updating notifyCh can trigger new connections, 271 // need update addrs if all connections are down 272 // or addrs does not include pinAddr. 273 b.mu.RLock() 274 update := !hasAddr(b.addrs, b.pinAddr) 275 b.mu.RUnlock() 276 return update 277} 278 279func (b *GRPC17Health) UpdateAddrs(eps ...string) { 280 np := getHostPort2ep(eps) 281 282 b.mu.Lock() 283 defer b.mu.Unlock() 284 285 match := len(np) == len(b.hostPort2ep) 286 if match { 287 for k, v := range np { 288 if b.hostPort2ep[k] != v { 289 match = false 290 break 291 } 292 } 293 } 294 if match { 295 // same endpoints, so no need to update address 296 return 297 } 298 299 b.hostPort2ep = np 300 b.addrs, b.eps = eps2addrs(eps), eps 301 302 b.unhealthyMu.Lock() 303 b.unhealthyHostPorts = make(map[string]time.Time) 304 b.unhealthyMu.Unlock() 305} 306 307func (b *GRPC17Health) Next() { 308 b.mu.RLock() 309 downc := b.downc 310 b.mu.RUnlock() 311 select { 312 case b.updateAddrsC <- NotifyNext: 313 case <-b.stopc: 314 } 315 // wait until disconnect so new RPCs are not issued on old connection 316 select { 317 case <-downc: 318 case <-b.stopc: 319 } 320} 321 322func (b *GRPC17Health) updateNotifyLoop() { 323 defer close(b.donec) 324 325 for { 326 b.mu.RLock() 327 upc, downc, addr := b.upc, b.downc, b.pinAddr 328 b.mu.RUnlock() 329 // downc or upc 
should be closed 330 select { 331 case <-downc: 332 downc = nil 333 default: 334 } 335 select { 336 case <-upc: 337 upc = nil 338 default: 339 } 340 switch { 341 case downc == nil && upc == nil: 342 // stale 343 select { 344 case <-b.stopc: 345 return 346 default: 347 } 348 case downc == nil: 349 b.notifyAddrs(NotifyReset) 350 select { 351 case <-upc: 352 case msg := <-b.updateAddrsC: 353 b.notifyAddrs(msg) 354 case <-b.stopc: 355 return 356 } 357 case upc == nil: 358 select { 359 // close connections that are not the pinned address 360 case b.notifyCh <- []grpc.Address{{Addr: addr}}: 361 case <-downc: 362 case <-b.stopc: 363 return 364 } 365 select { 366 case <-downc: 367 b.notifyAddrs(NotifyReset) 368 case msg := <-b.updateAddrsC: 369 b.notifyAddrs(msg) 370 case <-b.stopc: 371 return 372 } 373 } 374 } 375} 376 377func (b *GRPC17Health) notifyAddrs(msg NotifyMsg) { 378 if msg == NotifyNext { 379 select { 380 case b.notifyCh <- []grpc.Address{}: 381 case <-b.stopc: 382 return 383 } 384 } 385 b.mu.RLock() 386 pinAddr := b.pinAddr 387 downc := b.downc 388 b.mu.RUnlock() 389 addrs, hostPorts := b.liveAddrs() 390 391 var waitDown bool 392 if pinAddr != "" { 393 _, ok := hostPorts[pinAddr] 394 waitDown = !ok 395 } 396 397 select { 398 case b.notifyCh <- addrs: 399 if waitDown { 400 select { 401 case <-downc: 402 case <-b.stopc: 403 } 404 } 405 case <-b.stopc: 406 } 407} 408 409func (b *GRPC17Health) Up(addr grpc.Address) func(error) { 410 if !b.mayPin(addr) { 411 return func(err error) {} 412 } 413 414 b.mu.Lock() 415 defer b.mu.Unlock() 416 417 // gRPC might call Up after it called Close. We add this check 418 // to "fix" it up at application layer. Otherwise, will panic 419 // if b.upc is already closed. 420 if b.closed { 421 return func(err error) {} 422 } 423 424 // gRPC might call Up on a stale address. 425 // Prevent updating pinAddr with a stale address. 
426 if !hasAddr(b.addrs, addr.Addr) { 427 return func(err error) {} 428 } 429 430 if b.pinAddr != "" { 431 lg.Infof("clientv3/balancer: %q is up but not pinned (already pinned %q)", addr.Addr, b.pinAddr) 432 return func(err error) {} 433 } 434 435 // notify waiting Get()s and pin first connected address 436 close(b.upc) 437 b.downc = make(chan struct{}) 438 b.pinAddr = addr.Addr 439 lg.Infof("clientv3/balancer: pin %q", addr.Addr) 440 441 // notify client that a connection is up 442 b.readyOnce.Do(func() { close(b.readyc) }) 443 444 return func(err error) { 445 // If connected to a black hole endpoint or a killed server, the gRPC ping 446 // timeout will induce a network I/O error, and retrying until success; 447 // finding healthy endpoint on retry could take several timeouts and redials. 448 // To avoid wasting retries, gray-list unhealthy endpoints. 449 b.HostPortError(addr.Addr, err) 450 451 b.mu.Lock() 452 b.upc = make(chan struct{}) 453 close(b.downc) 454 b.pinAddr = "" 455 b.mu.Unlock() 456 lg.Infof("clientv3/balancer: unpin %q (%q)", addr.Addr, err.Error()) 457 } 458} 459 460func (b *GRPC17Health) mayPin(addr grpc.Address) bool { 461 if b.Endpoint(addr.Addr) == "" { // stale host:port 462 return false 463 } 464 465 b.unhealthyMu.RLock() 466 unhealthyCnt := len(b.unhealthyHostPorts) 467 failedTime, bad := b.unhealthyHostPorts[addr.Addr] 468 b.unhealthyMu.RUnlock() 469 470 b.mu.RLock() 471 skip := len(b.addrs) == 1 || unhealthyCnt == 0 || len(b.addrs) == unhealthyCnt 472 b.mu.RUnlock() 473 if skip || !bad { 474 return true 475 } 476 477 // prevent isolated member's endpoint from being infinitely retried, as follows: 478 // 1. keepalive pings detects GoAway with http2.ErrCodeEnhanceYourCalm 479 // 2. balancer 'Up' unpins with grpc: failed with network I/O error 480 // 3. 
grpc-healthcheck still SERVING, thus retry to pin 481 // instead, return before grpc-healthcheck if failed within healthcheck timeout 482 if elapsed := time.Since(failedTime); elapsed < b.healthCheckTimeout { 483 lg.Infof("clientv3/balancer: %q is up but not pinned (failed %v ago, require minimum %v after failure)", addr.Addr, elapsed, b.healthCheckTimeout) 484 return false 485 } 486 487 if ok, _ := b.healthCheck(addr.Addr); ok { 488 b.removeUnhealthy(addr.Addr, "health check success") 489 return true 490 } 491 492 b.HostPortError(addr.Addr, errors.New("health check failed")) 493 return false 494} 495 496func (b *GRPC17Health) Get(ctx context.Context, opts grpc.BalancerGetOptions) (grpc.Address, func(), error) { 497 var ( 498 addr string 499 closed bool 500 ) 501 502 // If opts.BlockingWait is false (for fail-fast RPCs), it should return 503 // an address it has notified via Notify immediately instead of blocking. 504 if !opts.BlockingWait { 505 b.mu.RLock() 506 closed = b.closed 507 addr = b.pinAddr 508 b.mu.RUnlock() 509 if closed { 510 return grpc.Address{Addr: ""}, nil, grpc.ErrClientConnClosing 511 } 512 if addr == "" { 513 return grpc.Address{Addr: ""}, nil, ErrNoAddrAvailable 514 } 515 return grpc.Address{Addr: addr}, func() {}, nil 516 } 517 518 for { 519 b.mu.RLock() 520 ch := b.upc 521 b.mu.RUnlock() 522 select { 523 case <-ch: 524 case <-b.donec: 525 return grpc.Address{Addr: ""}, nil, grpc.ErrClientConnClosing 526 case <-ctx.Done(): 527 return grpc.Address{Addr: ""}, nil, ctx.Err() 528 } 529 b.mu.RLock() 530 closed = b.closed 531 addr = b.pinAddr 532 b.mu.RUnlock() 533 // Close() which sets b.closed = true can be called before Get(), Get() must exit if balancer is closed. 
534 if closed { 535 return grpc.Address{Addr: ""}, nil, grpc.ErrClientConnClosing 536 } 537 if addr != "" { 538 break 539 } 540 } 541 return grpc.Address{Addr: addr}, func() {}, nil 542} 543 544func (b *GRPC17Health) Notify() <-chan []grpc.Address { return b.notifyCh } 545 546func (b *GRPC17Health) Close() error { 547 b.mu.Lock() 548 // In case gRPC calls close twice. TODO: remove the checking 549 // when we are sure that gRPC wont call close twice. 550 if b.closed { 551 b.mu.Unlock() 552 <-b.donec 553 return nil 554 } 555 b.closed = true 556 b.stopOnce.Do(func() { close(b.stopc) }) 557 b.pinAddr = "" 558 559 // In the case of following scenario: 560 // 1. upc is not closed; no pinned address 561 // 2. client issues an RPC, calling invoke(), which calls Get(), enters for loop, blocks 562 // 3. client.conn.Close() calls balancer.Close(); closed = true 563 // 4. for loop in Get() never exits since ctx is the context passed in by the client and may not be canceled 564 // we must close upc so Get() exits from blocking on upc 565 select { 566 case <-b.upc: 567 default: 568 // terminate all waiting Get()s 569 close(b.upc) 570 } 571 572 b.mu.Unlock() 573 b.wg.Wait() 574 575 // wait for updateNotifyLoop to finish 576 <-b.donec 577 close(b.notifyCh) 578 579 return nil 580} 581 582func grpcHealthCheck(ep string, dialFunc func(ep string, dopts ...grpc.DialOption) (*grpc.ClientConn, error)) (bool, error) { 583 conn, err := dialFunc(ep) 584 if err != nil { 585 return false, err 586 } 587 defer conn.Close() 588 cli := healthpb.NewHealthClient(conn) 589 ctx, cancel := context.WithTimeout(context.Background(), time.Second) 590 resp, err := cli.Check(ctx, &healthpb.HealthCheckRequest{}) 591 cancel() 592 if err != nil { 593 if s, ok := status.FromError(err); ok && s.Code() == codes.Unavailable { 594 if s.Message() == unknownService { // etcd < v3.3.0 595 return true, nil 596 } 597 } 598 return false, err 599 } 600 return resp.Status == healthpb.HealthCheckResponse_SERVING, nil 601} 
602 603func hasAddr(addrs []grpc.Address, targetAddr string) bool { 604 for _, addr := range addrs { 605 if targetAddr == addr.Addr { 606 return true 607 } 608 } 609 return false 610} 611 612func getHost(ep string) string { 613 url, uerr := url.Parse(ep) 614 if uerr != nil || !strings.Contains(ep, "://") { 615 return ep 616 } 617 return url.Host 618} 619 620func eps2addrs(eps []string) []grpc.Address { 621 addrs := make([]grpc.Address, len(eps)) 622 for i := range eps { 623 addrs[i].Addr = getHost(eps[i]) 624 } 625 return addrs 626} 627 628func getHostPort2ep(eps []string) map[string]string { 629 hm := make(map[string]string, len(eps)) 630 for i := range eps { 631 _, host, _ := parseEndpoint(eps[i]) 632 hm[host] = eps[i] 633 } 634 return hm 635} 636 637func parseEndpoint(endpoint string) (proto string, host string, scheme string) { 638 proto = "tcp" 639 host = endpoint 640 url, uerr := url.Parse(endpoint) 641 if uerr != nil || !strings.Contains(endpoint, "://") { 642 return proto, host, scheme 643 } 644 scheme = url.Scheme 645 646 // strip scheme:// prefix since grpc dials by host 647 host = url.Host 648 switch url.Scheme { 649 case "http", "https": 650 case "unix", "unixs": 651 proto = "unix" 652 host = url.Host + url.Path 653 default: 654 proto, host = "", "" 655 } 656 return proto, host, scheme 657} 658