1/* 2 * 3 * Copyright 2016 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package grpc 20 21import ( 22 "context" 23 "fmt" 24 "math" 25 "strconv" 26 "sync" 27 "testing" 28 "time" 29 30 "google.golang.org/grpc/codes" 31 _ "google.golang.org/grpc/grpclog/glogger" 32 "google.golang.org/grpc/internal/leakcheck" 33 "google.golang.org/grpc/naming" 34 "google.golang.org/grpc/status" 35 36 // V1 balancer tests use passthrough resolver instead of dns. 37 // TODO(bar) remove this when removing v1 balaner entirely. 38 39 _ "google.golang.org/grpc/resolver/passthrough" 40) 41 42func pickFirstBalancerV1(r naming.Resolver) Balancer { 43 return &pickFirst{&roundRobin{r: r}} 44} 45 46type testWatcher struct { 47 // the channel to receives name resolution updates 48 update chan *naming.Update 49 // the side channel to get to know how many updates in a batch 50 side chan int 51 // the channel to notify update injector that the update reading is done 52 readDone chan int 53} 54 55func (w *testWatcher) Next() (updates []*naming.Update, err error) { 56 n := <-w.side 57 if n == 0 { 58 return nil, fmt.Errorf("w.side is closed") 59 } 60 for i := 0; i < n; i++ { 61 u := <-w.update 62 if u != nil { 63 updates = append(updates, u) 64 } 65 } 66 w.readDone <- 0 67 return 68} 69 70func (w *testWatcher) Close() { 71 close(w.side) 72} 73 74// Inject naming resolution updates to the testWatcher. 75func (w *testWatcher) inject(updates []*naming.Update) { 76 w.side <- len(updates) 77 for _, u := range updates { 78 w.update <- u 79 } 80 <-w.readDone 81} 82 83type testNameResolver struct { 84 w *testWatcher 85 addr string 86} 87 88func (r *testNameResolver) Resolve(target string) (naming.Watcher, error) { 89 r.w = &testWatcher{ 90 update: make(chan *naming.Update, 1), 91 side: make(chan int, 1), 92 readDone: make(chan int), 93 } 94 r.w.side <- 1 95 r.w.update <- &naming.Update{ 96 Op: naming.Add, 97 Addr: r.addr, 98 } 99 go func() { 100 <-r.w.readDone 101 }() 102 return r.w, nil 103} 104 105func startServers(t *testing.T, numServers int, maxStreams uint32) ([]*server, *testNameResolver, func()) { 106 var servers []*server 107 for i := 0; i < numServers; i++ { 108 s := newTestServer() 109 servers = append(servers, s) 110 go s.start(t, 0, maxStreams) 111 s.wait(t, 2*time.Second) 112 } 113 // Point to server[0] 114 addr := "localhost:" + servers[0].port 115 return servers, &testNameResolver{ 116 addr: addr, 117 }, func() { 118 for i := 0; i < numServers; i++ { 119 servers[i].stop() 120 } 121 } 122} 123 124func TestNameDiscovery(t *testing.T) { 125 defer leakcheck.Check(t) 126 // Start 2 servers on 2 ports. 127 numServers := 2 128 servers, r, cleanup := startServers(t, numServers, math.MaxUint32) 129 defer cleanup() 130 cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{})) 131 if err != nil { 132 t.Fatalf("Failed to create ClientConn: %v", err) 133 } 134 defer cc.Close() 135 req := "port" 136 var reply string 137 if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port { 138 t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want %s", err, servers[0].port) 139 } 140 // Inject the name resolution change to remove servers[0] and add servers[1]. 141 var updates []*naming.Update 142 updates = append(updates, &naming.Update{ 143 Op: naming.Delete, 144 Addr: "localhost:" + servers[0].port, 145 }) 146 updates = append(updates, &naming.Update{ 147 Op: naming.Add, 148 Addr: "localhost:" + servers[1].port, 149 }) 150 r.w.inject(updates) 151 // Loop until the rpcs in flight talks to servers[1]. 152 for { 153 if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port { 154 break 155 } 156 time.Sleep(10 * time.Millisecond) 157 } 158} 159 160func TestEmptyAddrs(t *testing.T) { 161 defer leakcheck.Check(t) 162 servers, r, cleanup := startServers(t, 1, math.MaxUint32) 163 defer cleanup() 164 cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{})) 165 if err != nil { 166 t.Fatalf("Failed to create ClientConn: %v", err) 167 } 168 defer cc.Close() 169 var reply string 170 if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply); err != nil || reply != expectedResponse { 171 t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, reply = %q, want %q, <nil>", err, reply, expectedResponse) 172 } 173 // Inject name resolution change to remove the server so that there is no address 174 // available after that. 175 u := &naming.Update{ 176 Op: naming.Delete, 177 Addr: "localhost:" + servers[0].port, 178 } 179 r.w.inject([]*naming.Update{u}) 180 // Loop until the above updates apply. 181 for { 182 time.Sleep(10 * time.Millisecond) 183 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) 184 if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply); err != nil { 185 cancel() 186 break 187 } 188 cancel() 189 } 190} 191 192func TestRoundRobin(t *testing.T) { 193 defer leakcheck.Check(t) 194 // Start 3 servers on 3 ports. 195 numServers := 3 196 servers, r, cleanup := startServers(t, numServers, math.MaxUint32) 197 defer cleanup() 198 cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{})) 199 if err != nil { 200 t.Fatalf("Failed to create ClientConn: %v", err) 201 } 202 defer cc.Close() 203 // Add servers[1] to the service discovery. 204 u := &naming.Update{ 205 Op: naming.Add, 206 Addr: "localhost:" + servers[1].port, 207 } 208 r.w.inject([]*naming.Update{u}) 209 req := "port" 210 var reply string 211 // Loop until servers[1] is up 212 for { 213 if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port { 214 break 215 } 216 time.Sleep(10 * time.Millisecond) 217 } 218 // Add server2[2] to the service discovery. 219 u = &naming.Update{ 220 Op: naming.Add, 221 Addr: "localhost:" + servers[2].port, 222 } 223 r.w.inject([]*naming.Update{u}) 224 // Loop until both servers[2] are up. 225 for { 226 if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[2].port { 227 break 228 } 229 time.Sleep(10 * time.Millisecond) 230 } 231 // Check the incoming RPCs served in a round-robin manner. 232 for i := 0; i < 10; i++ { 233 if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[i%numServers].port { 234 t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", i, err, servers[i%numServers].port) 235 } 236 } 237} 238 239func TestCloseWithPendingRPC(t *testing.T) { 240 defer leakcheck.Check(t) 241 servers, r, cleanup := startServers(t, 1, math.MaxUint32) 242 defer cleanup() 243 cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{})) 244 if err != nil { 245 t.Fatalf("Failed to create ClientConn: %v", err) 246 } 247 defer cc.Close() 248 var reply string 249 if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, FailFast(false)); err != nil { 250 t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want %s", err, servers[0].port) 251 } 252 // Remove the server. 253 updates := []*naming.Update{{ 254 Op: naming.Delete, 255 Addr: "localhost:" + servers[0].port, 256 }} 257 r.w.inject(updates) 258 // Loop until the above update applies. 259 for { 260 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) 261 if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply, FailFast(false)); status.Code(err) == codes.DeadlineExceeded { 262 cancel() 263 break 264 } 265 time.Sleep(10 * time.Millisecond) 266 cancel() 267 } 268 // Issue 2 RPCs which should be completed with error status once cc is closed. 269 var wg sync.WaitGroup 270 wg.Add(2) 271 go func() { 272 defer wg.Done() 273 var reply string 274 if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, FailFast(false)); err == nil { 275 t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err) 276 } 277 }() 278 go func() { 279 defer wg.Done() 280 var reply string 281 time.Sleep(5 * time.Millisecond) 282 if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, FailFast(false)); err == nil { 283 t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err) 284 } 285 }() 286 time.Sleep(5 * time.Millisecond) 287 cc.Close() 288 wg.Wait() 289} 290 291func TestGetOnWaitChannel(t *testing.T) { 292 defer leakcheck.Check(t) 293 servers, r, cleanup := startServers(t, 1, math.MaxUint32) 294 defer cleanup() 295 cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{})) 296 if err != nil { 297 t.Fatalf("Failed to create ClientConn: %v", err) 298 } 299 defer cc.Close() 300 // Remove all servers so that all upcoming RPCs will block on waitCh. 301 updates := []*naming.Update{{ 302 Op: naming.Delete, 303 Addr: "localhost:" + servers[0].port, 304 }} 305 r.w.inject(updates) 306 for { 307 var reply string 308 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) 309 if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply, FailFast(false)); status.Code(err) == codes.DeadlineExceeded { 310 cancel() 311 break 312 } 313 cancel() 314 time.Sleep(10 * time.Millisecond) 315 } 316 var wg sync.WaitGroup 317 wg.Add(1) 318 go func() { 319 defer wg.Done() 320 var reply string 321 if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, FailFast(false)); err != nil { 322 t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want <nil>", err) 323 } 324 }() 325 // Add a connected server to get the above RPC through. 326 updates = []*naming.Update{{ 327 Op: naming.Add, 328 Addr: "localhost:" + servers[0].port, 329 }} 330 r.w.inject(updates) 331 // Wait until the above RPC succeeds. 332 wg.Wait() 333} 334 335func TestOneServerDown(t *testing.T) { 336 defer leakcheck.Check(t) 337 // Start 2 servers. 338 numServers := 2 339 servers, r, cleanup := startServers(t, numServers, math.MaxUint32) 340 defer cleanup() 341 cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}), WithWaitForHandshake()) 342 if err != nil { 343 t.Fatalf("Failed to create ClientConn: %v", err) 344 } 345 defer cc.Close() 346 // Add servers[1] to the service discovery. 347 var updates []*naming.Update 348 updates = append(updates, &naming.Update{ 349 Op: naming.Add, 350 Addr: "localhost:" + servers[1].port, 351 }) 352 r.w.inject(updates) 353 req := "port" 354 var reply string 355 // Loop until servers[1] is up 356 for { 357 if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port { 358 break 359 } 360 time.Sleep(10 * time.Millisecond) 361 } 362 363 var wg sync.WaitGroup 364 numRPC := 100 365 sleepDuration := 10 * time.Millisecond 366 wg.Add(1) 367 go func() { 368 time.Sleep(sleepDuration) 369 // After sleepDuration, kill server[0]. 370 servers[0].stop() 371 wg.Done() 372 }() 373 374 // All non-failfast RPCs should not block because there's at least one connection available. 375 for i := 0; i < numRPC; i++ { 376 wg.Add(1) 377 go func() { 378 time.Sleep(sleepDuration) 379 // After sleepDuration, invoke RPC. 380 // server[0] is killed around the same time to make it racy between balancer and gRPC internals. 381 cc.Invoke(context.Background(), "/foo/bar", &req, &reply, FailFast(false)) 382 wg.Done() 383 }() 384 } 385 wg.Wait() 386} 387 388func TestOneAddressRemoval(t *testing.T) { 389 defer leakcheck.Check(t) 390 // Start 2 servers. 391 numServers := 2 392 servers, r, cleanup := startServers(t, numServers, math.MaxUint32) 393 defer cleanup() 394 cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{})) 395 if err != nil { 396 t.Fatalf("Failed to create ClientConn: %v", err) 397 } 398 defer cc.Close() 399 // Add servers[1] to the service discovery. 400 var updates []*naming.Update 401 updates = append(updates, &naming.Update{ 402 Op: naming.Add, 403 Addr: "localhost:" + servers[1].port, 404 }) 405 r.w.inject(updates) 406 req := "port" 407 var reply string 408 // Loop until servers[1] is up 409 for { 410 if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port { 411 break 412 } 413 time.Sleep(10 * time.Millisecond) 414 } 415 416 var wg sync.WaitGroup 417 numRPC := 100 418 sleepDuration := 10 * time.Millisecond 419 wg.Add(1) 420 go func() { 421 time.Sleep(sleepDuration) 422 // After sleepDuration, delete server[0]. 423 var updates []*naming.Update 424 updates = append(updates, &naming.Update{ 425 Op: naming.Delete, 426 Addr: "localhost:" + servers[0].port, 427 }) 428 r.w.inject(updates) 429 wg.Done() 430 }() 431 432 // All non-failfast RPCs should not fail because there's at least one connection available. 433 for i := 0; i < numRPC; i++ { 434 wg.Add(1) 435 go func() { 436 var reply string 437 time.Sleep(sleepDuration) 438 // After sleepDuration, invoke RPC. 439 // server[0] is removed around the same time to make it racy between balancer and gRPC internals. 440 if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, FailFast(false)); err != nil { 441 t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want nil", err) 442 } 443 wg.Done() 444 }() 445 } 446 wg.Wait() 447} 448 449func checkServerUp(t *testing.T, currentServer *server) { 450 req := "port" 451 port := currentServer.port 452 cc, err := Dial("passthrough:///localhost:"+port, WithBlock(), WithInsecure(), WithCodec(testCodec{})) 453 if err != nil { 454 t.Fatalf("Failed to create ClientConn: %v", err) 455 } 456 defer cc.Close() 457 var reply string 458 for { 459 if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == port { 460 break 461 } 462 time.Sleep(10 * time.Millisecond) 463 } 464} 465 466func TestPickFirstEmptyAddrs(t *testing.T) { 467 defer leakcheck.Check(t) 468 servers, r, cleanup := startServers(t, 1, math.MaxUint32) 469 defer cleanup() 470 cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{})) 471 if err != nil { 472 t.Fatalf("Failed to create ClientConn: %v", err) 473 } 474 defer cc.Close() 475 var reply string 476 if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply); err != nil || reply != expectedResponse { 477 t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, reply = %q, want %q, <nil>", err, reply, expectedResponse) 478 } 479 // Inject name resolution change to remove the server so that there is no address 480 // available after that. 481 u := &naming.Update{ 482 Op: naming.Delete, 483 Addr: "localhost:" + servers[0].port, 484 } 485 r.w.inject([]*naming.Update{u}) 486 // Loop until the above updates apply. 487 for { 488 time.Sleep(10 * time.Millisecond) 489 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) 490 if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply); err != nil { 491 cancel() 492 break 493 } 494 cancel() 495 } 496} 497 498func TestPickFirstCloseWithPendingRPC(t *testing.T) { 499 defer leakcheck.Check(t) 500 servers, r, cleanup := startServers(t, 1, math.MaxUint32) 501 defer cleanup() 502 cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{})) 503 if err != nil { 504 t.Fatalf("Failed to create ClientConn: %v", err) 505 } 506 defer cc.Close() 507 var reply string 508 if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, FailFast(false)); err != nil { 509 t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want %s", err, servers[0].port) 510 } 511 // Remove the server. 512 updates := []*naming.Update{{ 513 Op: naming.Delete, 514 Addr: "localhost:" + servers[0].port, 515 }} 516 r.w.inject(updates) 517 // Loop until the above update applies. 518 for { 519 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) 520 if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply, FailFast(false)); status.Code(err) == codes.DeadlineExceeded { 521 cancel() 522 break 523 } 524 time.Sleep(10 * time.Millisecond) 525 cancel() 526 } 527 // Issue 2 RPCs which should be completed with error status once cc is closed. 528 var wg sync.WaitGroup 529 wg.Add(2) 530 go func() { 531 defer wg.Done() 532 var reply string 533 if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, FailFast(false)); err == nil { 534 t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err) 535 } 536 }() 537 go func() { 538 defer wg.Done() 539 var reply string 540 time.Sleep(5 * time.Millisecond) 541 if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, FailFast(false)); err == nil { 542 t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err) 543 } 544 }() 545 time.Sleep(5 * time.Millisecond) 546 cc.Close() 547 wg.Wait() 548} 549 550func TestPickFirstOrderAllServerUp(t *testing.T) { 551 defer leakcheck.Check(t) 552 // Start 3 servers on 3 ports. 553 numServers := 3 554 servers, r, cleanup := startServers(t, numServers, math.MaxUint32) 555 defer cleanup() 556 cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{})) 557 if err != nil { 558 t.Fatalf("Failed to create ClientConn: %v", err) 559 } 560 defer cc.Close() 561 // Add servers[1] and [2] to the service discovery. 562 u := &naming.Update{ 563 Op: naming.Add, 564 Addr: "localhost:" + servers[1].port, 565 } 566 r.w.inject([]*naming.Update{u}) 567 568 u = &naming.Update{ 569 Op: naming.Add, 570 Addr: "localhost:" + servers[2].port, 571 } 572 r.w.inject([]*naming.Update{u}) 573 574 // Loop until all 3 servers are up 575 checkServerUp(t, servers[0]) 576 checkServerUp(t, servers[1]) 577 checkServerUp(t, servers[2]) 578 579 // Check the incoming RPCs served in server[0] 580 req := "port" 581 var reply string 582 for i := 0; i < 20; i++ { 583 if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port { 584 t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port) 585 } 586 time.Sleep(10 * time.Millisecond) 587 } 588 589 // Delete server[0] in the balancer, the incoming RPCs served in server[1] 590 // For test addrconn, close server[0] instead 591 u = &naming.Update{ 592 Op: naming.Delete, 593 Addr: "localhost:" + servers[0].port, 594 } 595 r.w.inject([]*naming.Update{u}) 596 // Loop until it changes to server[1] 597 for { 598 if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port { 599 break 600 } 601 time.Sleep(10 * time.Millisecond) 602 } 603 for i := 0; i < 20; i++ { 604 if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[1].port { 605 t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port) 606 } 607 time.Sleep(10 * time.Millisecond) 608 } 609 610 // Add server[0] back to the balancer, the incoming RPCs served in server[1] 611 // Add is append operation, the order of Notify now is {server[1].port server[2].port server[0].port} 612 u = &naming.Update{ 613 Op: naming.Add, 614 Addr: "localhost:" + servers[0].port, 615 } 616 r.w.inject([]*naming.Update{u}) 617 for i := 0; i < 20; i++ { 618 if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[1].port { 619 t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port) 620 } 621 time.Sleep(10 * time.Millisecond) 622 } 623 624 // Delete server[1] in the balancer, the incoming RPCs served in server[2] 625 u = &naming.Update{ 626 Op: naming.Delete, 627 Addr: "localhost:" + servers[1].port, 628 } 629 r.w.inject([]*naming.Update{u}) 630 for { 631 if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[2].port { 632 break 633 } 634 time.Sleep(1 * time.Second) 635 } 636 for i := 0; i < 20; i++ { 637 if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[2].port { 638 t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 2, err, servers[2].port) 639 } 640 time.Sleep(10 * time.Millisecond) 641 } 642 643 // Delete server[2] in the balancer, the incoming RPCs served in server[0] 644 u = &naming.Update{ 645 Op: naming.Delete, 646 Addr: "localhost:" + servers[2].port, 647 } 648 r.w.inject([]*naming.Update{u}) 649 for { 650 if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[0].port { 651 break 652 } 653 time.Sleep(1 * time.Second) 654 } 655 for i := 0; i < 20; i++ { 656 if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port { 657 t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port) 658 } 659 time.Sleep(10 * time.Millisecond) 660 } 661} 662 663func TestPickFirstOrderOneServerDown(t *testing.T) { 664 defer leakcheck.Check(t) 665 // Start 3 servers on 3 ports. 666 numServers := 3 667 servers, r, cleanup := startServers(t, numServers, math.MaxUint32) 668 defer cleanup() 669 cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}), WithWaitForHandshake()) 670 if err != nil { 671 t.Fatalf("Failed to create ClientConn: %v", err) 672 } 673 defer cc.Close() 674 // Add servers[1] and [2] to the service discovery. 675 u := &naming.Update{ 676 Op: naming.Add, 677 Addr: "localhost:" + servers[1].port, 678 } 679 r.w.inject([]*naming.Update{u}) 680 681 u = &naming.Update{ 682 Op: naming.Add, 683 Addr: "localhost:" + servers[2].port, 684 } 685 r.w.inject([]*naming.Update{u}) 686 687 // Loop until all 3 servers are up 688 checkServerUp(t, servers[0]) 689 checkServerUp(t, servers[1]) 690 checkServerUp(t, servers[2]) 691 692 // Check the incoming RPCs served in server[0] 693 req := "port" 694 var reply string 695 for i := 0; i < 20; i++ { 696 if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port { 697 t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port) 698 } 699 time.Sleep(10 * time.Millisecond) 700 } 701 702 // server[0] down, incoming RPCs served in server[1], but the order of Notify still remains 703 // {server[0] server[1] server[2]} 704 servers[0].stop() 705 // Loop until it changes to server[1] 706 for { 707 if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port { 708 break 709 } 710 time.Sleep(10 * time.Millisecond) 711 } 712 for i := 0; i < 20; i++ { 713 if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[1].port { 714 t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port) 715 } 716 time.Sleep(10 * time.Millisecond) 717 } 718 719 // up the server[0] back, the incoming RPCs served in server[1] 720 p, _ := strconv.Atoi(servers[0].port) 721 servers[0] = newTestServer() 722 go servers[0].start(t, p, math.MaxUint32) 723 defer servers[0].stop() 724 servers[0].wait(t, 2*time.Second) 725 checkServerUp(t, servers[0]) 726 727 for i := 0; i < 20; i++ { 728 if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[1].port { 729 t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port) 730 } 731 time.Sleep(10 * time.Millisecond) 732 } 733 734 // Delete server[1] in the balancer, the incoming RPCs served in server[0] 735 u = &naming.Update{ 736 Op: naming.Delete, 737 Addr: "localhost:" + servers[1].port, 738 } 739 r.w.inject([]*naming.Update{u}) 740 for { 741 if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[0].port { 742 break 743 } 744 time.Sleep(1 * time.Second) 745 } 746 for i := 0; i < 20; i++ { 747 if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port { 748 t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port) 749 } 750 time.Sleep(10 * time.Millisecond) 751 } 752} 753 754func TestPickFirstOneAddressRemoval(t *testing.T) { 755 defer leakcheck.Check(t) 756 // Start 2 servers. 757 numServers := 2 758 servers, r, cleanup := startServers(t, numServers, math.MaxUint32) 759 defer cleanup() 760 cc, err := Dial("passthrough:///localhost:"+servers[0].port, WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{})) 761 if err != nil { 762 t.Fatalf("Failed to create ClientConn: %v", err) 763 } 764 defer cc.Close() 765 // Add servers[1] to the service discovery. 766 var updates []*naming.Update 767 updates = append(updates, &naming.Update{ 768 Op: naming.Add, 769 Addr: "localhost:" + servers[1].port, 770 }) 771 r.w.inject(updates) 772 773 // Create a new cc to Loop until servers[1] is up 774 checkServerUp(t, servers[0]) 775 checkServerUp(t, servers[1]) 776 777 var wg sync.WaitGroup 778 numRPC := 100 779 sleepDuration := 10 * time.Millisecond 780 wg.Add(1) 781 go func() { 782 time.Sleep(sleepDuration) 783 // After sleepDuration, delete server[0]. 784 var updates []*naming.Update 785 updates = append(updates, &naming.Update{ 786 Op: naming.Delete, 787 Addr: "localhost:" + servers[0].port, 788 }) 789 r.w.inject(updates) 790 wg.Done() 791 }() 792 793 // All non-failfast RPCs should not fail because there's at least one connection available. 794 for i := 0; i < numRPC; i++ { 795 wg.Add(1) 796 go func() { 797 var reply string 798 time.Sleep(sleepDuration) 799 // After sleepDuration, invoke RPC. 800 // server[0] is removed around the same time to make it racy between balancer and gRPC internals. 801 if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, FailFast(false)); err != nil { 802 t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want nil", err) 803 } 804 wg.Done() 805 }() 806 } 807 wg.Wait() 808} 809