1/* 2 * 3 * Copyright 2017 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package roundrobin_test 20 21import ( 22 "context" 23 "fmt" 24 "net" 25 "sync" 26 "testing" 27 "time" 28 29 "google.golang.org/grpc" 30 "google.golang.org/grpc/balancer/roundrobin" 31 "google.golang.org/grpc/codes" 32 _ "google.golang.org/grpc/grpclog/glogger" 33 "google.golang.org/grpc/internal/leakcheck" 34 "google.golang.org/grpc/peer" 35 "google.golang.org/grpc/resolver" 36 "google.golang.org/grpc/resolver/manual" 37 "google.golang.org/grpc/status" 38 testpb "google.golang.org/grpc/test/grpc_testing" 39) 40 41type testServer struct { 42 testpb.TestServiceServer 43} 44 45func (s *testServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { 46 return &testpb.Empty{}, nil 47} 48 49func (s *testServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error { 50 return nil 51} 52 53type test struct { 54 servers []*grpc.Server 55 addresses []string 56} 57 58func (t *test) cleanup() { 59 for _, s := range t.servers { 60 s.Stop() 61 } 62} 63 64func startTestServers(count int) (_ *test, err error) { 65 t := &test{} 66 67 defer func() { 68 if err != nil { 69 for _, s := range t.servers { 70 s.Stop() 71 } 72 } 73 }() 74 for i := 0; i < count; i++ { 75 lis, err := net.Listen("tcp", "localhost:0") 76 if err != nil { 77 return nil, fmt.Errorf("Failed to listen %v", err) 78 } 79 80 s := grpc.NewServer() 81 testpb.RegisterTestServiceServer(s, &testServer{}) 82 t.servers = append(t.servers, s) 83 t.addresses = append(t.addresses, lis.Addr().String()) 84 85 go func(s *grpc.Server, l net.Listener) { 86 s.Serve(l) 87 }(s, lis) 88 } 89 90 return t, nil 91} 92 93func TestOneBackend(t *testing.T) { 94 defer leakcheck.Check(t) 95 r, cleanup := manual.GenerateAndRegisterManualResolver() 96 defer cleanup() 97 98 test, err := startTestServers(1) 99 if err != nil { 100 t.Fatalf("failed to start servers: %v", err) 101 } 102 defer test.cleanup() 103 104 cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name)) 105 if err != nil { 106 t.Fatalf("failed to dial: %v", err) 107 } 108 defer cc.Close() 109 testc := testpb.NewTestServiceClient(cc) 110 // The first RPC should fail because there's no address. 111 ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) 112 defer cancel() 113 if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded { 114 t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) 115 } 116 117 r.NewAddress([]resolver.Address{{Addr: test.addresses[0]}}) 118 // The second RPC should succeed. 119 if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { 120 t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err) 121 } 122} 123 124func TestBackendsRoundRobin(t *testing.T) { 125 defer leakcheck.Check(t) 126 r, cleanup := manual.GenerateAndRegisterManualResolver() 127 defer cleanup() 128 129 backendCount := 5 130 test, err := startTestServers(backendCount) 131 if err != nil { 132 t.Fatalf("failed to start servers: %v", err) 133 } 134 defer test.cleanup() 135 136 cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name)) 137 if err != nil { 138 t.Fatalf("failed to dial: %v", err) 139 } 140 defer cc.Close() 141 testc := testpb.NewTestServiceClient(cc) 142 // The first RPC should fail because there's no address. 143 ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) 144 defer cancel() 145 if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded { 146 t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) 147 } 148 149 var resolvedAddrs []resolver.Address 150 for i := 0; i < backendCount; i++ { 151 resolvedAddrs = append(resolvedAddrs, resolver.Address{Addr: test.addresses[i]}) 152 } 153 154 r.NewAddress(resolvedAddrs) 155 var p peer.Peer 156 // Make sure connections to all servers are up. 157 for si := 0; si < backendCount; si++ { 158 var connected bool 159 for i := 0; i < 1000; i++ { 160 if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil { 161 t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err) 162 } 163 if p.Addr.String() == test.addresses[si] { 164 connected = true 165 break 166 } 167 time.Sleep(time.Millisecond) 168 } 169 if !connected { 170 t.Fatalf("Connection to %v was not up after more than 1 second", test.addresses[si]) 171 } 172 } 173 174 for i := 0; i < 3*backendCount; i++ { 175 if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil { 176 t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err) 177 } 178 if p.Addr.String() != test.addresses[i%backendCount] { 179 t.Fatalf("Index %d: want peer %v, got peer %v", i, test.addresses[i%backendCount], p.Addr.String()) 180 } 181 } 182} 183 184func TestAddressesRemoved(t *testing.T) { 185 defer leakcheck.Check(t) 186 r, cleanup := manual.GenerateAndRegisterManualResolver() 187 defer cleanup() 188 189 test, err := startTestServers(1) 190 if err != nil { 191 t.Fatalf("failed to start servers: %v", err) 192 } 193 defer test.cleanup() 194 195 cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name)) 196 if err != nil { 197 t.Fatalf("failed to dial: %v", err) 198 } 199 defer cc.Close() 200 testc := testpb.NewTestServiceClient(cc) 201 // The first RPC should fail because there's no address. 202 ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) 203 defer cancel() 204 if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded { 205 t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) 206 } 207 208 r.NewAddress([]resolver.Address{{Addr: test.addresses[0]}}) 209 // The second RPC should succeed. 210 if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { 211 t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err) 212 } 213 214 r.NewAddress([]resolver.Address{}) 215 for i := 0; i < 1000; i++ { 216 ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) 217 defer cancel() 218 if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); status.Code(err) == codes.DeadlineExceeded { 219 return 220 } 221 time.Sleep(time.Millisecond) 222 } 223 t.Fatalf("No RPC failed after removing all addresses, want RPC to fail with DeadlineExceeded") 224} 225 226func TestCloseWithPendingRPC(t *testing.T) { 227 defer leakcheck.Check(t) 228 r, cleanup := manual.GenerateAndRegisterManualResolver() 229 defer cleanup() 230 231 test, err := startTestServers(1) 232 if err != nil { 233 t.Fatalf("failed to start servers: %v", err) 234 } 235 defer test.cleanup() 236 237 cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name)) 238 if err != nil { 239 t.Fatalf("failed to dial: %v", err) 240 } 241 testc := testpb.NewTestServiceClient(cc) 242 243 var wg sync.WaitGroup 244 for i := 0; i < 3; i++ { 245 wg.Add(1) 246 go func() { 247 defer wg.Done() 248 // This RPC blocks until cc is closed. 249 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) 250 if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) == codes.DeadlineExceeded { 251 t.Errorf("RPC failed because of deadline after cc is closed; want error the client connection is closing") 252 } 253 cancel() 254 }() 255 } 256 cc.Close() 257 wg.Wait() 258} 259 260func TestNewAddressWhileBlocking(t *testing.T) { 261 defer leakcheck.Check(t) 262 r, cleanup := manual.GenerateAndRegisterManualResolver() 263 defer cleanup() 264 265 test, err := startTestServers(1) 266 if err != nil { 267 t.Fatalf("failed to start servers: %v", err) 268 } 269 defer test.cleanup() 270 271 cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name)) 272 if err != nil { 273 t.Fatalf("failed to dial: %v", err) 274 } 275 defer cc.Close() 276 testc := testpb.NewTestServiceClient(cc) 277 // The first RPC should fail because there's no address. 278 ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) 279 defer cancel() 280 if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded { 281 t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) 282 } 283 284 r.NewAddress([]resolver.Address{{Addr: test.addresses[0]}}) 285 // The second RPC should succeed. 286 ctx, cancel = context.WithTimeout(context.Background(), 2*time.Second) 287 defer cancel() 288 if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err != nil { 289 t.Fatalf("EmptyCall() = _, %v, want _, nil", err) 290 } 291 292 r.NewAddress([]resolver.Address{}) 293 294 var wg sync.WaitGroup 295 for i := 0; i < 3; i++ { 296 wg.Add(1) 297 go func() { 298 defer wg.Done() 299 // This RPC blocks until NewAddress is called. 300 testc.EmptyCall(context.Background(), &testpb.Empty{}) 301 }() 302 } 303 time.Sleep(50 * time.Millisecond) 304 r.NewAddress([]resolver.Address{{Addr: test.addresses[0]}}) 305 wg.Wait() 306} 307 308func TestOneServerDown(t *testing.T) { 309 defer leakcheck.Check(t) 310 r, cleanup := manual.GenerateAndRegisterManualResolver() 311 defer cleanup() 312 313 backendCount := 3 314 test, err := startTestServers(backendCount) 315 if err != nil { 316 t.Fatalf("failed to start servers: %v", err) 317 } 318 defer test.cleanup() 319 320 cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name), grpc.WithWaitForHandshake()) 321 if err != nil { 322 t.Fatalf("failed to dial: %v", err) 323 } 324 defer cc.Close() 325 testc := testpb.NewTestServiceClient(cc) 326 // The first RPC should fail because there's no address. 327 ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) 328 defer cancel() 329 if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded { 330 t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) 331 } 332 333 var resolvedAddrs []resolver.Address 334 for i := 0; i < backendCount; i++ { 335 resolvedAddrs = append(resolvedAddrs, resolver.Address{Addr: test.addresses[i]}) 336 } 337 338 r.NewAddress(resolvedAddrs) 339 var p peer.Peer 340 // Make sure connections to all servers are up. 341 for si := 0; si < backendCount; si++ { 342 var connected bool 343 for i := 0; i < 1000; i++ { 344 if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil { 345 t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err) 346 } 347 if p.Addr.String() == test.addresses[si] { 348 connected = true 349 break 350 } 351 time.Sleep(time.Millisecond) 352 } 353 if !connected { 354 t.Fatalf("Connection to %v was not up after more than 1 second", test.addresses[si]) 355 } 356 } 357 358 for i := 0; i < 3*backendCount; i++ { 359 if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil { 360 t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err) 361 } 362 if p.Addr.String() != test.addresses[i%backendCount] { 363 t.Fatalf("Index %d: want peer %v, got peer %v", i, test.addresses[i%backendCount], p.Addr.String()) 364 } 365 } 366 367 // Stop one server, RPCs should roundrobin among the remaining servers. 368 backendCount-- 369 test.servers[backendCount].Stop() 370 // Loop until see server[backendCount-1] twice without seeing server[backendCount]. 371 var targetSeen int 372 for i := 0; i < 1000; i++ { 373 if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil { 374 targetSeen = 0 375 t.Logf("EmptyCall() = _, %v, want _, <nil>", err) 376 // Due to a race, this RPC could possibly get the connection that 377 // was closing, and this RPC may fail. Keep trying when this 378 // happens. 379 continue 380 } 381 switch p.Addr.String() { 382 case test.addresses[backendCount-1]: 383 targetSeen++ 384 case test.addresses[backendCount]: 385 // Reset targetSeen if peer is server[backendCount]. 386 targetSeen = 0 387 } 388 // Break to make sure the last picked address is server[-1], so the following for loop won't be flaky. 389 if targetSeen >= 2 { 390 break 391 } 392 } 393 if targetSeen != 2 { 394 t.Fatal("Failed to see server[backendCount-1] twice without seeing server[backendCount]") 395 } 396 for i := 0; i < 3*backendCount; i++ { 397 if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil { 398 t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err) 399 } 400 if p.Addr.String() != test.addresses[i%backendCount] { 401 t.Errorf("Index %d: want peer %v, got peer %v", i, test.addresses[i%backendCount], p.Addr.String()) 402 } 403 } 404} 405 406func TestAllServersDown(t *testing.T) { 407 defer leakcheck.Check(t) 408 r, cleanup := manual.GenerateAndRegisterManualResolver() 409 defer cleanup() 410 411 backendCount := 3 412 test, err := startTestServers(backendCount) 413 if err != nil { 414 t.Fatalf("failed to start servers: %v", err) 415 } 416 defer test.cleanup() 417 418 cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name), grpc.WithWaitForHandshake()) 419 if err != nil { 420 t.Fatalf("failed to dial: %v", err) 421 } 422 defer cc.Close() 423 testc := testpb.NewTestServiceClient(cc) 424 // The first RPC should fail because there's no address. 425 ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) 426 defer cancel() 427 if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded { 428 t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) 429 } 430 431 var resolvedAddrs []resolver.Address 432 for i := 0; i < backendCount; i++ { 433 resolvedAddrs = append(resolvedAddrs, resolver.Address{Addr: test.addresses[i]}) 434 } 435 436 r.NewAddress(resolvedAddrs) 437 var p peer.Peer 438 // Make sure connections to all servers are up. 439 for si := 0; si < backendCount; si++ { 440 var connected bool 441 for i := 0; i < 1000; i++ { 442 if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil { 443 t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err) 444 } 445 if p.Addr.String() == test.addresses[si] { 446 connected = true 447 break 448 } 449 time.Sleep(time.Millisecond) 450 } 451 if !connected { 452 t.Fatalf("Connection to %v was not up after more than 1 second", test.addresses[si]) 453 } 454 } 455 456 for i := 0; i < 3*backendCount; i++ { 457 if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil { 458 t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err) 459 } 460 if p.Addr.String() != test.addresses[i%backendCount] { 461 t.Fatalf("Index %d: want peer %v, got peer %v", i, test.addresses[i%backendCount], p.Addr.String()) 462 } 463 } 464 465 // All servers are stopped, failfast RPC should fail with unavailable. 466 for i := 0; i < backendCount; i++ { 467 test.servers[i].Stop() 468 } 469 time.Sleep(100 * time.Millisecond) 470 for i := 0; i < 1000; i++ { 471 if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) == codes.Unavailable { 472 return 473 } 474 time.Sleep(time.Millisecond) 475 } 476 t.Fatalf("Failfast RPCs didn't fail with Unavailable after all servers are stopped") 477} 478