1/* 2 * 3 * Copyright 2016 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package grpc 20 21import ( 22 "fmt" 23 "math" 24 "strconv" 25 "sync" 26 "testing" 27 "time" 28 29 "golang.org/x/net/context" 30 "google.golang.org/grpc/codes" 31 "google.golang.org/grpc/naming" 32) 33 34type testWatcher struct { 35 // the channel to receives name resolution updates 36 update chan *naming.Update 37 // the side channel to get to know how many updates in a batch 38 side chan int 39 // the channel to notifiy update injector that the update reading is done 40 readDone chan int 41} 42 43func (w *testWatcher) Next() (updates []*naming.Update, err error) { 44 n := <-w.side 45 if n == 0 { 46 return nil, fmt.Errorf("w.side is closed") 47 } 48 for i := 0; i < n; i++ { 49 u := <-w.update 50 if u != nil { 51 updates = append(updates, u) 52 } 53 } 54 w.readDone <- 0 55 return 56} 57 58func (w *testWatcher) Close() { 59} 60 61// Inject naming resolution updates to the testWatcher. 62func (w *testWatcher) inject(updates []*naming.Update) { 63 w.side <- len(updates) 64 for _, u := range updates { 65 w.update <- u 66 } 67 <-w.readDone 68} 69 70type testNameResolver struct { 71 w *testWatcher 72 addr string 73} 74 75func (r *testNameResolver) Resolve(target string) (naming.Watcher, error) { 76 r.w = &testWatcher{ 77 update: make(chan *naming.Update, 1), 78 side: make(chan int, 1), 79 readDone: make(chan int), 80 } 81 r.w.side <- 1 82 r.w.update <- &naming.Update{ 83 Op: naming.Add, 84 Addr: r.addr, 85 } 86 go func() { 87 <-r.w.readDone 88 }() 89 return r.w, nil 90} 91 92func startServers(t *testing.T, numServers int, maxStreams uint32) ([]*server, *testNameResolver) { 93 var servers []*server 94 for i := 0; i < numServers; i++ { 95 s := newTestServer() 96 servers = append(servers, s) 97 go s.start(t, 0, maxStreams) 98 s.wait(t, 2*time.Second) 99 } 100 // Point to server[0] 101 addr := "localhost:" + servers[0].port 102 return servers, &testNameResolver{ 103 addr: addr, 104 } 105} 106 107func TestNameDiscovery(t *testing.T) { 108 // Start 2 servers on 2 ports. 109 numServers := 2 110 servers, r := startServers(t, numServers, math.MaxUint32) 111 cc, err := Dial("foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{})) 112 if err != nil { 113 t.Fatalf("Failed to create ClientConn: %v", err) 114 } 115 req := "port" 116 var reply string 117 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[0].port { 118 t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want %s", err, servers[0].port) 119 } 120 // Inject the name resolution change to remove servers[0] and add servers[1]. 121 var updates []*naming.Update 122 updates = append(updates, &naming.Update{ 123 Op: naming.Delete, 124 Addr: "localhost:" + servers[0].port, 125 }) 126 updates = append(updates, &naming.Update{ 127 Op: naming.Add, 128 Addr: "localhost:" + servers[1].port, 129 }) 130 r.w.inject(updates) 131 // Loop until the rpcs in flight talks to servers[1]. 132 for { 133 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[1].port { 134 break 135 } 136 time.Sleep(10 * time.Millisecond) 137 } 138 cc.Close() 139 for i := 0; i < numServers; i++ { 140 servers[i].stop() 141 } 142} 143 144func TestEmptyAddrs(t *testing.T) { 145 servers, r := startServers(t, 1, math.MaxUint32) 146 cc, err := Dial("foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{})) 147 if err != nil { 148 t.Fatalf("Failed to create ClientConn: %v", err) 149 } 150 var reply string 151 if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc); err != nil || reply != expectedResponse { 152 t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, reply = %q, want %q, <nil>", err, reply, expectedResponse) 153 } 154 // Inject name resolution change to remove the server so that there is no address 155 // available after that. 156 u := &naming.Update{ 157 Op: naming.Delete, 158 Addr: "localhost:" + servers[0].port, 159 } 160 r.w.inject([]*naming.Update{u}) 161 // Loop until the above updates apply. 162 for { 163 time.Sleep(10 * time.Millisecond) 164 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) 165 if err := Invoke(ctx, "/foo/bar", &expectedRequest, &reply, cc); err != nil { 166 cancel() 167 break 168 } 169 cancel() 170 } 171 cc.Close() 172 servers[0].stop() 173} 174 175func TestRoundRobin(t *testing.T) { 176 // Start 3 servers on 3 ports. 177 numServers := 3 178 servers, r := startServers(t, numServers, math.MaxUint32) 179 cc, err := Dial("foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{})) 180 if err != nil { 181 t.Fatalf("Failed to create ClientConn: %v", err) 182 } 183 // Add servers[1] to the service discovery. 184 u := &naming.Update{ 185 Op: naming.Add, 186 Addr: "localhost:" + servers[1].port, 187 } 188 r.w.inject([]*naming.Update{u}) 189 req := "port" 190 var reply string 191 // Loop until servers[1] is up 192 for { 193 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[1].port { 194 break 195 } 196 time.Sleep(10 * time.Millisecond) 197 } 198 // Add server2[2] to the service discovery. 199 u = &naming.Update{ 200 Op: naming.Add, 201 Addr: "localhost:" + servers[2].port, 202 } 203 r.w.inject([]*naming.Update{u}) 204 // Loop until both servers[2] are up. 205 for { 206 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[2].port { 207 break 208 } 209 time.Sleep(10 * time.Millisecond) 210 } 211 // Check the incoming RPCs served in a round-robin manner. 212 for i := 0; i < 10; i++ { 213 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[i%numServers].port { 214 t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", i, err, servers[i%numServers].port) 215 } 216 } 217 cc.Close() 218 for i := 0; i < numServers; i++ { 219 servers[i].stop() 220 } 221} 222 223func TestCloseWithPendingRPC(t *testing.T) { 224 servers, r := startServers(t, 1, math.MaxUint32) 225 cc, err := Dial("foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{})) 226 if err != nil { 227 t.Fatalf("Failed to create ClientConn: %v", err) 228 } 229 var reply string 230 if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); err != nil { 231 t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want %s", err, servers[0].port) 232 } 233 // Remove the server. 234 updates := []*naming.Update{{ 235 Op: naming.Delete, 236 Addr: "localhost:" + servers[0].port, 237 }} 238 r.w.inject(updates) 239 // Loop until the above update applies. 240 for { 241 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) 242 if err := Invoke(ctx, "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); Code(err) == codes.DeadlineExceeded { 243 cancel() 244 break 245 } 246 time.Sleep(10 * time.Millisecond) 247 cancel() 248 } 249 // Issue 2 RPCs which should be completed with error status once cc is closed. 250 var wg sync.WaitGroup 251 wg.Add(2) 252 go func() { 253 defer wg.Done() 254 var reply string 255 if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); err == nil { 256 t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err) 257 } 258 }() 259 go func() { 260 defer wg.Done() 261 var reply string 262 time.Sleep(5 * time.Millisecond) 263 if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); err == nil { 264 t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err) 265 } 266 }() 267 time.Sleep(5 * time.Millisecond) 268 cc.Close() 269 wg.Wait() 270 servers[0].stop() 271} 272 273func TestGetOnWaitChannel(t *testing.T) { 274 servers, r := startServers(t, 1, math.MaxUint32) 275 cc, err := Dial("foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{})) 276 if err != nil { 277 t.Fatalf("Failed to create ClientConn: %v", err) 278 } 279 // Remove all servers so that all upcoming RPCs will block on waitCh. 280 updates := []*naming.Update{{ 281 Op: naming.Delete, 282 Addr: "localhost:" + servers[0].port, 283 }} 284 r.w.inject(updates) 285 for { 286 var reply string 287 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) 288 if err := Invoke(ctx, "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); Code(err) == codes.DeadlineExceeded { 289 cancel() 290 break 291 } 292 cancel() 293 time.Sleep(10 * time.Millisecond) 294 } 295 var wg sync.WaitGroup 296 wg.Add(1) 297 go func() { 298 defer wg.Done() 299 var reply string 300 if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); err != nil { 301 t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want <nil>", err) 302 } 303 }() 304 // Add a connected server to get the above RPC through. 305 updates = []*naming.Update{{ 306 Op: naming.Add, 307 Addr: "localhost:" + servers[0].port, 308 }} 309 r.w.inject(updates) 310 // Wait until the above RPC succeeds. 311 wg.Wait() 312 cc.Close() 313 servers[0].stop() 314} 315 316func TestOneServerDown(t *testing.T) { 317 // Start 2 servers. 318 numServers := 2 319 servers, r := startServers(t, numServers, math.MaxUint32) 320 cc, err := Dial("foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{})) 321 if err != nil { 322 t.Fatalf("Failed to create ClientConn: %v", err) 323 } 324 // Add servers[1] to the service discovery. 325 var updates []*naming.Update 326 updates = append(updates, &naming.Update{ 327 Op: naming.Add, 328 Addr: "localhost:" + servers[1].port, 329 }) 330 r.w.inject(updates) 331 req := "port" 332 var reply string 333 // Loop until servers[1] is up 334 for { 335 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[1].port { 336 break 337 } 338 time.Sleep(10 * time.Millisecond) 339 } 340 341 var wg sync.WaitGroup 342 numRPC := 100 343 sleepDuration := 10 * time.Millisecond 344 wg.Add(1) 345 go func() { 346 time.Sleep(sleepDuration) 347 // After sleepDuration, kill server[0]. 348 servers[0].stop() 349 wg.Done() 350 }() 351 352 // All non-failfast RPCs should not block because there's at least one connection available. 353 for i := 0; i < numRPC; i++ { 354 wg.Add(1) 355 go func() { 356 time.Sleep(sleepDuration) 357 // After sleepDuration, invoke RPC. 358 // server[0] is killed around the same time to make it racy between balancer and gRPC internals. 359 Invoke(context.Background(), "/foo/bar", &req, &reply, cc, FailFast(false)) 360 wg.Done() 361 }() 362 } 363 wg.Wait() 364 cc.Close() 365 for i := 0; i < numServers; i++ { 366 servers[i].stop() 367 } 368} 369 370func TestOneAddressRemoval(t *testing.T) { 371 // Start 2 servers. 372 numServers := 2 373 servers, r := startServers(t, numServers, math.MaxUint32) 374 cc, err := Dial("foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{})) 375 if err != nil { 376 t.Fatalf("Failed to create ClientConn: %v", err) 377 } 378 // Add servers[1] to the service discovery. 379 var updates []*naming.Update 380 updates = append(updates, &naming.Update{ 381 Op: naming.Add, 382 Addr: "localhost:" + servers[1].port, 383 }) 384 r.w.inject(updates) 385 req := "port" 386 var reply string 387 // Loop until servers[1] is up 388 for { 389 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[1].port { 390 break 391 } 392 time.Sleep(10 * time.Millisecond) 393 } 394 395 var wg sync.WaitGroup 396 numRPC := 100 397 sleepDuration := 10 * time.Millisecond 398 wg.Add(1) 399 go func() { 400 time.Sleep(sleepDuration) 401 // After sleepDuration, delete server[0]. 402 var updates []*naming.Update 403 updates = append(updates, &naming.Update{ 404 Op: naming.Delete, 405 Addr: "localhost:" + servers[0].port, 406 }) 407 r.w.inject(updates) 408 wg.Done() 409 }() 410 411 // All non-failfast RPCs should not fail because there's at least one connection available. 412 for i := 0; i < numRPC; i++ { 413 wg.Add(1) 414 go func() { 415 var reply string 416 time.Sleep(sleepDuration) 417 // After sleepDuration, invoke RPC. 418 // server[0] is removed around the same time to make it racy between balancer and gRPC internals. 419 if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); err != nil { 420 t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err) 421 } 422 wg.Done() 423 }() 424 } 425 wg.Wait() 426 cc.Close() 427 for i := 0; i < numServers; i++ { 428 servers[i].stop() 429 } 430} 431 432func checkServerUp(t *testing.T, currentServer *server) { 433 req := "port" 434 port := currentServer.port 435 cc, err := Dial("localhost:"+port, WithBlock(), WithInsecure(), WithCodec(testCodec{})) 436 if err != nil { 437 t.Fatalf("Failed to create ClientConn: %v", err) 438 } 439 var reply string 440 for { 441 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == port { 442 break 443 } 444 time.Sleep(10 * time.Millisecond) 445 } 446 cc.Close() 447} 448 449func TestPickFirstEmptyAddrs(t *testing.T) { 450 servers, r := startServers(t, 1, math.MaxUint32) 451 defer servers[0].stop() 452 cc, err := Dial("foo.bar.com", WithBalancer(pickFirstBalancer(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{})) 453 if err != nil { 454 t.Fatalf("Failed to create ClientConn: %v", err) 455 } 456 defer cc.Close() 457 var reply string 458 if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc); err != nil || reply != expectedResponse { 459 t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, reply = %q, want %q, <nil>", err, reply, expectedResponse) 460 } 461 // Inject name resolution change to remove the server so that there is no address 462 // available after that. 463 u := &naming.Update{ 464 Op: naming.Delete, 465 Addr: "localhost:" + servers[0].port, 466 } 467 r.w.inject([]*naming.Update{u}) 468 // Loop until the above updates apply. 469 for { 470 time.Sleep(10 * time.Millisecond) 471 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) 472 if err := Invoke(ctx, "/foo/bar", &expectedRequest, &reply, cc); err != nil { 473 cancel() 474 break 475 } 476 cancel() 477 } 478} 479 480func TestPickFirstCloseWithPendingRPC(t *testing.T) { 481 servers, r := startServers(t, 1, math.MaxUint32) 482 defer servers[0].stop() 483 cc, err := Dial("foo.bar.com", WithBalancer(pickFirstBalancer(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{})) 484 if err != nil { 485 t.Fatalf("Failed to create ClientConn: %v", err) 486 } 487 var reply string 488 if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); err != nil { 489 t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want %s", err, servers[0].port) 490 } 491 // Remove the server. 492 updates := []*naming.Update{{ 493 Op: naming.Delete, 494 Addr: "localhost:" + servers[0].port, 495 }} 496 r.w.inject(updates) 497 // Loop until the above update applies. 498 for { 499 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) 500 if err := Invoke(ctx, "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); Code(err) == codes.DeadlineExceeded { 501 cancel() 502 break 503 } 504 time.Sleep(10 * time.Millisecond) 505 cancel() 506 } 507 // Issue 2 RPCs which should be completed with error status once cc is closed. 508 var wg sync.WaitGroup 509 wg.Add(2) 510 go func() { 511 defer wg.Done() 512 var reply string 513 if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); err == nil { 514 t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err) 515 } 516 }() 517 go func() { 518 defer wg.Done() 519 var reply string 520 time.Sleep(5 * time.Millisecond) 521 if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); err == nil { 522 t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err) 523 } 524 }() 525 time.Sleep(5 * time.Millisecond) 526 cc.Close() 527 wg.Wait() 528} 529 530func TestPickFirstOrderAllServerUp(t *testing.T) { 531 // Start 3 servers on 3 ports. 532 numServers := 3 533 servers, r := startServers(t, numServers, math.MaxUint32) 534 for i := 0; i < numServers; i++ { 535 defer servers[i].stop() 536 } 537 cc, err := Dial("foo.bar.com", WithBalancer(pickFirstBalancer(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{})) 538 if err != nil { 539 t.Fatalf("Failed to create ClientConn: %v", err) 540 } 541 defer cc.Close() 542 // Add servers[1] and [2] to the service discovery. 543 u := &naming.Update{ 544 Op: naming.Add, 545 Addr: "localhost:" + servers[1].port, 546 } 547 r.w.inject([]*naming.Update{u}) 548 549 u = &naming.Update{ 550 Op: naming.Add, 551 Addr: "localhost:" + servers[2].port, 552 } 553 r.w.inject([]*naming.Update{u}) 554 555 // Loop until all 3 servers are up 556 checkServerUp(t, servers[0]) 557 checkServerUp(t, servers[1]) 558 checkServerUp(t, servers[2]) 559 560 // Check the incoming RPCs served in server[0] 561 req := "port" 562 var reply string 563 for i := 0; i < 20; i++ { 564 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[0].port { 565 t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port) 566 } 567 time.Sleep(10 * time.Millisecond) 568 } 569 570 // Delete server[0] in the balancer, the incoming RPCs served in server[1] 571 // For test addrconn, close server[0] instead 572 u = &naming.Update{ 573 Op: naming.Delete, 574 Addr: "localhost:" + servers[0].port, 575 } 576 r.w.inject([]*naming.Update{u}) 577 // Loop until it changes to server[1] 578 for { 579 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[1].port { 580 break 581 } 582 time.Sleep(10 * time.Millisecond) 583 } 584 for i := 0; i < 20; i++ { 585 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[1].port { 586 t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port) 587 } 588 time.Sleep(10 * time.Millisecond) 589 } 590 591 // Add server[0] back to the balancer, the incoming RPCs served in server[1] 592 // Add is append operation, the order of Notify now is {server[1].port server[2].port server[0].port} 593 u = &naming.Update{ 594 Op: naming.Add, 595 Addr: "localhost:" + servers[0].port, 596 } 597 r.w.inject([]*naming.Update{u}) 598 for i := 0; i < 20; i++ { 599 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[1].port { 600 t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port) 601 } 602 time.Sleep(10 * time.Millisecond) 603 } 604 605 // Delete server[1] in the balancer, the incoming RPCs served in server[2] 606 u = &naming.Update{ 607 Op: naming.Delete, 608 Addr: "localhost:" + servers[1].port, 609 } 610 r.w.inject([]*naming.Update{u}) 611 for { 612 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[2].port { 613 break 614 } 615 time.Sleep(1 * time.Second) 616 } 617 for i := 0; i < 20; i++ { 618 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[2].port { 619 t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 2, err, servers[2].port) 620 } 621 time.Sleep(10 * time.Millisecond) 622 } 623 624 // Delete server[2] in the balancer, the incoming RPCs served in server[0] 625 u = &naming.Update{ 626 Op: naming.Delete, 627 Addr: "localhost:" + servers[2].port, 628 } 629 r.w.inject([]*naming.Update{u}) 630 for { 631 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[0].port { 632 break 633 } 634 time.Sleep(1 * time.Second) 635 } 636 for i := 0; i < 20; i++ { 637 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[0].port { 638 t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 2, err, servers[2].port) 639 } 640 time.Sleep(10 * time.Millisecond) 641 } 642} 643 644func TestPickFirstOrderOneServerDown(t *testing.T) { 645 // Start 3 servers on 3 ports. 646 numServers := 3 647 servers, r := startServers(t, numServers, math.MaxUint32) 648 for i := 0; i < numServers; i++ { 649 defer servers[i].stop() 650 } 651 cc, err := Dial("foo.bar.com", WithBalancer(pickFirstBalancer(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{})) 652 if err != nil { 653 t.Fatalf("Failed to create ClientConn: %v", err) 654 } 655 defer cc.Close() 656 // Add servers[1] and [2] to the service discovery. 657 u := &naming.Update{ 658 Op: naming.Add, 659 Addr: "localhost:" + servers[1].port, 660 } 661 r.w.inject([]*naming.Update{u}) 662 663 u = &naming.Update{ 664 Op: naming.Add, 665 Addr: "localhost:" + servers[2].port, 666 } 667 r.w.inject([]*naming.Update{u}) 668 669 // Loop until all 3 servers are up 670 checkServerUp(t, servers[0]) 671 checkServerUp(t, servers[1]) 672 checkServerUp(t, servers[2]) 673 674 // Check the incoming RPCs served in server[0] 675 req := "port" 676 var reply string 677 for i := 0; i < 20; i++ { 678 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[0].port { 679 t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port) 680 } 681 time.Sleep(10 * time.Millisecond) 682 } 683 684 // server[0] down, incoming RPCs served in server[1], but the order of Notify still remains 685 // {server[0] server[1] server[2]} 686 servers[0].stop() 687 // Loop until it changes to server[1] 688 for { 689 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[1].port { 690 break 691 } 692 time.Sleep(10 * time.Millisecond) 693 } 694 for i := 0; i < 20; i++ { 695 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[1].port { 696 t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port) 697 } 698 time.Sleep(10 * time.Millisecond) 699 } 700 701 // up the server[0] back, the incoming RPCs served in server[1] 702 p, _ := strconv.Atoi(servers[0].port) 703 servers[0] = newTestServer() 704 go servers[0].start(t, p, math.MaxUint32) 705 servers[0].wait(t, 2*time.Second) 706 checkServerUp(t, servers[0]) 707 708 for i := 0; i < 20; i++ { 709 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[1].port { 710 t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port) 711 } 712 time.Sleep(10 * time.Millisecond) 713 } 714 715 // Delete server[1] in the balancer, the incoming RPCs served in server[0] 716 u = &naming.Update{ 717 Op: naming.Delete, 718 Addr: "localhost:" + servers[1].port, 719 } 720 r.w.inject([]*naming.Update{u}) 721 for { 722 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[0].port { 723 break 724 } 725 time.Sleep(1 * time.Second) 726 } 727 for i := 0; i < 20; i++ { 728 if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[0].port { 729 t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port) 730 } 731 time.Sleep(10 * time.Millisecond) 732 } 733} 734 735func TestPickFirstOneAddressRemoval(t *testing.T) { 736 // Start 2 servers. 737 numServers := 2 738 servers, r := startServers(t, numServers, math.MaxUint32) 739 for i := 0; i < numServers; i++ { 740 defer servers[i].stop() 741 } 742 cc, err := Dial("localhost:"+servers[0].port, WithBalancer(pickFirstBalancer(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{})) 743 if err != nil { 744 t.Fatalf("Failed to create ClientConn: %v", err) 745 } 746 defer cc.Close() 747 // Add servers[1] to the service discovery. 748 var updates []*naming.Update 749 updates = append(updates, &naming.Update{ 750 Op: naming.Add, 751 Addr: "localhost:" + servers[1].port, 752 }) 753 r.w.inject(updates) 754 755 // Create a new cc to Loop until servers[1] is up 756 checkServerUp(t, servers[0]) 757 checkServerUp(t, servers[1]) 758 759 var wg sync.WaitGroup 760 numRPC := 100 761 sleepDuration := 10 * time.Millisecond 762 wg.Add(1) 763 go func() { 764 time.Sleep(sleepDuration) 765 // After sleepDuration, delete server[0]. 766 var updates []*naming.Update 767 updates = append(updates, &naming.Update{ 768 Op: naming.Delete, 769 Addr: "localhost:" + servers[0].port, 770 }) 771 r.w.inject(updates) 772 wg.Done() 773 }() 774 775 // All non-failfast RPCs should not fail because there's at least one connection available. 776 for i := 0; i < numRPC; i++ { 777 wg.Add(1) 778 go func() { 779 var reply string 780 time.Sleep(sleepDuration) 781 // After sleepDuration, invoke RPC. 782 // server[0] is removed around the same time to make it racy between balancer and gRPC internals. 783 if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); err != nil { 784 t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err) 785 } 786 wg.Done() 787 }() 788 } 789 wg.Wait() 790} 791