1/* 2 * 3 * Copyright 2017 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package grpc 20 21import ( 22 "context" 23 "fmt" 24 "math" 25 "testing" 26 "time" 27 28 "google.golang.org/grpc/balancer" 29 "google.golang.org/grpc/balancer/roundrobin" 30 "google.golang.org/grpc/internal" 31 "google.golang.org/grpc/internal/balancer/stub" 32 "google.golang.org/grpc/resolver" 33 "google.golang.org/grpc/resolver/manual" 34 "google.golang.org/grpc/serviceconfig" 35) 36 37var _ balancer.Builder = &magicalLB{} 38var _ balancer.Balancer = &magicalLB{} 39 40// magicalLB is a ringer for grpclb. It is used to avoid circular dependencies on the grpclb package 41type magicalLB struct{} 42 43func (b *magicalLB) Name() string { 44 return "grpclb" 45} 46 47func (b *magicalLB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { 48 return b 49} 50 51func (b *magicalLB) ResolverError(error) {} 52 53func (b *magicalLB) UpdateSubConnState(balancer.SubConn, balancer.SubConnState) {} 54 55func (b *magicalLB) UpdateClientConnState(balancer.ClientConnState) error { 56 return nil 57} 58 59func (b *magicalLB) Close() {} 60 61func init() { 62 balancer.Register(&magicalLB{}) 63} 64 65func startServers(t *testing.T, numServers int, maxStreams uint32) ([]*server, func()) { 66 var servers []*server 67 for i := 0; i < numServers; i++ { 68 s := newTestServer() 69 servers = append(servers, s) 70 go s.start(t, 0, maxStreams) 71 s.wait(t, 2*time.Second) 72 } 73 return servers, func() { 74 for i := 0; i < numServers; i++ { 75 servers[i].stop() 76 } 77 } 78} 79 80func checkPickFirst(cc *ClientConn, servers []*server) error { 81 var ( 82 req = "port" 83 reply string 84 err error 85 ) 86 connected := false 87 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) 88 defer cancel() 89 for i := 0; i < 5000; i++ { 90 if err = cc.Invoke(ctx, "/foo/bar", &req, &reply); errorDesc(err) == servers[0].port { 91 if connected { 92 // connected is set to false if peer is not server[0]. So if 93 // connected is true here, this is the second time we saw 94 // server[0] in a row. Break because pickfirst is in effect. 95 break 96 } 97 connected = true 98 } else { 99 connected = false 100 } 101 time.Sleep(time.Millisecond) 102 } 103 if !connected { 104 return fmt.Errorf("pickfirst is not in effect after 5 second, EmptyCall() = _, %v, want _, %v", err, servers[0].port) 105 } 106 107 // The following RPCs should all succeed with the first server. 108 for i := 0; i < 3; i++ { 109 err = cc.Invoke(ctx, "/foo/bar", &req, &reply) 110 if errorDesc(err) != servers[0].port { 111 return fmt.Errorf("index %d: want peer %v, got peer %v", i, servers[0].port, err) 112 } 113 } 114 return nil 115} 116 117func checkRoundRobin(cc *ClientConn, servers []*server) error { 118 var ( 119 req = "port" 120 reply string 121 err error 122 ) 123 124 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) 125 defer cancel() 126 // Make sure connections to all servers are up. 127 for i := 0; i < 2; i++ { 128 // Do this check twice, otherwise the first RPC's transport may still be 129 // picked by the closing pickfirst balancer, and the test becomes flaky. 130 for _, s := range servers { 131 var up bool 132 for i := 0; i < 5000; i++ { 133 if err = cc.Invoke(ctx, "/foo/bar", &req, &reply); errorDesc(err) == s.port { 134 up = true 135 break 136 } 137 time.Sleep(time.Millisecond) 138 } 139 if !up { 140 return fmt.Errorf("server %v is not up within 5 second", s.port) 141 } 142 } 143 } 144 145 serverCount := len(servers) 146 for i := 0; i < 3*serverCount; i++ { 147 err = cc.Invoke(ctx, "/foo/bar", &req, &reply) 148 if errorDesc(err) != servers[i%serverCount].port { 149 return fmt.Errorf("index %d: want peer %v, got peer %v", i, servers[i%serverCount].port, err) 150 } 151 } 152 return nil 153} 154 155func (s) TestSwitchBalancer(t *testing.T) { 156 r := manual.NewBuilderWithScheme("whatever") 157 158 const numServers = 2 159 servers, scleanup := startServers(t, numServers, math.MaxInt32) 160 defer scleanup() 161 162 cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{})) 163 if err != nil { 164 t.Fatalf("failed to dial: %v", err) 165 } 166 defer cc.Close() 167 addrs := []resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}} 168 r.UpdateState(resolver.State{Addresses: addrs}) 169 // The default balancer is pickfirst. 170 if err := checkPickFirst(cc, servers); err != nil { 171 t.Fatalf("check pickfirst returned non-nil error: %v", err) 172 } 173 // Switch to roundrobin. 174 cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`), Addresses: addrs}, nil) 175 if err := checkRoundRobin(cc, servers); err != nil { 176 t.Fatalf("check roundrobin returned non-nil error: %v", err) 177 } 178 // Switch to pickfirst. 179 cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "pick_first"}`), Addresses: addrs}, nil) 180 if err := checkPickFirst(cc, servers); err != nil { 181 t.Fatalf("check pickfirst returned non-nil error: %v", err) 182 } 183} 184 185// Test that balancer specified by dial option will not be overridden. 186func (s) TestBalancerDialOption(t *testing.T) { 187 r := manual.NewBuilderWithScheme("whatever") 188 189 const numServers = 2 190 servers, scleanup := startServers(t, numServers, math.MaxInt32) 191 defer scleanup() 192 193 cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{}), WithBalancerName(roundrobin.Name)) 194 if err != nil { 195 t.Fatalf("failed to dial: %v", err) 196 } 197 defer cc.Close() 198 addrs := []resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}} 199 r.UpdateState(resolver.State{Addresses: addrs}) 200 // The init balancer is roundrobin. 201 if err := checkRoundRobin(cc, servers); err != nil { 202 t.Fatalf("check roundrobin returned non-nil error: %v", err) 203 } 204 // Switch to pickfirst. 205 cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "pick_first"}`), Addresses: addrs}, nil) 206 // Balancer is still roundrobin. 207 if err := checkRoundRobin(cc, servers); err != nil { 208 t.Fatalf("check roundrobin returned non-nil error: %v", err) 209 } 210} 211 212// First addr update contains grpclb. 213func (s) TestSwitchBalancerGRPCLBFirst(t *testing.T) { 214 r := manual.NewBuilderWithScheme("whatever") 215 216 cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{})) 217 if err != nil { 218 t.Fatalf("failed to dial: %v", err) 219 } 220 defer cc.Close() 221 222 // ClientConn will switch balancer to grpclb when receives an address of 223 // type GRPCLB. 224 r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}, {Addr: "grpclb", Type: resolver.GRPCLB}}}) 225 var isGRPCLB bool 226 for i := 0; i < 5000; i++ { 227 cc.mu.Lock() 228 isGRPCLB = cc.curBalancerName == "grpclb" 229 cc.mu.Unlock() 230 if isGRPCLB { 231 break 232 } 233 time.Sleep(time.Millisecond) 234 } 235 if !isGRPCLB { 236 t.Fatalf("after 5 second, cc.balancer is of type %v, not grpclb", cc.curBalancerName) 237 } 238 239 // New update containing new backend and new grpclb. Should not switch 240 // balancer. 241 r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend2"}, {Addr: "grpclb2", Type: resolver.GRPCLB}}}) 242 for i := 0; i < 200; i++ { 243 cc.mu.Lock() 244 isGRPCLB = cc.curBalancerName == "grpclb" 245 cc.mu.Unlock() 246 if !isGRPCLB { 247 break 248 } 249 time.Sleep(time.Millisecond) 250 } 251 if !isGRPCLB { 252 t.Fatalf("within 200 ms, cc.balancer switched to !grpclb, want grpclb") 253 } 254 255 var isPickFirst bool 256 // Switch balancer to pickfirst. 257 r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}}) 258 for i := 0; i < 5000; i++ { 259 cc.mu.Lock() 260 isPickFirst = cc.curBalancerName == PickFirstBalancerName 261 cc.mu.Unlock() 262 if isPickFirst { 263 break 264 } 265 time.Sleep(time.Millisecond) 266 } 267 if !isPickFirst { 268 t.Fatalf("after 5 second, cc.balancer is of type %v, not pick_first", cc.curBalancerName) 269 } 270} 271 272// First addr update does not contain grpclb. 273func (s) TestSwitchBalancerGRPCLBSecond(t *testing.T) { 274 r := manual.NewBuilderWithScheme("whatever") 275 276 cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{})) 277 if err != nil { 278 t.Fatalf("failed to dial: %v", err) 279 } 280 defer cc.Close() 281 282 r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}}) 283 var isPickFirst bool 284 for i := 0; i < 5000; i++ { 285 cc.mu.Lock() 286 isPickFirst = cc.curBalancerName == PickFirstBalancerName 287 cc.mu.Unlock() 288 if isPickFirst { 289 break 290 } 291 time.Sleep(time.Millisecond) 292 } 293 if !isPickFirst { 294 t.Fatalf("after 5 second, cc.balancer is of type %v, not pick_first", cc.curBalancerName) 295 } 296 297 // ClientConn will switch balancer to grpclb when receives an address of 298 // type GRPCLB. 299 r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}, {Addr: "grpclb", Type: resolver.GRPCLB}}}) 300 var isGRPCLB bool 301 for i := 0; i < 5000; i++ { 302 cc.mu.Lock() 303 isGRPCLB = cc.curBalancerName == "grpclb" 304 cc.mu.Unlock() 305 if isGRPCLB { 306 break 307 } 308 time.Sleep(time.Millisecond) 309 } 310 if !isGRPCLB { 311 t.Fatalf("after 5 second, cc.balancer is of type %v, not grpclb", cc.curBalancerName) 312 } 313 314 // New update containing new backend and new grpclb. Should not switch 315 // balancer. 316 r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend2"}, {Addr: "grpclb2", Type: resolver.GRPCLB}}}) 317 for i := 0; i < 200; i++ { 318 cc.mu.Lock() 319 isGRPCLB = cc.curBalancerName == "grpclb" 320 cc.mu.Unlock() 321 if !isGRPCLB { 322 break 323 } 324 time.Sleep(time.Millisecond) 325 } 326 if !isGRPCLB { 327 t.Fatalf("within 200 ms, cc.balancer switched to !grpclb, want grpclb") 328 } 329 330 // Switch balancer back. 331 r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}}) 332 for i := 0; i < 5000; i++ { 333 cc.mu.Lock() 334 isPickFirst = cc.curBalancerName == PickFirstBalancerName 335 cc.mu.Unlock() 336 if isPickFirst { 337 break 338 } 339 time.Sleep(time.Millisecond) 340 } 341 if !isPickFirst { 342 t.Fatalf("after 5 second, cc.balancer is of type %v, not pick_first", cc.curBalancerName) 343 } 344} 345 346// Test that if the current balancer is roundrobin, after switching to grpclb, 347// when the resolved address doesn't contain grpclb addresses, balancer will be 348// switched back to roundrobin. 349func (s) TestSwitchBalancerGRPCLBRoundRobin(t *testing.T) { 350 r := manual.NewBuilderWithScheme("whatever") 351 352 cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{})) 353 if err != nil { 354 t.Fatalf("failed to dial: %v", err) 355 } 356 defer cc.Close() 357 358 sc := parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`) 359 360 r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}, ServiceConfig: sc}) 361 var isRoundRobin bool 362 for i := 0; i < 5000; i++ { 363 cc.mu.Lock() 364 isRoundRobin = cc.curBalancerName == "round_robin" 365 cc.mu.Unlock() 366 if isRoundRobin { 367 break 368 } 369 time.Sleep(time.Millisecond) 370 } 371 if !isRoundRobin { 372 t.Fatalf("after 5 second, cc.balancer is of type %v, not round_robin", cc.curBalancerName) 373 } 374 375 // ClientConn will switch balancer to grpclb when receives an address of 376 // type GRPCLB. 377 r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "grpclb", Type: resolver.GRPCLB}}, ServiceConfig: sc}) 378 var isGRPCLB bool 379 for i := 0; i < 5000; i++ { 380 cc.mu.Lock() 381 isGRPCLB = cc.curBalancerName == "grpclb" 382 cc.mu.Unlock() 383 if isGRPCLB { 384 break 385 } 386 time.Sleep(time.Millisecond) 387 } 388 if !isGRPCLB { 389 t.Fatalf("after 5 second, cc.balancer is of type %v, not grpclb", cc.curBalancerName) 390 } 391 392 // Switch balancer back. 393 r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}, ServiceConfig: sc}) 394 for i := 0; i < 5000; i++ { 395 cc.mu.Lock() 396 isRoundRobin = cc.curBalancerName == "round_robin" 397 cc.mu.Unlock() 398 if isRoundRobin { 399 break 400 } 401 time.Sleep(time.Millisecond) 402 } 403 if !isRoundRobin { 404 t.Fatalf("after 5 second, cc.balancer is of type %v, not round_robin", cc.curBalancerName) 405 } 406} 407 408// Test that if resolved address list contains grpclb, the balancer option in 409// service config won't take effect. But when there's no grpclb address in a new 410// resolved address list, balancer will be switched to the new one. 411func (s) TestSwitchBalancerGRPCLBServiceConfig(t *testing.T) { 412 r := manual.NewBuilderWithScheme("whatever") 413 414 cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{})) 415 if err != nil { 416 t.Fatalf("failed to dial: %v", err) 417 } 418 defer cc.Close() 419 420 r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}}) 421 var isPickFirst bool 422 for i := 0; i < 5000; i++ { 423 cc.mu.Lock() 424 isPickFirst = cc.curBalancerName == PickFirstBalancerName 425 cc.mu.Unlock() 426 if isPickFirst { 427 break 428 } 429 time.Sleep(time.Millisecond) 430 } 431 if !isPickFirst { 432 t.Fatalf("after 5 second, cc.balancer is of type %v, not pick_first", cc.curBalancerName) 433 } 434 435 // ClientConn will switch balancer to grpclb when receives an address of 436 // type GRPCLB. 437 addrs := []resolver.Address{{Addr: "grpclb", Type: resolver.GRPCLB}} 438 r.UpdateState(resolver.State{Addresses: addrs}) 439 var isGRPCLB bool 440 for i := 0; i < 5000; i++ { 441 cc.mu.Lock() 442 isGRPCLB = cc.curBalancerName == "grpclb" 443 cc.mu.Unlock() 444 if isGRPCLB { 445 break 446 } 447 time.Sleep(time.Millisecond) 448 } 449 if !isGRPCLB { 450 t.Fatalf("after 5 second, cc.balancer is of type %v, not grpclb", cc.curBalancerName) 451 } 452 453 sc := parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`) 454 r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: sc}) 455 var isRoundRobin bool 456 for i := 0; i < 200; i++ { 457 cc.mu.Lock() 458 isRoundRobin = cc.curBalancerName == "round_robin" 459 cc.mu.Unlock() 460 if isRoundRobin { 461 break 462 } 463 time.Sleep(time.Millisecond) 464 } 465 // Balancer should NOT switch to round_robin because resolved list contains 466 // grpclb. 467 if isRoundRobin { 468 t.Fatalf("within 200 ms, cc.balancer switched to round_robin, want grpclb") 469 } 470 471 // Switch balancer back. 472 r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}, ServiceConfig: sc}) 473 for i := 0; i < 5000; i++ { 474 cc.mu.Lock() 475 isRoundRobin = cc.curBalancerName == "round_robin" 476 cc.mu.Unlock() 477 if isRoundRobin { 478 break 479 } 480 time.Sleep(time.Millisecond) 481 } 482 if !isRoundRobin { 483 t.Fatalf("after 5 second, cc.balancer is of type %v, not round_robin", cc.curBalancerName) 484 } 485} 486 487// Test that when switching to grpclb fails because grpclb is not registered, 488// the fallback balancer will only get backend addresses, not the grpclb server 489// address. 490// 491// The tests sends 3 server addresses (all backends) as resolved addresses, but 492// claim the first one is grpclb server. The all RPCs should all be send to the 493// other addresses, not the first one. 494func (s) TestSwitchBalancerGRPCLBWithGRPCLBNotRegistered(t *testing.T) { 495 internal.BalancerUnregister("grpclb") 496 defer balancer.Register(&magicalLB{}) 497 498 r := manual.NewBuilderWithScheme("whatever") 499 500 const numServers = 3 501 servers, scleanup := startServers(t, numServers, math.MaxInt32) 502 defer scleanup() 503 504 cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{})) 505 if err != nil { 506 t.Fatalf("failed to dial: %v", err) 507 } 508 defer cc.Close() 509 r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: servers[1].addr}, {Addr: servers[2].addr}}}) 510 // The default balancer is pickfirst. 511 if err := checkPickFirst(cc, servers[1:]); err != nil { 512 t.Fatalf("check pickfirst returned non-nil error: %v", err) 513 } 514 // Try switching to grpclb by sending servers[0] as grpclb address. It's 515 // expected that servers[0] will be filtered out, so it will not be used by 516 // the balancer. 517 // 518 // If the filtering failed, servers[0] will be used for RPCs and the RPCs 519 // will succeed. The following checks will catch this and fail. 520 addrs := []resolver.Address{ 521 {Addr: servers[0].addr, Type: resolver.GRPCLB}, 522 {Addr: servers[1].addr}, {Addr: servers[2].addr}} 523 r.UpdateState(resolver.State{Addresses: addrs}) 524 // Still check for pickfirst, but only with server[1] and server[2]. 525 if err := checkPickFirst(cc, servers[1:]); err != nil { 526 t.Fatalf("check pickfirst returned non-nil error: %v", err) 527 } 528 // Switch to roundrobin, and check against server[1] and server[2]. 529 cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`), Addresses: addrs}, nil) 530 if err := checkRoundRobin(cc, servers[1:]); err != nil { 531 t.Fatalf("check roundrobin returned non-nil error: %v", err) 532 } 533} 534 535const inlineRemoveSubConnBalancerName = "test-inline-remove-subconn-balancer" 536 537func init() { 538 stub.Register(inlineRemoveSubConnBalancerName, stub.BalancerFuncs{ 539 Close: func(data *stub.BalancerData) { 540 data.ClientConn.RemoveSubConn(&acBalancerWrapper{}) 541 }, 542 }) 543} 544 545// Test that when switching to balancers, the old balancer calls RemoveSubConn 546// in Close. 547// 548// This test is to make sure this close doesn't cause a deadlock. 549func (s) TestSwitchBalancerOldRemoveSubConn(t *testing.T) { 550 r := manual.NewBuilderWithScheme("whatever") 551 cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r)) 552 if err != nil { 553 t.Fatalf("failed to dial: %v", err) 554 } 555 defer cc.Close() 556 cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(r, fmt.Sprintf(`{"loadBalancingPolicy": "%v"}`, inlineRemoveSubConnBalancerName))}, nil) 557 // This service config update will switch balancer from 558 // "test-inline-remove-subconn-balancer" to "pick_first". The test balancer 559 // will be closed, which will call cc.RemoveSubConn() inline (this 560 // RemoveSubConn is not required by the API, but some balancers might do 561 // it). 562 // 563 // This is to make sure the cc.RemoveSubConn() from Close() doesn't cause a 564 // deadlock (e.g. trying to grab a mutex while it's already locked). 565 // 566 // Do it in a goroutine so this test will fail with a helpful message 567 // (though the goroutine will still leak). 568 done := make(chan struct{}) 569 go func() { 570 cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "pick_first"}`)}, nil) 571 close(done) 572 }() 573 select { 574 case <-time.After(defaultTestTimeout): 575 t.Fatalf("timeout waiting for updateResolverState to finish") 576 case <-done: 577 } 578} 579 580func parseCfg(r *manual.Resolver, s string) *serviceconfig.ParseResult { 581 scpr := r.CC.ParseServiceConfig(s) 582 if scpr.Err != nil { 583 panic(fmt.Sprintf("Error parsing config %q: %v", s, scpr.Err)) 584 } 585 return scpr 586} 587