1/* 2 * 3 * Copyright 2017 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package roundrobin_test 20 21import ( 22 "context" 23 "fmt" 24 "net" 25 "strings" 26 "sync" 27 "testing" 28 "time" 29 30 "google.golang.org/grpc" 31 "google.golang.org/grpc/balancer/roundrobin" 32 "google.golang.org/grpc/codes" 33 "google.golang.org/grpc/connectivity" 34 "google.golang.org/grpc/internal/grpctest" 35 "google.golang.org/grpc/peer" 36 "google.golang.org/grpc/resolver" 37 "google.golang.org/grpc/resolver/manual" 38 "google.golang.org/grpc/status" 39 testpb "google.golang.org/grpc/test/grpc_testing" 40) 41 42type s struct { 43 grpctest.Tester 44} 45 46func Test(t *testing.T) { 47 grpctest.RunSubTests(t, s{}) 48} 49 50type testServer struct { 51 testpb.UnimplementedTestServiceServer 52} 53 54func (s *testServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { 55 return &testpb.Empty{}, nil 56} 57 58func (s *testServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error { 59 return nil 60} 61 62type test struct { 63 servers []*grpc.Server 64 addresses []string 65} 66 67func (t *test) cleanup() { 68 for _, s := range t.servers { 69 s.Stop() 70 } 71} 72 73func startTestServers(count int) (_ *test, err error) { 74 t := &test{} 75 76 defer func() { 77 if err != nil { 78 t.cleanup() 79 } 80 }() 81 for i := 0; i < count; i++ { 82 lis, err := net.Listen("tcp", "localhost:0") 83 if err != nil { 84 return nil, fmt.Errorf("failed to listen %v", err) 85 } 86 87 s := grpc.NewServer() 88 testpb.RegisterTestServiceServer(s, &testServer{}) 89 t.servers = append(t.servers, s) 90 t.addresses = append(t.addresses, lis.Addr().String()) 91 92 go func(s *grpc.Server, l net.Listener) { 93 s.Serve(l) 94 }(s, lis) 95 } 96 97 return t, nil 98} 99 100func (s) TestOneBackend(t *testing.T) { 101 r, cleanup := manual.GenerateAndRegisterManualResolver() 102 defer cleanup() 103 104 test, err := startTestServers(1) 105 if err != nil { 106 t.Fatalf("failed to start servers: %v", err) 107 } 108 defer test.cleanup() 109 110 cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name)) 111 if err != nil { 112 t.Fatalf("failed to dial: %v", err) 113 } 114 defer cc.Close() 115 testc := testpb.NewTestServiceClient(cc) 116 // The first RPC should fail because there's no address. 117 ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) 118 defer cancel() 119 if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded { 120 t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) 121 } 122 123 r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: test.addresses[0]}}}) 124 // The second RPC should succeed. 125 if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { 126 t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err) 127 } 128} 129 130func (s) TestBackendsRoundRobin(t *testing.T) { 131 r, cleanup := manual.GenerateAndRegisterManualResolver() 132 defer cleanup() 133 134 backendCount := 5 135 test, err := startTestServers(backendCount) 136 if err != nil { 137 t.Fatalf("failed to start servers: %v", err) 138 } 139 defer test.cleanup() 140 141 cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name)) 142 if err != nil { 143 t.Fatalf("failed to dial: %v", err) 144 } 145 defer cc.Close() 146 testc := testpb.NewTestServiceClient(cc) 147 // The first RPC should fail because there's no address. 148 ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) 149 defer cancel() 150 if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded { 151 t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) 152 } 153 154 var resolvedAddrs []resolver.Address 155 for i := 0; i < backendCount; i++ { 156 resolvedAddrs = append(resolvedAddrs, resolver.Address{Addr: test.addresses[i]}) 157 } 158 159 r.UpdateState(resolver.State{Addresses: resolvedAddrs}) 160 var p peer.Peer 161 // Make sure connections to all servers are up. 162 for si := 0; si < backendCount; si++ { 163 var connected bool 164 for i := 0; i < 1000; i++ { 165 if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil { 166 t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err) 167 } 168 if p.Addr.String() == test.addresses[si] { 169 connected = true 170 break 171 } 172 time.Sleep(time.Millisecond) 173 } 174 if !connected { 175 t.Fatalf("Connection to %v was not up after more than 1 second", test.addresses[si]) 176 } 177 } 178 179 for i := 0; i < 3*backendCount; i++ { 180 if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil { 181 t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err) 182 } 183 if p.Addr.String() != test.addresses[i%backendCount] { 184 t.Fatalf("Index %d: want peer %v, got peer %v", i, test.addresses[i%backendCount], p.Addr.String()) 185 } 186 } 187} 188 189func (s) TestAddressesRemoved(t *testing.T) { 190 r, cleanup := manual.GenerateAndRegisterManualResolver() 191 defer cleanup() 192 193 test, err := startTestServers(1) 194 if err != nil { 195 t.Fatalf("failed to start servers: %v", err) 196 } 197 defer test.cleanup() 198 199 cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name)) 200 if err != nil { 201 t.Fatalf("failed to dial: %v", err) 202 } 203 defer cc.Close() 204 testc := testpb.NewTestServiceClient(cc) 205 // The first RPC should fail because there's no address. 206 ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) 207 defer cancel() 208 if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded { 209 t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) 210 } 211 212 r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: test.addresses[0]}}}) 213 // The second RPC should succeed. 214 if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { 215 t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err) 216 } 217 218 r.UpdateState(resolver.State{Addresses: []resolver.Address{}}) 219 // Removing addresses results in an error reported to the clientconn, but 220 // the existing connections remain. RPCs should still succeed. 221 ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) 222 defer cancel() 223 if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { 224 t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err) 225 } 226 227 // Stop the server to bring the channel state into transient failure. 228 test.cleanup() 229 // Wait for not-ready. 230 for src := cc.GetState(); src == connectivity.Ready; src = cc.GetState() { 231 if !cc.WaitForStateChange(ctx, src) { 232 t.Fatalf("timed out waiting for state change. got %v; want !%v", src, connectivity.Ready) 233 } 234 } 235 // Report an empty server list again; because the state is not ready, the 236 // empty address list error should surface to the user. 237 r.UpdateState(resolver.State{Addresses: []resolver.Address{}}) 238 239 const msgWant = "produced zero addresses" 240 if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || !strings.Contains(status.Convert(err).Message(), msgWant) { 241 t.Fatalf("EmptyCall() = _, %v, want _, Contains(Message(), %q)", err, msgWant) 242 } 243 244} 245 246func (s) TestCloseWithPendingRPC(t *testing.T) { 247 r, cleanup := manual.GenerateAndRegisterManualResolver() 248 defer cleanup() 249 250 test, err := startTestServers(1) 251 if err != nil { 252 t.Fatalf("failed to start servers: %v", err) 253 } 254 defer test.cleanup() 255 256 cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name)) 257 if err != nil { 258 t.Fatalf("failed to dial: %v", err) 259 } 260 testc := testpb.NewTestServiceClient(cc) 261 262 var wg sync.WaitGroup 263 for i := 0; i < 3; i++ { 264 wg.Add(1) 265 go func() { 266 defer wg.Done() 267 // This RPC blocks until cc is closed. 268 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) 269 if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) == codes.DeadlineExceeded { 270 t.Errorf("RPC failed because of deadline after cc is closed; want error the client connection is closing") 271 } 272 cancel() 273 }() 274 } 275 cc.Close() 276 wg.Wait() 277} 278 279func (s) TestNewAddressWhileBlocking(t *testing.T) { 280 r, cleanup := manual.GenerateAndRegisterManualResolver() 281 defer cleanup() 282 283 test, err := startTestServers(1) 284 if err != nil { 285 t.Fatalf("failed to start servers: %v", err) 286 } 287 defer test.cleanup() 288 289 cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name)) 290 if err != nil { 291 t.Fatalf("failed to dial: %v", err) 292 } 293 defer cc.Close() 294 testc := testpb.NewTestServiceClient(cc) 295 // The first RPC should fail because there's no address. 296 ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) 297 defer cancel() 298 if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded { 299 t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) 300 } 301 302 r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: test.addresses[0]}}}) 303 // The second RPC should succeed. 304 ctx, cancel = context.WithTimeout(context.Background(), 2*time.Second) 305 defer cancel() 306 if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err != nil { 307 t.Fatalf("EmptyCall() = _, %v, want _, nil", err) 308 } 309 310 r.UpdateState(resolver.State{Addresses: []resolver.Address{}}) 311 312 var wg sync.WaitGroup 313 for i := 0; i < 3; i++ { 314 wg.Add(1) 315 go func() { 316 defer wg.Done() 317 // This RPC blocks until NewAddress is called. 318 testc.EmptyCall(context.Background(), &testpb.Empty{}) 319 }() 320 } 321 time.Sleep(50 * time.Millisecond) 322 r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: test.addresses[0]}}}) 323 wg.Wait() 324} 325 326func (s) TestOneServerDown(t *testing.T) { 327 r, cleanup := manual.GenerateAndRegisterManualResolver() 328 defer cleanup() 329 330 backendCount := 3 331 test, err := startTestServers(backendCount) 332 if err != nil { 333 t.Fatalf("failed to start servers: %v", err) 334 } 335 defer test.cleanup() 336 337 cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name)) 338 if err != nil { 339 t.Fatalf("failed to dial: %v", err) 340 } 341 defer cc.Close() 342 testc := testpb.NewTestServiceClient(cc) 343 // The first RPC should fail because there's no address. 344 ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) 345 defer cancel() 346 if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded { 347 t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) 348 } 349 350 var resolvedAddrs []resolver.Address 351 for i := 0; i < backendCount; i++ { 352 resolvedAddrs = append(resolvedAddrs, resolver.Address{Addr: test.addresses[i]}) 353 } 354 355 r.UpdateState(resolver.State{Addresses: resolvedAddrs}) 356 var p peer.Peer 357 // Make sure connections to all servers are up. 358 for si := 0; si < backendCount; si++ { 359 var connected bool 360 for i := 0; i < 1000; i++ { 361 if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil { 362 t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err) 363 } 364 if p.Addr.String() == test.addresses[si] { 365 connected = true 366 break 367 } 368 time.Sleep(time.Millisecond) 369 } 370 if !connected { 371 t.Fatalf("Connection to %v was not up after more than 1 second", test.addresses[si]) 372 } 373 } 374 375 for i := 0; i < 3*backendCount; i++ { 376 if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil { 377 t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err) 378 } 379 if p.Addr.String() != test.addresses[i%backendCount] { 380 t.Fatalf("Index %d: want peer %v, got peer %v", i, test.addresses[i%backendCount], p.Addr.String()) 381 } 382 } 383 384 // Stop one server, RPCs should roundrobin among the remaining servers. 385 backendCount-- 386 test.servers[backendCount].Stop() 387 // Loop until see server[backendCount-1] twice without seeing server[backendCount]. 388 var targetSeen int 389 for i := 0; i < 1000; i++ { 390 if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil { 391 targetSeen = 0 392 t.Logf("EmptyCall() = _, %v, want _, <nil>", err) 393 // Due to a race, this RPC could possibly get the connection that 394 // was closing, and this RPC may fail. Keep trying when this 395 // happens. 396 continue 397 } 398 switch p.Addr.String() { 399 case test.addresses[backendCount-1]: 400 targetSeen++ 401 case test.addresses[backendCount]: 402 // Reset targetSeen if peer is server[backendCount]. 403 targetSeen = 0 404 } 405 // Break to make sure the last picked address is server[-1], so the following for loop won't be flaky. 406 if targetSeen >= 2 { 407 break 408 } 409 } 410 if targetSeen != 2 { 411 t.Fatal("Failed to see server[backendCount-1] twice without seeing server[backendCount]") 412 } 413 for i := 0; i < 3*backendCount; i++ { 414 if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil { 415 t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err) 416 } 417 if p.Addr.String() != test.addresses[i%backendCount] { 418 t.Errorf("Index %d: want peer %v, got peer %v", i, test.addresses[i%backendCount], p.Addr.String()) 419 } 420 } 421} 422 423func (s) TestAllServersDown(t *testing.T) { 424 r, cleanup := manual.GenerateAndRegisterManualResolver() 425 defer cleanup() 426 427 backendCount := 3 428 test, err := startTestServers(backendCount) 429 if err != nil { 430 t.Fatalf("failed to start servers: %v", err) 431 } 432 defer test.cleanup() 433 434 cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name)) 435 if err != nil { 436 t.Fatalf("failed to dial: %v", err) 437 } 438 defer cc.Close() 439 testc := testpb.NewTestServiceClient(cc) 440 // The first RPC should fail because there's no address. 441 ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) 442 defer cancel() 443 if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded { 444 t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) 445 } 446 447 var resolvedAddrs []resolver.Address 448 for i := 0; i < backendCount; i++ { 449 resolvedAddrs = append(resolvedAddrs, resolver.Address{Addr: test.addresses[i]}) 450 } 451 452 r.UpdateState(resolver.State{Addresses: resolvedAddrs}) 453 var p peer.Peer 454 // Make sure connections to all servers are up. 455 for si := 0; si < backendCount; si++ { 456 var connected bool 457 for i := 0; i < 1000; i++ { 458 if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil { 459 t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err) 460 } 461 if p.Addr.String() == test.addresses[si] { 462 connected = true 463 break 464 } 465 time.Sleep(time.Millisecond) 466 } 467 if !connected { 468 t.Fatalf("Connection to %v was not up after more than 1 second", test.addresses[si]) 469 } 470 } 471 472 for i := 0; i < 3*backendCount; i++ { 473 if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil { 474 t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err) 475 } 476 if p.Addr.String() != test.addresses[i%backendCount] { 477 t.Fatalf("Index %d: want peer %v, got peer %v", i, test.addresses[i%backendCount], p.Addr.String()) 478 } 479 } 480 481 // All servers are stopped, failfast RPC should fail with unavailable. 482 for i := 0; i < backendCount; i++ { 483 test.servers[i].Stop() 484 } 485 time.Sleep(100 * time.Millisecond) 486 for i := 0; i < 1000; i++ { 487 if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) == codes.Unavailable { 488 return 489 } 490 time.Sleep(time.Millisecond) 491 } 492 t.Fatalf("Failfast RPCs didn't fail with Unavailable after all servers are stopped") 493} 494