1/* 2 * 3 * Copyright 2019 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package balancer 20 21import ( 22 "bytes" 23 "encoding/json" 24 "fmt" 25 "reflect" 26 "sync" 27 "testing" 28 "time" 29 30 corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" 31 "github.com/golang/protobuf/jsonpb" 32 wrapperspb "github.com/golang/protobuf/ptypes/wrappers" 33 "github.com/google/go-cmp/cmp" 34 "google.golang.org/grpc" 35 "google.golang.org/grpc/balancer" 36 "google.golang.org/grpc/connectivity" 37 "google.golang.org/grpc/internal/grpctest" 38 "google.golang.org/grpc/internal/leakcheck" 39 scpb "google.golang.org/grpc/internal/proto/grpc_service_config" 40 "google.golang.org/grpc/resolver" 41 "google.golang.org/grpc/serviceconfig" 42 "google.golang.org/grpc/xds/internal/balancer/lrs" 43 xdsclient "google.golang.org/grpc/xds/internal/client" 44 "google.golang.org/grpc/xds/internal/client/bootstrap" 45 "google.golang.org/grpc/xds/internal/testutils" 46 "google.golang.org/grpc/xds/internal/testutils/fakexds" 47) 48 49var lbABuilder = &balancerABuilder{} 50 51func init() { 52 balancer.Register(&edsBalancerBuilder{}) 53 balancer.Register(lbABuilder) 54 balancer.Register(&balancerBBuilder{}) 55 56 bootstrapConfigNew = func() *bootstrap.Config { 57 return &bootstrap.Config{ 58 BalancerName: "", 59 Creds: grpc.WithInsecure(), 60 NodeProto: &corepb.Node{}, 61 } 62 } 63} 64 65type s struct{} 66 67func (s) Teardown(t *testing.T) { 68 leakcheck.Check(t) 69} 70 71func Test(t *testing.T) { 72 grpctest.RunSubTests(t, s{}) 73} 74 75const ( 76 fakeBalancerA = "fake_balancer_A" 77 fakeBalancerB = "fake_balancer_B" 78) 79 80var ( 81 testBalancerNameFooBar = "foo.bar" 82 testLBConfigFooBar = &XDSConfig{ 83 BalancerName: testBalancerNameFooBar, 84 ChildPolicy: &loadBalancingConfig{Name: fakeBalancerB}, 85 FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA}, 86 EDSServiceName: testEDSClusterName, 87 } 88 89 specialAddrForBalancerA = resolver.Address{Addr: "this.is.balancer.A"} 90 specialAddrForBalancerB = resolver.Address{Addr: "this.is.balancer.B"} 91) 92 93type balancerABuilder struct { 94 mu sync.Mutex 95 lastBalancer *balancerA 96} 97 98func (b *balancerABuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { 99 b.mu.Lock() 100 b.lastBalancer = &balancerA{cc: cc, subconnStateChange: testutils.NewChannelWithSize(10)} 101 b.mu.Unlock() 102 return b.lastBalancer 103} 104 105func (b *balancerABuilder) Name() string { 106 return string(fakeBalancerA) 107} 108 109func (b *balancerABuilder) getLastBalancer() *balancerA { 110 b.mu.Lock() 111 defer b.mu.Unlock() 112 return b.lastBalancer 113} 114 115func (b *balancerABuilder) clearLastBalancer() { 116 b.mu.Lock() 117 defer b.mu.Unlock() 118 b.lastBalancer = nil 119} 120 121type balancerBBuilder struct{} 122 123func (b *balancerBBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { 124 return &balancerB{cc: cc} 125} 126 127func (*balancerBBuilder) Name() string { 128 return string(fakeBalancerB) 129} 130 131// A fake balancer implementation which does two things: 132// * Appends a unique address to the list of resolved addresses received before 133// attempting to create a SubConn. 134// * Makes the received subConn state changes available through a channel, for 135// the test to inspect. 136type balancerA struct { 137 cc balancer.ClientConn 138 subconnStateChange *testutils.Channel 139} 140 141func (b *balancerA) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) { 142 b.subconnStateChange.Send(&scStateChange{sc: sc, state: state}) 143} 144 145func (b *balancerA) HandleResolvedAddrs(addrs []resolver.Address, err error) { 146 _, _ = b.cc.NewSubConn(append(addrs, specialAddrForBalancerA), balancer.NewSubConnOptions{}) 147} 148 149func (b *balancerA) Close() {} 150 151func (b *balancerA) waitForSubConnStateChange(wantState *scStateChange) error { 152 return waitForSubConnStateChange(b.subconnStateChange, wantState) 153} 154 155// A fake balancer implementation which appends a unique address to the list of 156// resolved addresses received before attempting to create a SubConn. 157type balancerB struct { 158 cc balancer.ClientConn 159} 160 161func (b *balancerB) HandleResolvedAddrs(addrs []resolver.Address, err error) { 162 _, _ = b.cc.NewSubConn(append(addrs, specialAddrForBalancerB), balancer.NewSubConnOptions{}) 163} 164 165func (balancerB) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) { 166 panic("implement me") 167} 168func (balancerB) Close() {} 169 170func newTestClientConn() *testClientConn { 171 return &testClientConn{newSubConns: testutils.NewChannelWithSize(10)} 172} 173 174type testClientConn struct { 175 newSubConns *testutils.Channel 176} 177 178func (t *testClientConn) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { 179 t.newSubConns.Send(addrs) 180 return nil, nil 181} 182 183func (t *testClientConn) waitForNewSubConns(wantAddrs []resolver.Address) error { 184 val, err := t.newSubConns.Receive() 185 if err != nil { 186 return fmt.Errorf("error waiting for subconns: %v", err) 187 } 188 gotAddrs := val.([]resolver.Address) 189 if !reflect.DeepEqual(gotAddrs, wantAddrs) { 190 return fmt.Errorf("got subconn address %v, want %v", gotAddrs, wantAddrs) 191 } 192 return nil 193} 194 195func (testClientConn) RemoveSubConn(balancer.SubConn) {} 196func (testClientConn) UpdateBalancerState(connectivity.State, balancer.Picker) {} 197func (testClientConn) UpdateState(balancer.State) {} 198func (testClientConn) ResolveNow(resolver.ResolveNowOptions) {} 199func (testClientConn) Target() string { return testServiceName } 200 201type scStateChange struct { 202 sc balancer.SubConn 203 state connectivity.State 204} 205 206type fakeEDSBalancer struct { 207 cc balancer.ClientConn 208 childPolicy *testutils.Channel 209 subconnStateChange *testutils.Channel 210 loadStore lrs.Store 211} 212 213func (f *fakeEDSBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) { 214 f.subconnStateChange.Send(&scStateChange{sc: sc, state: state}) 215} 216 217func (f *fakeEDSBalancer) HandleChildPolicy(name string, config json.RawMessage) { 218 f.childPolicy.Send(&loadBalancingConfig{Name: name, Config: config}) 219} 220 221func (f *fakeEDSBalancer) Close() {} 222func (f *fakeEDSBalancer) HandleEDSResponse(edsResp *xdsclient.EDSUpdate) {} 223 224func (f *fakeEDSBalancer) waitForChildPolicy(wantPolicy *loadBalancingConfig) error { 225 val, err := f.childPolicy.Receive() 226 if err != nil { 227 return fmt.Errorf("error waiting for childPolicy: %v", err) 228 } 229 gotPolicy := val.(*loadBalancingConfig) 230 if !reflect.DeepEqual(gotPolicy, wantPolicy) { 231 return fmt.Errorf("got childPolicy %v, want %v", gotPolicy, wantPolicy) 232 } 233 return nil 234} 235 236func (f *fakeEDSBalancer) waitForSubConnStateChange(wantState *scStateChange) error { 237 return waitForSubConnStateChange(f.subconnStateChange, wantState) 238} 239 240func waitForSubConnStateChange(ch *testutils.Channel, wantState *scStateChange) error { 241 val, err := ch.Receive() 242 if err != nil { 243 return fmt.Errorf("error waiting for subconnStateChange: %v", err) 244 } 245 gotState := val.(*scStateChange) 246 if !reflect.DeepEqual(gotState, wantState) { 247 return fmt.Errorf("got subconnStateChange %v, want %v", gotState, wantState) 248 } 249 return nil 250} 251 252func newFakeEDSBalancer(cc balancer.ClientConn, loadStore lrs.Store) edsBalancerInterface { 253 return &fakeEDSBalancer{ 254 cc: cc, 255 childPolicy: testutils.NewChannelWithSize(10), 256 subconnStateChange: testutils.NewChannelWithSize(10), 257 loadStore: loadStore, 258 } 259} 260 261type fakeSubConn struct{} 262 263func (*fakeSubConn) UpdateAddresses([]resolver.Address) { panic("implement me") } 264func (*fakeSubConn) Connect() { panic("implement me") } 265 266// TestXDSFallbackResolvedAddrs verifies that the fallback balancer specified 267// in the provided lbconfig is initialized, and that it receives the addresses 268// pushed by the resolver. 269// 270// The test does the following: 271// * Builds a new xds balancer. 272// * Since there is no xDS server to respond to requests from the xds client 273// (created as part of the xds balancer), we expect the fallback policy to 274// kick in. 275// * Repeatedly pushes new ClientConnState which specifies the same fallback 276// policy, but a different set of resolved addresses. 277// * The fallback policy is implemented by a fake balancer, which appends a 278// unique address to the list of addresses it uses to create the SubConn. 279// * We also have a fake ClientConn which verifies that it receives the 280// expected address list. 281func (s) TestXDSFallbackResolvedAddrs(t *testing.T) { 282 startupTimeout = 500 * time.Millisecond 283 defer func() { startupTimeout = defaultTimeout }() 284 285 builder := balancer.Get(edsName) 286 cc := newTestClientConn() 287 edsB, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}).(*edsBalancer) 288 if !ok { 289 t.Fatalf("builder.Build(%s) returned type {%T}, want {*edsBalancer}", edsName, edsB) 290 } 291 defer edsB.Close() 292 293 tests := []struct { 294 resolvedAddrs []resolver.Address 295 wantAddrs []resolver.Address 296 }{ 297 { 298 resolvedAddrs: []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}}, 299 wantAddrs: []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, specialAddrForBalancerA}, 300 }, 301 { 302 resolvedAddrs: []resolver.Address{{Addr: "1.1.1.1:10001"}}, 303 wantAddrs: []resolver.Address{{Addr: "1.1.1.1:10001"}, specialAddrForBalancerA}, 304 }, 305 } 306 for _, test := range tests { 307 edsB.UpdateClientConnState(balancer.ClientConnState{ 308 ResolverState: resolver.State{Addresses: test.resolvedAddrs}, 309 BalancerConfig: testLBConfigFooBar, 310 }) 311 if err := cc.waitForNewSubConns(test.wantAddrs); err != nil { 312 t.Fatal(err) 313 } 314 } 315} 316 317// waitForNewXDSClientWithEDSWatch makes sure that a new xdsClient is created 318// with the provided name. It also make sure that the newly created client 319// registers an eds watcher. 320func waitForNewXDSClientWithEDSWatch(t *testing.T, ch *testutils.Channel, wantName string) *fakexds.Client { 321 t.Helper() 322 323 val, err := ch.Receive() 324 if err != nil { 325 t.Fatalf("error when waiting for a new xds client: %v", err) 326 return nil 327 } 328 xdsC := val.(*fakexds.Client) 329 if xdsC.Name() != wantName { 330 t.Fatalf("xdsClient created to balancer: %v, want %v", xdsC.Name(), wantName) 331 return nil 332 } 333 _, err = xdsC.WaitForWatchEDS() 334 if err != nil { 335 t.Fatalf("xdsClient.WatchEDS failed with error: %v", err) 336 return nil 337 } 338 return xdsC 339} 340 341// waitForNewEDSLB makes sure that a new edsLB is created by the top-level 342// edsBalancer. 343func waitForNewEDSLB(t *testing.T, ch *testutils.Channel) *fakeEDSBalancer { 344 t.Helper() 345 346 val, err := ch.Receive() 347 if err != nil { 348 t.Fatalf("error when waiting for a new edsLB: %v", err) 349 return nil 350 } 351 return val.(*fakeEDSBalancer) 352} 353 354// setup overrides the functions which are used to create the xdsClient and the 355// edsLB, creates fake version of them and makes them available on the provided 356// channels. The returned cancel function should be called by the test for 357// cleanup. 358func setup(edsLBCh *testutils.Channel, xdsClientCh *testutils.Channel) func() { 359 origNewEDSBalancer := newEDSBalancer 360 newEDSBalancer = func(cc balancer.ClientConn, loadStore lrs.Store) edsBalancerInterface { 361 edsLB := newFakeEDSBalancer(cc, loadStore) 362 defer func() { edsLBCh.Send(edsLB) }() 363 return edsLB 364 } 365 366 origXdsClientNew := xdsclientNew 367 xdsclientNew = func(opts xdsclient.Options) (xdsClientInterface, error) { 368 xdsC := fakexds.NewClientWithName(opts.Config.BalancerName) 369 defer func() { xdsClientCh.Send(xdsC) }() 370 return xdsC, nil 371 } 372 return func() { 373 newEDSBalancer = origNewEDSBalancer 374 xdsclientNew = origXdsClientNew 375 } 376} 377 378// setupForFallback performs everything that setup does and in addition 379// overrides the fallback startupTimeout to a small value to trigger fallback 380// in tests. 381func setupForFallback(edsLBCh *testutils.Channel, xdsClientCh *testutils.Channel) func() { 382 cancel := setup(edsLBCh, xdsClientCh) 383 startupTimeout = 500 * time.Millisecond 384 return func() { 385 cancel() 386 startupTimeout = defaultTimeout 387 } 388} 389 390// TestXDSConfigBalancerNameUpdate verifies different scenarios where the 391// balancer name in the lbConfig is updated. 392// 393// The test does the following: 394// * Builds a new xds balancer. 395// * Since there is no xDS server to respond to requests from the xds client 396// (created as part of the xds balancer), we expect the fallback policy to 397// kick in. 398// * Repeatedly pushes new ClientConnState which specifies different 399// balancerName in the lbConfig. We expect xdsClient objects to created 400// whenever the balancerName changes. We also expect a new edsLB to created 401// the first time the client receives an edsUpdate. 402func (s) TestXDSConfigBalancerNameUpdate(t *testing.T) { 403 edsLBCh := testutils.NewChannel() 404 xdsClientCh := testutils.NewChannel() 405 cancel := setupForFallback(edsLBCh, xdsClientCh) 406 defer cancel() 407 408 builder := balancer.Get(edsName) 409 cc := newTestClientConn() 410 edsB, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSClusterName}}).(*edsBalancer) 411 if !ok { 412 t.Fatalf("builder.Build(%s) returned type {%T}, want {*edsBalancer}", edsName, edsB) 413 } 414 defer edsB.Close() 415 416 addrs := []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, {Addr: "3.3.3.3:10003"}} 417 edsB.UpdateClientConnState(balancer.ClientConnState{ 418 ResolverState: resolver.State{Addresses: addrs}, 419 BalancerConfig: testLBConfigFooBar, 420 }) 421 422 waitForNewXDSClientWithEDSWatch(t, xdsClientCh, testBalancerNameFooBar) 423 // Verify that fallbackLB (fakeBalancerA) takes over, since the xdsClient 424 // receives no edsUpdate. 425 if err := cc.waitForNewSubConns(append(addrs, specialAddrForBalancerA)); err != nil { 426 t.Fatal(err) 427 } 428 429 for i := 0; i < 2; i++ { 430 balancerName := fmt.Sprintf("balancer-%d", i) 431 edsB.UpdateClientConnState(balancer.ClientConnState{ 432 ResolverState: resolver.State{Addresses: addrs}, 433 BalancerConfig: &XDSConfig{ 434 BalancerName: balancerName, 435 ChildPolicy: &loadBalancingConfig{Name: fakeBalancerA}, 436 FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA}, 437 EDSServiceName: testEDSClusterName, 438 }, 439 }) 440 441 xdsC := waitForNewXDSClientWithEDSWatch(t, xdsClientCh, balancerName) 442 xdsC.InvokeWatchEDSCallback(&xdsclient.EDSUpdate{}, nil) 443 444 // In the first iteration, an edsLB takes over from the fallbackLB. In the 445 // second iteration, a new xds client is created, but the same edsLB is used. 446 if i == 0 { 447 if _, err := edsLBCh.Receive(); err != nil { 448 t.Fatalf("edsBalancer did not create edsLB after receiveing EDS update: %v, %d", err, i) 449 } 450 } else { 451 if _, err := edsLBCh.Receive(); err == nil { 452 t.Fatal("edsBalancer created new edsLB when it was not expected to") 453 } 454 } 455 } 456} 457 458// TestXDSConnfigChildPolicyUpdate verifies scenarios where the childPolicy 459// section of the lbConfig is updated. 460// 461// The test does the following: 462// * Builds a new xds balancer. 463// * Pushes a new ClientConnState with a childPolicy set to fakeBalancerA. 464// Verifies that a new xdsClient is created. It then pushes a new edsUpdate 465// through the fakexds client. Verifies that a new edsLB is created and it 466// receives the expected childPolicy. 467// * Pushes a new ClientConnState with a childPolicy set to fakeBalancerB. 468// This time around, we expect no new xdsClient or edsLB to be created. 469// Instead, we expect the existing edsLB to receive the new child policy. 470func (s) TestXDSConnfigChildPolicyUpdate(t *testing.T) { 471 edsLBCh := testutils.NewChannel() 472 xdsClientCh := testutils.NewChannel() 473 cancel := setup(edsLBCh, xdsClientCh) 474 defer cancel() 475 476 builder := balancer.Get(edsName) 477 cc := newTestClientConn() 478 edsB, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}).(*edsBalancer) 479 if !ok { 480 t.Fatalf("builder.Build(%s) returned type {%T}, want {*edsBalancer}", edsName, edsB) 481 } 482 defer edsB.Close() 483 484 edsB.UpdateClientConnState(balancer.ClientConnState{ 485 BalancerConfig: &XDSConfig{ 486 BalancerName: testBalancerNameFooBar, 487 ChildPolicy: &loadBalancingConfig{ 488 Name: fakeBalancerA, 489 Config: json.RawMessage("{}"), 490 }, 491 EDSServiceName: testEDSClusterName, 492 }, 493 }) 494 xdsC := waitForNewXDSClientWithEDSWatch(t, xdsClientCh, testBalancerNameFooBar) 495 xdsC.InvokeWatchEDSCallback(&xdsclient.EDSUpdate{}, nil) 496 edsLB := waitForNewEDSLB(t, edsLBCh) 497 edsLB.waitForChildPolicy(&loadBalancingConfig{ 498 Name: string(fakeBalancerA), 499 Config: json.RawMessage(`{}`), 500 }) 501 502 edsB.UpdateClientConnState(balancer.ClientConnState{ 503 BalancerConfig: &XDSConfig{ 504 BalancerName: testBalancerNameFooBar, 505 ChildPolicy: &loadBalancingConfig{ 506 Name: fakeBalancerB, 507 Config: json.RawMessage("{}"), 508 }, 509 EDSServiceName: testEDSClusterName, 510 }, 511 }) 512 edsLB.waitForChildPolicy(&loadBalancingConfig{ 513 Name: string(fakeBalancerA), 514 Config: json.RawMessage(`{}`), 515 }) 516} 517 518// TestXDSConfigFallBackUpdate verifies different scenarios where the fallback 519// config part of the lbConfig is updated. 520// 521// The test does the following: 522// * Builds a top-level edsBalancer 523// * Fakes the xdsClient and the underlying edsLB implementations. 524// * Sends a ClientConn update to the edsBalancer with a bogus balancerName. 525// This will get the balancer into fallback monitoring, but since the 526// startupTimeout package variable is not overridden to a small value, fallback 527// will not kick-in as yet. 528// * Sends another ClientConn update with fallback addresses. Still fallback 529// would not have kicked in because the startupTimeout hasn't expired. 530// * Sends an EDSUpdate through the fakexds.Client object. This will trigger 531// the creation of an edsLB object. This is verified. 532// * Trigger fallback by directly calling the loseContact method on the 533// top-level edsBalancer. This should instantiate the fallbackLB and should 534// send the appropriate subConns. 535// * Update the fallback policy to specify and different fallback LB and make 536// sure the new LB receives appropriate subConns. 537func (s) TestXDSConfigFallBackUpdate(t *testing.T) { 538 edsLBCh := testutils.NewChannel() 539 xdsClientCh := testutils.NewChannel() 540 cancel := setup(edsLBCh, xdsClientCh) 541 defer cancel() 542 543 builder := balancer.Get(edsName) 544 cc := newTestClientConn() 545 edsB, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSClusterName}}).(*edsBalancer) 546 if !ok { 547 t.Fatalf("builder.Build(%s) returned type {%T}, want {*edsBalancer}", edsName, edsB) 548 } 549 defer edsB.Close() 550 551 bogusBalancerName := "wrong-balancer-name" 552 edsB.UpdateClientConnState(balancer.ClientConnState{ 553 BalancerConfig: &XDSConfig{ 554 BalancerName: bogusBalancerName, 555 FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA}, 556 }, 557 }) 558 xdsC := waitForNewXDSClientWithEDSWatch(t, xdsClientCh, bogusBalancerName) 559 560 addrs := []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, {Addr: "3.3.3.3:10003"}} 561 edsB.UpdateClientConnState(balancer.ClientConnState{ 562 ResolverState: resolver.State{Addresses: addrs}, 563 BalancerConfig: &XDSConfig{ 564 BalancerName: bogusBalancerName, 565 FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerB}, 566 }, 567 }) 568 569 xdsC.InvokeWatchEDSCallback(&xdsclient.EDSUpdate{}, nil) 570 if _, err := edsLBCh.Receive(); err != nil { 571 t.Fatalf("edsBalancer did not create edsLB after receiveing EDS update: %v", err) 572 } 573 574 // Call loseContact explicitly, error in EDS callback is not handled. 575 // Eventually, this should call EDS ballback with an error that indicates 576 // "lost contact". 577 edsB.loseContact() 578 579 // Verify that fallback (fakeBalancerB) takes over. 580 if err := cc.waitForNewSubConns(append(addrs, specialAddrForBalancerB)); err != nil { 581 t.Fatal(err) 582 } 583 584 edsB.UpdateClientConnState(balancer.ClientConnState{ 585 ResolverState: resolver.State{Addresses: addrs}, 586 BalancerConfig: &XDSConfig{ 587 BalancerName: bogusBalancerName, 588 FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA}, 589 }, 590 }) 591 592 // Verify that fallbackLB (fakeBalancerA) takes over. 593 if err := cc.waitForNewSubConns(append(addrs, specialAddrForBalancerA)); err != nil { 594 t.Fatal(err) 595 } 596} 597 598// TestXDSSubConnStateChange verifies if the top-level edsBalancer passes on 599// the subConnStateChange to appropriate child balancers (it tests for edsLB 600// and a fallbackLB). 601func (s) TestXDSSubConnStateChange(t *testing.T) { 602 edsLBCh := testutils.NewChannel() 603 xdsClientCh := testutils.NewChannel() 604 cancel := setup(edsLBCh, xdsClientCh) 605 defer cancel() 606 607 builder := balancer.Get(edsName) 608 cc := newTestClientConn() 609 edsB, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSClusterName}}).(*edsBalancer) 610 if !ok { 611 t.Fatalf("builder.Build(%s) returned type {%T}, want {*edsBalancer}", edsName, edsB) 612 } 613 defer edsB.Close() 614 615 addrs := []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, {Addr: "3.3.3.3:10003"}} 616 edsB.UpdateClientConnState(balancer.ClientConnState{ 617 ResolverState: resolver.State{Addresses: addrs}, 618 BalancerConfig: &XDSConfig{ 619 BalancerName: testBalancerNameFooBar, 620 ChildPolicy: &loadBalancingConfig{Name: fakeBalancerA}, 621 FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA}, 622 EDSServiceName: testEDSClusterName, 623 }, 624 }) 625 626 xdsC := waitForNewXDSClientWithEDSWatch(t, xdsClientCh, testBalancerNameFooBar) 627 xdsC.InvokeWatchEDSCallback(&xdsclient.EDSUpdate{}, nil) 628 edsLB := waitForNewEDSLB(t, edsLBCh) 629 630 fsc := &fakeSubConn{} 631 state := connectivity.Ready 632 edsB.UpdateSubConnState(fsc, balancer.SubConnState{ConnectivityState: state}) 633 edsLB.waitForSubConnStateChange(&scStateChange{sc: fsc, state: state}) 634 635 // lbABuilder maintains a pointer to the last balancerA that it created. We 636 // need to clear that to make sure a new one is created when we attempt to 637 // fallback in the next line. 638 lbABuilder.clearLastBalancer() 639 // Call loseContact explicitly, error in EDS callback is not handled. 640 // Eventually, this should call EDS ballback with an error that indicates 641 // "lost contact". 642 edsB.loseContact() 643 // Verify that fallback (fakeBalancerA) takes over. 644 if err := cc.waitForNewSubConns(append(addrs, specialAddrForBalancerA)); err != nil { 645 t.Fatal(err) 646 } 647 fblb := lbABuilder.getLastBalancer() 648 if fblb == nil { 649 t.Fatal("expected fallback balancerA to be built on fallback") 650 } 651 edsB.UpdateSubConnState(fsc, balancer.SubConnState{ConnectivityState: state}) 652 fblb.waitForSubConnStateChange(&scStateChange{sc: fsc, state: state}) 653} 654 655func TestXdsBalancerConfigParsing(t *testing.T) { 656 const testEDSName = "eds.service" 657 var testLRSName = "lrs.server" 658 b := bytes.NewBuffer(nil) 659 if err := (&jsonpb.Marshaler{}).Marshal(b, &scpb.XdsConfig{ 660 ChildPolicy: []*scpb.LoadBalancingConfig{ 661 {Policy: &scpb.LoadBalancingConfig_Xds{}}, 662 {Policy: &scpb.LoadBalancingConfig_RoundRobin{ 663 RoundRobin: &scpb.RoundRobinConfig{}, 664 }}, 665 }, 666 FallbackPolicy: []*scpb.LoadBalancingConfig{ 667 {Policy: &scpb.LoadBalancingConfig_Xds{}}, 668 {Policy: &scpb.LoadBalancingConfig_PickFirst{ 669 PickFirst: &scpb.PickFirstConfig{}, 670 }}, 671 }, 672 EdsServiceName: testEDSName, 673 LrsLoadReportingServerName: &wrapperspb.StringValue{Value: testLRSName}, 674 }); err != nil { 675 t.Fatalf("%v", err) 676 } 677 678 tests := []struct { 679 name string 680 js json.RawMessage 681 want serviceconfig.LoadBalancingConfig 682 wantErr bool 683 }{ 684 { 685 name: "jsonpb-generated", 686 js: b.Bytes(), 687 want: &XDSConfig{ 688 ChildPolicy: &loadBalancingConfig{ 689 Name: "round_robin", 690 Config: json.RawMessage("{}"), 691 }, 692 FallBackPolicy: &loadBalancingConfig{ 693 Name: "pick_first", 694 Config: json.RawMessage("{}"), 695 }, 696 EDSServiceName: testEDSName, 697 LrsLoadReportingServerName: &testLRSName, 698 }, 699 wantErr: false, 700 }, 701 { 702 // json with random balancers, and the first is not registered. 703 name: "manually-generated", 704 js: json.RawMessage(` 705{ 706 "balancerName": "fake.foo.bar", 707 "childPolicy": [ 708 {"fake_balancer_C": {}}, 709 {"fake_balancer_A": {}}, 710 {"fake_balancer_B": {}} 711 ], 712 "fallbackPolicy": [ 713 {"fake_balancer_C": {}}, 714 {"fake_balancer_B": {}}, 715 {"fake_balancer_A": {}} 716 ], 717 "edsServiceName": "eds.service", 718 "lrsLoadReportingServerName": "lrs.server" 719}`), 720 want: &XDSConfig{ 721 BalancerName: "fake.foo.bar", 722 ChildPolicy: &loadBalancingConfig{ 723 Name: "fake_balancer_A", 724 Config: json.RawMessage("{}"), 725 }, 726 FallBackPolicy: &loadBalancingConfig{ 727 Name: "fake_balancer_B", 728 Config: json.RawMessage("{}"), 729 }, 730 EDSServiceName: testEDSName, 731 LrsLoadReportingServerName: &testLRSName, 732 }, 733 wantErr: false, 734 }, 735 { 736 // json with no lrs server name, LrsLoadReportingServerName should 737 // be nil (not an empty string). 738 name: "no-lrs-server-name", 739 js: json.RawMessage(` 740{ 741 "balancerName": "fake.foo.bar", 742 "edsServiceName": "eds.service" 743}`), 744 want: &XDSConfig{ 745 BalancerName: "fake.foo.bar", 746 EDSServiceName: testEDSName, 747 LrsLoadReportingServerName: nil, 748 }, 749 wantErr: false, 750 }, 751 } 752 for _, tt := range tests { 753 t.Run(tt.name, func(t *testing.T) { 754 b := &edsBalancerBuilder{} 755 got, err := b.ParseConfig(tt.js) 756 if (err != nil) != tt.wantErr { 757 t.Errorf("edsBalancerBuilder.ParseConfig() error = %v, wantErr %v", err, tt.wantErr) 758 return 759 } 760 if !cmp.Equal(got, tt.want) { 761 t.Errorf(cmp.Diff(got, tt.want)) 762 } 763 }) 764 } 765} 766func TestLoadbalancingConfigParsing(t *testing.T) { 767 tests := []struct { 768 name string 769 s string 770 want *XDSConfig 771 }{ 772 { 773 name: "empty", 774 s: "{}", 775 want: &XDSConfig{}, 776 }, 777 { 778 name: "success1", 779 s: `{"childPolicy":[{"pick_first":{}}]}`, 780 want: &XDSConfig{ 781 ChildPolicy: &loadBalancingConfig{ 782 Name: "pick_first", 783 Config: json.RawMessage(`{}`), 784 }, 785 }, 786 }, 787 { 788 name: "success2", 789 s: `{"childPolicy":[{"round_robin":{}},{"pick_first":{}}]}`, 790 want: &XDSConfig{ 791 ChildPolicy: &loadBalancingConfig{ 792 Name: "round_robin", 793 Config: json.RawMessage(`{}`), 794 }, 795 }, 796 }, 797 } 798 for _, tt := range tests { 799 t.Run(tt.name, func(t *testing.T) { 800 var cfg XDSConfig 801 if err := json.Unmarshal([]byte(tt.s), &cfg); err != nil || !reflect.DeepEqual(&cfg, tt.want) { 802 t.Errorf("test name: %s, parseFullServiceConfig() = %+v, err: %v, want %+v, <nil>", tt.name, cfg, err, tt.want) 803 } 804 }) 805 } 806} 807 808func TestEqualStringPointers(t *testing.T) { 809 var ( 810 ta1 = "test-a" 811 ta2 = "test-a" 812 tb = "test-b" 813 ) 814 tests := []struct { 815 name string 816 a *string 817 b *string 818 want bool 819 }{ 820 {"both-nil", nil, nil, true}, 821 {"a-non-nil", &ta1, nil, false}, 822 {"b-non-nil", nil, &tb, false}, 823 {"equal", &ta1, &ta2, true}, 824 {"different", &ta1, &tb, false}, 825 } 826 for _, tt := range tests { 827 t.Run(tt.name, func(t *testing.T) { 828 if got := equalStringPointers(tt.a, tt.b); got != tt.want { 829 t.Errorf("equalStringPointers() = %v, want %v", got, tt.want) 830 } 831 }) 832 } 833} 834