1/* 2 * 3 * Copyright 2016 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package grpc 20 21import ( 22 "context" 23 "fmt" 24 "math" 25 "strconv" 26 "sync" 27 "testing" 28 "time" 29 30 "google.golang.org/grpc/codes" 31 "google.golang.org/grpc/naming" 32 "google.golang.org/grpc/status" 33) 34 35func pickFirstBalancerV1(r naming.Resolver) Balancer { 36 return &pickFirst{&roundRobin{r: r}} 37} 38 39type testWatcher struct { 40 // the channel to receives name resolution updates 41 update chan *naming.Update 42 // the side channel to get to know how many updates in a batch 43 side chan int 44 // the channel to notify update injector that the update reading is done 45 readDone chan int 46} 47 48func (w *testWatcher) Next() (updates []*naming.Update, err error) { 49 n := <-w.side 50 if n == 0 { 51 return nil, fmt.Errorf("w.side is closed") 52 } 53 for i := 0; i < n; i++ { 54 u := <-w.update 55 if u != nil { 56 updates = append(updates, u) 57 } 58 } 59 w.readDone <- 0 60 return 61} 62 63func (w *testWatcher) Close() { 64 close(w.side) 65} 66 67// Inject naming resolution updates to the testWatcher. 68func (w *testWatcher) inject(updates []*naming.Update) { 69 w.side <- len(updates) 70 for _, u := range updates { 71 w.update <- u 72 } 73 <-w.readDone 74} 75 76type testNameResolver struct { 77 w *testWatcher 78 addr string 79} 80 81func (r *testNameResolver) Resolve(target string) (naming.Watcher, error) { 82 r.w = &testWatcher{ 83 update: make(chan *naming.Update, 1), 84 side: make(chan int, 1), 85 readDone: make(chan int), 86 } 87 r.w.side <- 1 88 r.w.update <- &naming.Update{ 89 Op: naming.Add, 90 Addr: r.addr, 91 } 92 go func() { 93 <-r.w.readDone 94 }() 95 return r.w, nil 96} 97 98func startServers(t *testing.T, numServers int, maxStreams uint32) ([]*server, *testNameResolver, func()) { 99 var servers []*server 100 for i := 0; i < numServers; i++ { 101 s := newTestServer() 102 servers = append(servers, s) 103 go s.start(t, 0, maxStreams) 104 s.wait(t, 2*time.Second) 105 } 106 // Point to server[0] 107 addr := "localhost:" + servers[0].port 108 return servers, &testNameResolver{ 109 addr: addr, 110 }, func() { 111 for i := 0; i < numServers; i++ { 112 servers[i].stop() 113 } 114 } 115} 116 117func (s) TestNameDiscovery(t *testing.T) { 118 // Start 2 servers on 2 ports. 119 numServers := 2 120 servers, r, cleanup := startServers(t, numServers, math.MaxUint32) 121 defer cleanup() 122 cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{})) 123 if err != nil { 124 t.Fatalf("Failed to create ClientConn: %v", err) 125 } 126 defer cc.Close() 127 req := "port" 128 var reply string 129 if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port { 130 t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want %s", err, servers[0].port) 131 } 132 // Inject the name resolution change to remove servers[0] and add servers[1]. 133 var updates []*naming.Update 134 updates = append(updates, &naming.Update{ 135 Op: naming.Delete, 136 Addr: "localhost:" + servers[0].port, 137 }) 138 updates = append(updates, &naming.Update{ 139 Op: naming.Add, 140 Addr: "localhost:" + servers[1].port, 141 }) 142 r.w.inject(updates) 143 // Loop until the rpcs in flight talks to servers[1]. 144 for { 145 if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port { 146 break 147 } 148 time.Sleep(10 * time.Millisecond) 149 } 150} 151 152func (s) TestEmptyAddrs(t *testing.T) { 153 servers, r, cleanup := startServers(t, 1, math.MaxUint32) 154 defer cleanup() 155 cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{})) 156 if err != nil { 157 t.Fatalf("Failed to create ClientConn: %v", err) 158 } 159 defer cc.Close() 160 var reply string 161 if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply); err != nil || reply != expectedResponse { 162 t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, reply = %q, want %q, <nil>", err, reply, expectedResponse) 163 } 164 // Inject name resolution change to remove the server so that there is no address 165 // available after that. 166 u := &naming.Update{ 167 Op: naming.Delete, 168 Addr: "localhost:" + servers[0].port, 169 } 170 r.w.inject([]*naming.Update{u}) 171 // Loop until the above updates apply. 172 for { 173 time.Sleep(10 * time.Millisecond) 174 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) 175 if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply); err != nil { 176 cancel() 177 break 178 } 179 cancel() 180 } 181} 182 183func (s) TestRoundRobin(t *testing.T) { 184 // Start 3 servers on 3 ports. 185 numServers := 3 186 servers, r, cleanup := startServers(t, numServers, math.MaxUint32) 187 defer cleanup() 188 cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{})) 189 if err != nil { 190 t.Fatalf("Failed to create ClientConn: %v", err) 191 } 192 defer cc.Close() 193 // Add servers[1] to the service discovery. 194 u := &naming.Update{ 195 Op: naming.Add, 196 Addr: "localhost:" + servers[1].port, 197 } 198 r.w.inject([]*naming.Update{u}) 199 req := "port" 200 var reply string 201 // Loop until servers[1] is up 202 for { 203 if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port { 204 break 205 } 206 time.Sleep(10 * time.Millisecond) 207 } 208 // Add server2[2] to the service discovery. 209 u = &naming.Update{ 210 Op: naming.Add, 211 Addr: "localhost:" + servers[2].port, 212 } 213 r.w.inject([]*naming.Update{u}) 214 // Loop until both servers[2] are up. 215 for { 216 if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[2].port { 217 break 218 } 219 time.Sleep(10 * time.Millisecond) 220 } 221 // Check the incoming RPCs served in a round-robin manner. 222 for i := 0; i < 10; i++ { 223 if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[i%numServers].port { 224 t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", i, err, servers[i%numServers].port) 225 } 226 } 227} 228 229func (s) TestCloseWithPendingRPC(t *testing.T) { 230 servers, r, cleanup := startServers(t, 1, math.MaxUint32) 231 defer cleanup() 232 cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{})) 233 if err != nil { 234 t.Fatalf("Failed to create ClientConn: %v", err) 235 } 236 defer cc.Close() 237 var reply string 238 if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err != nil { 239 t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want %s", err, servers[0].port) 240 } 241 // Remove the server. 242 updates := []*naming.Update{{ 243 Op: naming.Delete, 244 Addr: "localhost:" + servers[0].port, 245 }} 246 r.w.inject(updates) 247 // Loop until the above update applies. 248 for { 249 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) 250 if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); status.Code(err) == codes.DeadlineExceeded { 251 cancel() 252 break 253 } 254 time.Sleep(10 * time.Millisecond) 255 cancel() 256 } 257 // Issue 2 RPCs which should be completed with error status once cc is closed. 258 var wg sync.WaitGroup 259 wg.Add(2) 260 go func() { 261 defer wg.Done() 262 var reply string 263 if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err == nil { 264 t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err) 265 } 266 }() 267 go func() { 268 defer wg.Done() 269 var reply string 270 time.Sleep(5 * time.Millisecond) 271 if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err == nil { 272 t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err) 273 } 274 }() 275 time.Sleep(5 * time.Millisecond) 276 cc.Close() 277 wg.Wait() 278} 279 280func (s) TestGetOnWaitChannel(t *testing.T) { 281 servers, r, cleanup := startServers(t, 1, math.MaxUint32) 282 defer cleanup() 283 cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{})) 284 if err != nil { 285 t.Fatalf("Failed to create ClientConn: %v", err) 286 } 287 defer cc.Close() 288 // Remove all servers so that all upcoming RPCs will block on waitCh. 289 updates := []*naming.Update{{ 290 Op: naming.Delete, 291 Addr: "localhost:" + servers[0].port, 292 }} 293 r.w.inject(updates) 294 for { 295 var reply string 296 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) 297 if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); status.Code(err) == codes.DeadlineExceeded { 298 cancel() 299 break 300 } 301 cancel() 302 time.Sleep(10 * time.Millisecond) 303 } 304 var wg sync.WaitGroup 305 wg.Add(1) 306 go func() { 307 defer wg.Done() 308 var reply string 309 if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err != nil { 310 t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want <nil>", err) 311 } 312 }() 313 // Add a connected server to get the above RPC through. 314 updates = []*naming.Update{{ 315 Op: naming.Add, 316 Addr: "localhost:" + servers[0].port, 317 }} 318 r.w.inject(updates) 319 // Wait until the above RPC succeeds. 320 wg.Wait() 321} 322 323func (s) TestOneServerDown(t *testing.T) { 324 // Start 2 servers. 325 numServers := 2 326 servers, r, cleanup := startServers(t, numServers, math.MaxUint32) 327 defer cleanup() 328 cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{})) 329 if err != nil { 330 t.Fatalf("Failed to create ClientConn: %v", err) 331 } 332 defer cc.Close() 333 // Add servers[1] to the service discovery. 334 var updates []*naming.Update 335 updates = append(updates, &naming.Update{ 336 Op: naming.Add, 337 Addr: "localhost:" + servers[1].port, 338 }) 339 r.w.inject(updates) 340 req := "port" 341 var reply string 342 // Loop until servers[1] is up 343 for { 344 if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port { 345 break 346 } 347 time.Sleep(10 * time.Millisecond) 348 } 349 350 var wg sync.WaitGroup 351 numRPC := 100 352 sleepDuration := 10 * time.Millisecond 353 wg.Add(1) 354 go func() { 355 time.Sleep(sleepDuration) 356 // After sleepDuration, kill server[0]. 357 servers[0].stop() 358 wg.Done() 359 }() 360 361 // All non-failfast RPCs should not block because there's at least one connection available. 362 for i := 0; i < numRPC; i++ { 363 wg.Add(1) 364 go func() { 365 time.Sleep(sleepDuration) 366 // After sleepDuration, invoke RPC. 367 // server[0] is killed around the same time to make it racy between balancer and gRPC internals. 368 cc.Invoke(context.Background(), "/foo/bar", &req, &reply, WaitForReady(true)) 369 wg.Done() 370 }() 371 } 372 wg.Wait() 373} 374 375func (s) TestOneAddressRemoval(t *testing.T) { 376 // Start 2 servers. 377 numServers := 2 378 servers, r, cleanup := startServers(t, numServers, math.MaxUint32) 379 defer cleanup() 380 cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{})) 381 if err != nil { 382 t.Fatalf("Failed to create ClientConn: %v", err) 383 } 384 defer cc.Close() 385 // Add servers[1] to the service discovery. 386 var updates []*naming.Update 387 updates = append(updates, &naming.Update{ 388 Op: naming.Add, 389 Addr: "localhost:" + servers[1].port, 390 }) 391 r.w.inject(updates) 392 req := "port" 393 var reply string 394 // Loop until servers[1] is up 395 for { 396 if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port { 397 break 398 } 399 time.Sleep(10 * time.Millisecond) 400 } 401 402 var wg sync.WaitGroup 403 numRPC := 100 404 sleepDuration := 10 * time.Millisecond 405 wg.Add(1) 406 go func() { 407 time.Sleep(sleepDuration) 408 // After sleepDuration, delete server[0]. 409 var updates []*naming.Update 410 updates = append(updates, &naming.Update{ 411 Op: naming.Delete, 412 Addr: "localhost:" + servers[0].port, 413 }) 414 r.w.inject(updates) 415 wg.Done() 416 }() 417 418 // All non-failfast RPCs should not fail because there's at least one connection available. 419 for i := 0; i < numRPC; i++ { 420 wg.Add(1) 421 go func() { 422 var reply string 423 time.Sleep(sleepDuration) 424 // After sleepDuration, invoke RPC. 425 // server[0] is removed around the same time to make it racy between balancer and gRPC internals. 426 if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err != nil { 427 t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want nil", err) 428 } 429 wg.Done() 430 }() 431 } 432 wg.Wait() 433} 434 435func checkServerUp(t *testing.T, currentServer *server) { 436 req := "port" 437 port := currentServer.port 438 cc, err := Dial("passthrough:///localhost:"+port, WithBlock(), WithInsecure(), WithCodec(testCodec{})) 439 if err != nil { 440 t.Fatalf("Failed to create ClientConn: %v", err) 441 } 442 defer cc.Close() 443 var reply string 444 for { 445 if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == port { 446 break 447 } 448 time.Sleep(10 * time.Millisecond) 449 } 450} 451 452func (s) TestPickFirstEmptyAddrs(t *testing.T) { 453 servers, r, cleanup := startServers(t, 1, math.MaxUint32) 454 defer cleanup() 455 cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{})) 456 if err != nil { 457 t.Fatalf("Failed to create ClientConn: %v", err) 458 } 459 defer cc.Close() 460 var reply string 461 if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply); err != nil || reply != expectedResponse { 462 t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, reply = %q, want %q, <nil>", err, reply, expectedResponse) 463 } 464 // Inject name resolution change to remove the server so that there is no address 465 // available after that. 466 u := &naming.Update{ 467 Op: naming.Delete, 468 Addr: "localhost:" + servers[0].port, 469 } 470 r.w.inject([]*naming.Update{u}) 471 // Loop until the above updates apply. 472 for { 473 time.Sleep(10 * time.Millisecond) 474 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) 475 if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply); err != nil { 476 cancel() 477 break 478 } 479 cancel() 480 } 481} 482 483func (s) TestPickFirstCloseWithPendingRPC(t *testing.T) { 484 servers, r, cleanup := startServers(t, 1, math.MaxUint32) 485 defer cleanup() 486 cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{})) 487 if err != nil { 488 t.Fatalf("Failed to create ClientConn: %v", err) 489 } 490 defer cc.Close() 491 var reply string 492 if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err != nil { 493 t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want %s", err, servers[0].port) 494 } 495 // Remove the server. 496 updates := []*naming.Update{{ 497 Op: naming.Delete, 498 Addr: "localhost:" + servers[0].port, 499 }} 500 r.w.inject(updates) 501 // Loop until the above update applies. 502 for { 503 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) 504 if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); status.Code(err) == codes.DeadlineExceeded { 505 cancel() 506 break 507 } 508 time.Sleep(10 * time.Millisecond) 509 cancel() 510 } 511 // Issue 2 RPCs which should be completed with error status once cc is closed. 512 var wg sync.WaitGroup 513 wg.Add(2) 514 go func() { 515 defer wg.Done() 516 var reply string 517 if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err == nil { 518 t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err) 519 } 520 }() 521 go func() { 522 defer wg.Done() 523 var reply string 524 time.Sleep(5 * time.Millisecond) 525 if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err == nil { 526 t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err) 527 } 528 }() 529 time.Sleep(5 * time.Millisecond) 530 cc.Close() 531 wg.Wait() 532} 533 534func (s) TestPickFirstOrderAllServerUp(t *testing.T) { 535 // Start 3 servers on 3 ports. 536 numServers := 3 537 servers, r, cleanup := startServers(t, numServers, math.MaxUint32) 538 defer cleanup() 539 cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{})) 540 if err != nil { 541 t.Fatalf("Failed to create ClientConn: %v", err) 542 } 543 defer cc.Close() 544 // Add servers[1] and [2] to the service discovery. 545 u := &naming.Update{ 546 Op: naming.Add, 547 Addr: "localhost:" + servers[1].port, 548 } 549 r.w.inject([]*naming.Update{u}) 550 551 u = &naming.Update{ 552 Op: naming.Add, 553 Addr: "localhost:" + servers[2].port, 554 } 555 r.w.inject([]*naming.Update{u}) 556 557 // Loop until all 3 servers are up 558 checkServerUp(t, servers[0]) 559 checkServerUp(t, servers[1]) 560 checkServerUp(t, servers[2]) 561 562 // Check the incoming RPCs served in server[0] 563 req := "port" 564 var reply string 565 for i := 0; i < 20; i++ { 566 if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port { 567 t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port) 568 } 569 time.Sleep(10 * time.Millisecond) 570 } 571 572 // Delete server[0] in the balancer, the incoming RPCs served in server[1] 573 // For test addrconn, close server[0] instead 574 u = &naming.Update{ 575 Op: naming.Delete, 576 Addr: "localhost:" + servers[0].port, 577 } 578 r.w.inject([]*naming.Update{u}) 579 // Loop until it changes to server[1] 580 for { 581 if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port { 582 break 583 } 584 time.Sleep(10 * time.Millisecond) 585 } 586 for i := 0; i < 20; i++ { 587 if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[1].port { 588 t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port) 589 } 590 time.Sleep(10 * time.Millisecond) 591 } 592 593 // Add server[0] back to the balancer, the incoming RPCs served in server[1] 594 // Add is append operation, the order of Notify now is {server[1].port server[2].port server[0].port} 595 u = &naming.Update{ 596 Op: naming.Add, 597 Addr: "localhost:" + servers[0].port, 598 } 599 r.w.inject([]*naming.Update{u}) 600 for i := 0; i < 20; i++ { 601 if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[1].port { 602 t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port) 603 } 604 time.Sleep(10 * time.Millisecond) 605 } 606 607 // Delete server[1] in the balancer, the incoming RPCs served in server[2] 608 u = &naming.Update{ 609 Op: naming.Delete, 610 Addr: "localhost:" + servers[1].port, 611 } 612 r.w.inject([]*naming.Update{u}) 613 for { 614 if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[2].port { 615 break 616 } 617 time.Sleep(1 * time.Second) 618 } 619 for i := 0; i < 20; i++ { 620 if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[2].port { 621 t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 2, err, servers[2].port) 622 } 623 time.Sleep(10 * time.Millisecond) 624 } 625 626 // Delete server[2] in the balancer, the incoming RPCs served in server[0] 627 u = &naming.Update{ 628 Op: naming.Delete, 629 Addr: "localhost:" + servers[2].port, 630 } 631 r.w.inject([]*naming.Update{u}) 632 for { 633 if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[0].port { 634 break 635 } 636 time.Sleep(1 * time.Second) 637 } 638 for i := 0; i < 20; i++ { 639 if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port { 640 t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port) 641 } 642 time.Sleep(10 * time.Millisecond) 643 } 644} 645 646func (s) TestPickFirstOrderOneServerDown(t *testing.T) { 647 // Start 3 servers on 3 ports. 648 numServers := 3 649 servers, r, cleanup := startServers(t, numServers, math.MaxUint32) 650 defer cleanup() 651 cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{})) 652 if err != nil { 653 t.Fatalf("Failed to create ClientConn: %v", err) 654 } 655 defer cc.Close() 656 // Add servers[1] and [2] to the service discovery. 657 u := &naming.Update{ 658 Op: naming.Add, 659 Addr: "localhost:" + servers[1].port, 660 } 661 r.w.inject([]*naming.Update{u}) 662 663 u = &naming.Update{ 664 Op: naming.Add, 665 Addr: "localhost:" + servers[2].port, 666 } 667 r.w.inject([]*naming.Update{u}) 668 669 // Loop until all 3 servers are up 670 checkServerUp(t, servers[0]) 671 checkServerUp(t, servers[1]) 672 checkServerUp(t, servers[2]) 673 674 // Check the incoming RPCs served in server[0] 675 req := "port" 676 var reply string 677 for i := 0; i < 20; i++ { 678 if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port { 679 t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port) 680 } 681 time.Sleep(10 * time.Millisecond) 682 } 683 684 // server[0] down, incoming RPCs served in server[1], but the order of Notify still remains 685 // {server[0] server[1] server[2]} 686 servers[0].stop() 687 // Loop until it changes to server[1] 688 for { 689 if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port { 690 break 691 } 692 time.Sleep(10 * time.Millisecond) 693 } 694 for i := 0; i < 20; i++ { 695 if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[1].port { 696 t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port) 697 } 698 time.Sleep(10 * time.Millisecond) 699 } 700 701 // up the server[0] back, the incoming RPCs served in server[1] 702 p, _ := strconv.Atoi(servers[0].port) 703 servers[0] = newTestServer() 704 go servers[0].start(t, p, math.MaxUint32) 705 defer servers[0].stop() 706 servers[0].wait(t, 2*time.Second) 707 checkServerUp(t, servers[0]) 708 709 for i := 0; i < 20; i++ { 710 if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[1].port { 711 t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port) 712 } 713 time.Sleep(10 * time.Millisecond) 714 } 715 716 // Delete server[1] in the balancer, the incoming RPCs served in server[0] 717 u = &naming.Update{ 718 Op: naming.Delete, 719 Addr: "localhost:" + servers[1].port, 720 } 721 r.w.inject([]*naming.Update{u}) 722 for { 723 if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[0].port { 724 break 725 } 726 time.Sleep(1 * time.Second) 727 } 728 for i := 0; i < 20; i++ { 729 if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port { 730 t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port) 731 } 732 time.Sleep(10 * time.Millisecond) 733 } 734} 735 736func (s) TestPickFirstOneAddressRemoval(t *testing.T) { 737 // Start 2 servers. 738 numServers := 2 739 servers, r, cleanup := startServers(t, numServers, math.MaxUint32) 740 defer cleanup() 741 cc, err := Dial("passthrough:///localhost:"+servers[0].port, WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{})) 742 if err != nil { 743 t.Fatalf("Failed to create ClientConn: %v", err) 744 } 745 defer cc.Close() 746 // Add servers[1] to the service discovery. 747 var updates []*naming.Update 748 updates = append(updates, &naming.Update{ 749 Op: naming.Add, 750 Addr: "localhost:" + servers[1].port, 751 }) 752 r.w.inject(updates) 753 754 // Create a new cc to Loop until servers[1] is up 755 checkServerUp(t, servers[0]) 756 checkServerUp(t, servers[1]) 757 758 var wg sync.WaitGroup 759 numRPC := 100 760 sleepDuration := 10 * time.Millisecond 761 wg.Add(1) 762 go func() { 763 time.Sleep(sleepDuration) 764 // After sleepDuration, delete server[0]. 765 var updates []*naming.Update 766 updates = append(updates, &naming.Update{ 767 Op: naming.Delete, 768 Addr: "localhost:" + servers[0].port, 769 }) 770 r.w.inject(updates) 771 wg.Done() 772 }() 773 774 // All non-failfast RPCs should not fail because there's at least one connection available. 775 for i := 0; i < numRPC; i++ { 776 wg.Add(1) 777 go func() { 778 var reply string 779 time.Sleep(sleepDuration) 780 // After sleepDuration, invoke RPC. 781 // server[0] is removed around the same time to make it racy between balancer and gRPC internals. 782 if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err != nil { 783 t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want nil", err) 784 } 785 wg.Done() 786 }() 787 } 788 wg.Wait() 789} 790