1// +build go1.12 2 3/* 4 * 5 * Copyright 2019 gRPC authors. 6 * 7 * Licensed under the Apache License, Version 2.0 (the "License"); 8 * you may not use this file except in compliance with the License. 9 * You may obtain a copy of the License at 10 * 11 * http://www.apache.org/licenses/LICENSE-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, software 14 * distributed under the License is distributed on an "AS IS" BASIS, 15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 16 * See the License for the specific language governing permissions and 17 * limitations under the License. 18 * 19 */ 20 21package edsbalancer 22 23import ( 24 "bytes" 25 "context" 26 "encoding/json" 27 "fmt" 28 "reflect" 29 "testing" 30 "time" 31 32 "github.com/golang/protobuf/jsonpb" 33 wrapperspb "github.com/golang/protobuf/ptypes/wrappers" 34 "github.com/google/go-cmp/cmp" 35 "google.golang.org/grpc/balancer" 36 "google.golang.org/grpc/connectivity" 37 "google.golang.org/grpc/internal/grpclog" 38 "google.golang.org/grpc/internal/grpctest" 39 scpb "google.golang.org/grpc/internal/proto/grpc_service_config" 40 "google.golang.org/grpc/internal/testutils" 41 "google.golang.org/grpc/resolver" 42 "google.golang.org/grpc/serviceconfig" 43 "google.golang.org/grpc/xds/internal" 44 xdsclient "google.golang.org/grpc/xds/internal/client" 45 "google.golang.org/grpc/xds/internal/client/load" 46 "google.golang.org/grpc/xds/internal/testutils/fakeclient" 47 48 _ "google.golang.org/grpc/xds/internal/client/v2" // V2 client registration. 49) 50 51const ( 52 defaultTestTimeout = 1 * time.Second 53 defaultTestShortTimeout = 10 * time.Millisecond 54 testServiceName = "test/foo" 55 testEDSClusterName = "test/service/eds" 56) 57 58var ( 59 // A non-empty endpoints update which is expected to be accepted by the EDS 60 // LB policy. 61 defaultEndpointsUpdate = xdsclient.EndpointsUpdate{ 62 Localities: []xdsclient.Locality{ 63 { 64 Endpoints: []xdsclient.Endpoint{{Address: "endpoint1"}}, 65 ID: internal.LocalityID{Zone: "zone"}, 66 Priority: 1, 67 Weight: 100, 68 }, 69 }, 70 } 71) 72 73func init() { 74 balancer.Register(&edsBalancerBuilder{}) 75} 76 77func subConnFromPicker(p balancer.Picker) func() balancer.SubConn { 78 return func() balancer.SubConn { 79 scst, _ := p.Pick(balancer.PickInfo{}) 80 return scst.SubConn 81 } 82} 83 84type s struct { 85 grpctest.Tester 86} 87 88func Test(t *testing.T) { 89 grpctest.RunSubTests(t, s{}) 90} 91 92const testBalancerNameFooBar = "foo.bar" 93 94func newNoopTestClientConn() *noopTestClientConn { 95 return &noopTestClientConn{} 96} 97 98// noopTestClientConn is used in EDS balancer config update tests that only 99// cover the config update handling, but not SubConn/load-balancing. 100type noopTestClientConn struct { 101 balancer.ClientConn 102} 103 104func (t *noopTestClientConn) NewSubConn([]resolver.Address, balancer.NewSubConnOptions) (balancer.SubConn, error) { 105 return nil, nil 106} 107 108func (noopTestClientConn) Target() string { return testServiceName } 109 110type scStateChange struct { 111 sc balancer.SubConn 112 state connectivity.State 113} 114 115type fakeEDSBalancer struct { 116 cc balancer.ClientConn 117 childPolicy *testutils.Channel 118 subconnStateChange *testutils.Channel 119 edsUpdate *testutils.Channel 120 serviceName *testutils.Channel 121 serviceRequestMax *testutils.Channel 122 clusterName *testutils.Channel 123} 124 125func (f *fakeEDSBalancer) handleSubConnStateChange(sc balancer.SubConn, state connectivity.State) { 126 f.subconnStateChange.Send(&scStateChange{sc: sc, state: state}) 127} 128 129func (f *fakeEDSBalancer) handleChildPolicy(name string, config json.RawMessage) { 130 f.childPolicy.Send(&loadBalancingConfig{Name: name, Config: config}) 131} 132 133func (f *fakeEDSBalancer) handleEDSResponse(edsResp xdsclient.EndpointsUpdate) { 134 f.edsUpdate.Send(edsResp) 135} 136 137func (f *fakeEDSBalancer) updateState(priority priorityType, s balancer.State) {} 138 139func (f *fakeEDSBalancer) updateServiceRequestsConfig(serviceName string, max *uint32) { 140 f.serviceName.Send(serviceName) 141 f.serviceRequestMax.Send(max) 142} 143 144func (f *fakeEDSBalancer) updateClusterName(name string) { 145 f.clusterName.Send(name) 146} 147 148func (f *fakeEDSBalancer) close() {} 149 150func (f *fakeEDSBalancer) waitForChildPolicy(ctx context.Context, wantPolicy *loadBalancingConfig) error { 151 val, err := f.childPolicy.Receive(ctx) 152 if err != nil { 153 return err 154 } 155 gotPolicy := val.(*loadBalancingConfig) 156 if !cmp.Equal(gotPolicy, wantPolicy) { 157 return fmt.Errorf("got childPolicy %v, want %v", gotPolicy, wantPolicy) 158 } 159 return nil 160} 161 162func (f *fakeEDSBalancer) waitForSubConnStateChange(ctx context.Context, wantState *scStateChange) error { 163 val, err := f.subconnStateChange.Receive(ctx) 164 if err != nil { 165 return err 166 } 167 gotState := val.(*scStateChange) 168 if !cmp.Equal(gotState, wantState, cmp.AllowUnexported(scStateChange{})) { 169 return fmt.Errorf("got subconnStateChange %v, want %v", gotState, wantState) 170 } 171 return nil 172} 173 174func (f *fakeEDSBalancer) waitForEDSResponse(ctx context.Context, wantUpdate xdsclient.EndpointsUpdate) error { 175 val, err := f.edsUpdate.Receive(ctx) 176 if err != nil { 177 return err 178 } 179 gotUpdate := val.(xdsclient.EndpointsUpdate) 180 if !reflect.DeepEqual(gotUpdate, wantUpdate) { 181 return fmt.Errorf("got edsUpdate %+v, want %+v", gotUpdate, wantUpdate) 182 } 183 return nil 184} 185 186func (f *fakeEDSBalancer) waitForCounterUpdate(ctx context.Context, wantServiceName string) error { 187 val, err := f.serviceName.Receive(ctx) 188 if err != nil { 189 return err 190 } 191 gotServiceName := val.(string) 192 if gotServiceName != wantServiceName { 193 return fmt.Errorf("got serviceName %v, want %v", gotServiceName, wantServiceName) 194 } 195 return nil 196} 197 198func (f *fakeEDSBalancer) waitForCountMaxUpdate(ctx context.Context, want *uint32) error { 199 val, err := f.serviceRequestMax.Receive(ctx) 200 if err != nil { 201 return err 202 } 203 got := val.(*uint32) 204 205 if got == nil && want == nil { 206 return nil 207 } 208 if got != nil && want != nil { 209 if *got != *want { 210 return fmt.Errorf("got countMax %v, want %v", *got, *want) 211 } 212 return nil 213 } 214 return fmt.Errorf("got countMax %+v, want %+v", got, want) 215} 216 217func (f *fakeEDSBalancer) waitForClusterNameUpdate(ctx context.Context, wantClusterName string) error { 218 val, err := f.clusterName.Receive(ctx) 219 if err != nil { 220 return err 221 } 222 gotServiceName := val.(string) 223 if gotServiceName != wantClusterName { 224 return fmt.Errorf("got clusterName %v, want %v", gotServiceName, wantClusterName) 225 } 226 return nil 227} 228 229func newFakeEDSBalancer(cc balancer.ClientConn) edsBalancerImplInterface { 230 return &fakeEDSBalancer{ 231 cc: cc, 232 childPolicy: testutils.NewChannelWithSize(10), 233 subconnStateChange: testutils.NewChannelWithSize(10), 234 edsUpdate: testutils.NewChannelWithSize(10), 235 serviceName: testutils.NewChannelWithSize(10), 236 serviceRequestMax: testutils.NewChannelWithSize(10), 237 clusterName: testutils.NewChannelWithSize(10), 238 } 239} 240 241type fakeSubConn struct{} 242 243func (*fakeSubConn) UpdateAddresses([]resolver.Address) { panic("implement me") } 244func (*fakeSubConn) Connect() { panic("implement me") } 245 246// waitForNewEDSLB makes sure that a new edsLB is created by the top-level 247// edsBalancer. 248func waitForNewEDSLB(ctx context.Context, ch *testutils.Channel) (*fakeEDSBalancer, error) { 249 val, err := ch.Receive(ctx) 250 if err != nil { 251 return nil, fmt.Errorf("error when waiting for a new edsLB: %v", err) 252 } 253 return val.(*fakeEDSBalancer), nil 254} 255 256// setup overrides the functions which are used to create the xdsClient and the 257// edsLB, creates fake version of them and makes them available on the provided 258// channels. The returned cancel function should be called by the test for 259// cleanup. 260func setup(edsLBCh *testutils.Channel) (*fakeclient.Client, func()) { 261 xdsC := fakeclient.NewClientWithName(testBalancerNameFooBar) 262 oldNewXDSClient := newXDSClient 263 newXDSClient = func() (xdsClientInterface, error) { return xdsC, nil } 264 265 origNewEDSBalancer := newEDSBalancer 266 newEDSBalancer = func(cc balancer.ClientConn, _ balancer.BuildOptions, _ func(priorityType, balancer.State), _ load.PerClusterReporter, _ *grpclog.PrefixLogger) edsBalancerImplInterface { 267 edsLB := newFakeEDSBalancer(cc) 268 defer func() { edsLBCh.Send(edsLB) }() 269 return edsLB 270 } 271 return xdsC, func() { 272 newEDSBalancer = origNewEDSBalancer 273 newXDSClient = oldNewXDSClient 274 } 275} 276 277const ( 278 fakeBalancerA = "fake_balancer_A" 279 fakeBalancerB = "fake_balancer_B" 280) 281 282// Install two fake balancers for service config update tests. 283// 284// ParseConfig only accepts the json if the balancer specified is registered. 285func init() { 286 balancer.Register(&fakeBalancerBuilder{name: fakeBalancerA}) 287 balancer.Register(&fakeBalancerBuilder{name: fakeBalancerB}) 288} 289 290type fakeBalancerBuilder struct { 291 name string 292} 293 294func (b *fakeBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { 295 return &fakeBalancer{cc: cc} 296} 297 298func (b *fakeBalancerBuilder) Name() string { 299 return b.name 300} 301 302type fakeBalancer struct { 303 cc balancer.ClientConn 304} 305 306func (b *fakeBalancer) ResolverError(error) { 307 panic("implement me") 308} 309 310func (b *fakeBalancer) UpdateClientConnState(balancer.ClientConnState) error { 311 panic("implement me") 312} 313 314func (b *fakeBalancer) UpdateSubConnState(balancer.SubConn, balancer.SubConnState) { 315 panic("implement me") 316} 317 318func (b *fakeBalancer) Close() {} 319 320// TestConfigChildPolicyUpdate verifies scenarios where the childPolicy 321// section of the lbConfig is updated. 322// 323// The test does the following: 324// * Builds a new EDS balancer. 325// * Pushes a new ClientConnState with a childPolicy set to fakeBalancerA. 326// Verifies that an EDS watch is registered. It then pushes a new edsUpdate 327// through the fakexds client. Verifies that a new edsLB is created and it 328// receives the expected childPolicy. 329// * Pushes a new ClientConnState with a childPolicy set to fakeBalancerB. 330// Verifies that the existing edsLB receives the new child policy. 331func (s) TestConfigChildPolicyUpdate(t *testing.T) { 332 edsLBCh := testutils.NewChannel() 333 xdsC, cleanup := setup(edsLBCh) 334 defer cleanup() 335 336 builder := balancer.Get(edsName) 337 edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}) 338 if edsB == nil { 339 t.Fatalf("builder.Build(%s) failed and returned nil", edsName) 340 } 341 defer edsB.Close() 342 343 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) 344 defer cancel() 345 edsLB, err := waitForNewEDSLB(ctx, edsLBCh) 346 if err != nil { 347 t.Fatal(err) 348 } 349 350 lbCfgA := &loadBalancingConfig{ 351 Name: fakeBalancerA, 352 Config: json.RawMessage("{}"), 353 } 354 if err := edsB.UpdateClientConnState(balancer.ClientConnState{ 355 BalancerConfig: &EDSConfig{ 356 ChildPolicy: lbCfgA, 357 EDSServiceName: testServiceName, 358 }, 359 }); err != nil { 360 t.Fatalf("edsB.UpdateClientConnState() failed: %v", err) 361 } 362 363 if _, err := xdsC.WaitForWatchEDS(ctx); err != nil { 364 t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err) 365 } 366 xdsC.InvokeWatchEDSCallback(defaultEndpointsUpdate, nil) 367 if err := edsLB.waitForChildPolicy(ctx, lbCfgA); err != nil { 368 t.Fatal(err) 369 } 370 if err := edsLB.waitForCounterUpdate(ctx, testServiceName); err != nil { 371 t.Fatal(err) 372 } 373 if err := edsLB.waitForCountMaxUpdate(ctx, nil); err != nil { 374 t.Fatal(err) 375 } 376 377 var testCountMax uint32 = 100 378 lbCfgB := &loadBalancingConfig{ 379 Name: fakeBalancerB, 380 Config: json.RawMessage("{}"), 381 } 382 if err := edsB.UpdateClientConnState(balancer.ClientConnState{ 383 BalancerConfig: &EDSConfig{ 384 ChildPolicy: lbCfgB, 385 EDSServiceName: testServiceName, 386 MaxConcurrentRequests: &testCountMax, 387 }, 388 }); err != nil { 389 t.Fatalf("edsB.UpdateClientConnState() failed: %v", err) 390 } 391 if err := edsLB.waitForChildPolicy(ctx, lbCfgB); err != nil { 392 t.Fatal(err) 393 } 394 if err := edsLB.waitForCounterUpdate(ctx, testServiceName); err != nil { 395 // Counter is updated even though the service name didn't change. The 396 // eds_impl will compare the service names, and skip if it didn't change. 397 t.Fatal(err) 398 } 399 if err := edsLB.waitForCountMaxUpdate(ctx, &testCountMax); err != nil { 400 t.Fatal(err) 401 } 402} 403 404// TestSubConnStateChange verifies if the top-level edsBalancer passes on 405// the subConnStateChange to appropriate child balancer. 406func (s) TestSubConnStateChange(t *testing.T) { 407 edsLBCh := testutils.NewChannel() 408 xdsC, cleanup := setup(edsLBCh) 409 defer cleanup() 410 411 builder := balancer.Get(edsName) 412 edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}) 413 if edsB == nil { 414 t.Fatalf("builder.Build(%s) failed and returned nil", edsName) 415 } 416 defer edsB.Close() 417 418 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) 419 defer cancel() 420 edsLB, err := waitForNewEDSLB(ctx, edsLBCh) 421 if err != nil { 422 t.Fatal(err) 423 } 424 425 if err := edsB.UpdateClientConnState(balancer.ClientConnState{ 426 BalancerConfig: &EDSConfig{EDSServiceName: testServiceName}, 427 }); err != nil { 428 t.Fatalf("edsB.UpdateClientConnState() failed: %v", err) 429 } 430 431 if _, err := xdsC.WaitForWatchEDS(ctx); err != nil { 432 t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err) 433 } 434 xdsC.InvokeWatchEDSCallback(defaultEndpointsUpdate, nil) 435 436 fsc := &fakeSubConn{} 437 state := connectivity.Ready 438 edsB.UpdateSubConnState(fsc, balancer.SubConnState{ConnectivityState: state}) 439 if err := edsLB.waitForSubConnStateChange(ctx, &scStateChange{sc: fsc, state: state}); err != nil { 440 t.Fatal(err) 441 } 442} 443 444// TestErrorFromXDSClientUpdate verifies that an error from xdsClient update is 445// handled correctly. 446// 447// If it's resource-not-found, watch will NOT be canceled, the EDS impl will 448// receive an empty EDS update, and new RPCs will fail. 449// 450// If it's connection error, nothing will happen. This will need to change to 451// handle fallback. 452func (s) TestErrorFromXDSClientUpdate(t *testing.T) { 453 edsLBCh := testutils.NewChannel() 454 xdsC, cleanup := setup(edsLBCh) 455 defer cleanup() 456 457 builder := balancer.Get(edsName) 458 edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}) 459 if edsB == nil { 460 t.Fatalf("builder.Build(%s) failed and returned nil", edsName) 461 } 462 defer edsB.Close() 463 464 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) 465 defer cancel() 466 edsLB, err := waitForNewEDSLB(ctx, edsLBCh) 467 if err != nil { 468 t.Fatal(err) 469 } 470 471 if err := edsB.UpdateClientConnState(balancer.ClientConnState{ 472 BalancerConfig: &EDSConfig{EDSServiceName: testServiceName}, 473 }); err != nil { 474 t.Fatal(err) 475 } 476 477 if _, err := xdsC.WaitForWatchEDS(ctx); err != nil { 478 t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err) 479 } 480 xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, nil) 481 if err := edsLB.waitForEDSResponse(ctx, xdsclient.EndpointsUpdate{}); err != nil { 482 t.Fatalf("EDS impl got unexpected EDS response: %v", err) 483 } 484 485 connectionErr := xdsclient.NewErrorf(xdsclient.ErrorTypeConnection, "connection error") 486 xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, connectionErr) 487 488 sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) 489 defer sCancel() 490 if err := xdsC.WaitForCancelEDSWatch(sCtx); err != context.DeadlineExceeded { 491 t.Fatal("watch was canceled, want not canceled (timeout error)") 492 } 493 494 sCtx, sCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout) 495 defer sCancel() 496 if err := edsLB.waitForEDSResponse(sCtx, xdsclient.EndpointsUpdate{}); err != context.DeadlineExceeded { 497 t.Fatal(err) 498 } 499 500 resourceErr := xdsclient.NewErrorf(xdsclient.ErrorTypeResourceNotFound, "edsBalancer resource not found error") 501 xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, resourceErr) 502 // Even if error is resource not found, watch shouldn't be canceled, because 503 // this is an EDS resource removed (and xds client actually never sends this 504 // error, but we still handles it). 505 sCtx, sCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout) 506 defer sCancel() 507 if err := xdsC.WaitForCancelEDSWatch(sCtx); err != context.DeadlineExceeded { 508 t.Fatal("watch was canceled, want not canceled (timeout error)") 509 } 510 if err := edsLB.waitForEDSResponse(ctx, xdsclient.EndpointsUpdate{}); err != nil { 511 t.Fatalf("eds impl expecting empty update, got %v", err) 512 } 513} 514 515// TestErrorFromResolver verifies that resolver errors are handled correctly. 516// 517// If it's resource-not-found, watch will be canceled, the EDS impl will receive 518// an empty EDS update, and new RPCs will fail. 519// 520// If it's connection error, nothing will happen. This will need to change to 521// handle fallback. 522func (s) TestErrorFromResolver(t *testing.T) { 523 edsLBCh := testutils.NewChannel() 524 xdsC, cleanup := setup(edsLBCh) 525 defer cleanup() 526 527 builder := balancer.Get(edsName) 528 edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}) 529 if edsB == nil { 530 t.Fatalf("builder.Build(%s) failed and returned nil", edsName) 531 } 532 defer edsB.Close() 533 534 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) 535 defer cancel() 536 edsLB, err := waitForNewEDSLB(ctx, edsLBCh) 537 if err != nil { 538 t.Fatal(err) 539 } 540 541 if err := edsB.UpdateClientConnState(balancer.ClientConnState{ 542 BalancerConfig: &EDSConfig{EDSServiceName: testServiceName}, 543 }); err != nil { 544 t.Fatal(err) 545 } 546 547 if _, err := xdsC.WaitForWatchEDS(ctx); err != nil { 548 t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err) 549 } 550 xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, nil) 551 if err := edsLB.waitForEDSResponse(ctx, xdsclient.EndpointsUpdate{}); err != nil { 552 t.Fatalf("EDS impl got unexpected EDS response: %v", err) 553 } 554 555 connectionErr := xdsclient.NewErrorf(xdsclient.ErrorTypeConnection, "connection error") 556 edsB.ResolverError(connectionErr) 557 558 sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) 559 defer sCancel() 560 if err := xdsC.WaitForCancelEDSWatch(sCtx); err != context.DeadlineExceeded { 561 t.Fatal("watch was canceled, want not canceled (timeout error)") 562 } 563 564 sCtx, sCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout) 565 defer sCancel() 566 if err := edsLB.waitForEDSResponse(sCtx, xdsclient.EndpointsUpdate{}); err != context.DeadlineExceeded { 567 t.Fatal("eds impl got EDS resp, want timeout error") 568 } 569 570 resourceErr := xdsclient.NewErrorf(xdsclient.ErrorTypeResourceNotFound, "edsBalancer resource not found error") 571 edsB.ResolverError(resourceErr) 572 if err := xdsC.WaitForCancelEDSWatch(ctx); err != nil { 573 t.Fatalf("want watch to be canceled, waitForCancel failed: %v", err) 574 } 575 if err := edsLB.waitForEDSResponse(ctx, xdsclient.EndpointsUpdate{}); err != nil { 576 t.Fatalf("EDS impl got unexpected EDS response: %v", err) 577 } 578} 579 580// Given a list of resource names, verifies that EDS requests for the same are 581// sent by the EDS balancer, through the fake xDS client. 582func verifyExpectedRequests(ctx context.Context, fc *fakeclient.Client, resourceNames ...string) error { 583 for _, name := range resourceNames { 584 if name == "" { 585 // ResourceName empty string indicates a cancel. 586 if err := fc.WaitForCancelEDSWatch(ctx); err != nil { 587 return fmt.Errorf("timed out when expecting resource %q", name) 588 } 589 return nil 590 } 591 592 resName, err := fc.WaitForWatchEDS(ctx) 593 if err != nil { 594 return fmt.Errorf("timed out when expecting resource %q, %p", name, fc) 595 } 596 if resName != name { 597 return fmt.Errorf("got EDS request for resource %q, expected: %q", resName, name) 598 } 599 } 600 return nil 601} 602 603// TestClientWatchEDS verifies that the xdsClient inside the top-level EDS LB 604// policy registers an EDS watch for expected resource upon receiving an update 605// from gRPC. 606func (s) TestClientWatchEDS(t *testing.T) { 607 edsLBCh := testutils.NewChannel() 608 xdsC, cleanup := setup(edsLBCh) 609 defer cleanup() 610 611 builder := balancer.Get(edsName) 612 edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}) 613 if edsB == nil { 614 t.Fatalf("builder.Build(%s) failed and returned nil", edsName) 615 } 616 defer edsB.Close() 617 618 // Update with an non-empty edsServiceName should trigger an EDS watch for 619 // the same. 620 if err := edsB.UpdateClientConnState(balancer.ClientConnState{ 621 BalancerConfig: &EDSConfig{EDSServiceName: "foobar-1"}, 622 }); err != nil { 623 t.Fatal(err) 624 } 625 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) 626 defer cancel() 627 if err := verifyExpectedRequests(ctx, xdsC, "foobar-1"); err != nil { 628 t.Fatal(err) 629 } 630 631 // Also test the case where the edsServerName changes from one non-empty 632 // name to another, and make sure a new watch is registered. The previously 633 // registered watch will be cancelled, which will result in an EDS request 634 // with no resource names being sent to the server. 635 if err := edsB.UpdateClientConnState(balancer.ClientConnState{ 636 BalancerConfig: &EDSConfig{EDSServiceName: "foobar-2"}, 637 }); err != nil { 638 t.Fatal(err) 639 } 640 if err := verifyExpectedRequests(ctx, xdsC, "", "foobar-2"); err != nil { 641 t.Fatal(err) 642 } 643} 644 645// TestCounterUpdate verifies that the counter update is triggered with the 646// service name from an update's config. 647func (s) TestCounterUpdate(t *testing.T) { 648 edsLBCh := testutils.NewChannel() 649 _, cleanup := setup(edsLBCh) 650 defer cleanup() 651 652 builder := balancer.Get(edsName) 653 edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}) 654 if edsB == nil { 655 t.Fatalf("builder.Build(%s) failed and returned nil", edsName) 656 } 657 defer edsB.Close() 658 659 var testCountMax uint32 = 100 660 // Update should trigger counter update with provided service name. 661 if err := edsB.UpdateClientConnState(balancer.ClientConnState{ 662 BalancerConfig: &EDSConfig{ 663 EDSServiceName: "foobar-1", 664 MaxConcurrentRequests: &testCountMax, 665 }, 666 }); err != nil { 667 t.Fatal(err) 668 } 669 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) 670 defer cancel() 671 edsI := edsB.(*edsBalancer).edsImpl.(*fakeEDSBalancer) 672 if err := edsI.waitForCounterUpdate(ctx, "foobar-1"); err != nil { 673 t.Fatal(err) 674 } 675 if err := edsI.waitForCountMaxUpdate(ctx, &testCountMax); err != nil { 676 t.Fatal(err) 677 } 678} 679 680// TestClusterNameUpdateInAddressAttributes verifies that cluster name update in 681// edsImpl is triggered with the update from a new service config. 682func (s) TestClusterNameUpdateInAddressAttributes(t *testing.T) { 683 edsLBCh := testutils.NewChannel() 684 xdsC, cleanup := setup(edsLBCh) 685 defer cleanup() 686 687 builder := balancer.Get(edsName) 688 edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}) 689 if edsB == nil { 690 t.Fatalf("builder.Build(%s) failed and returned nil", edsName) 691 } 692 defer edsB.Close() 693 694 // Update should trigger counter update with provided service name. 695 if err := edsB.UpdateClientConnState(balancer.ClientConnState{ 696 BalancerConfig: &EDSConfig{ 697 EDSServiceName: "foobar-1", 698 }, 699 }); err != nil { 700 t.Fatal(err) 701 } 702 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) 703 defer cancel() 704 gotCluster, err := xdsC.WaitForWatchEDS(ctx) 705 if err != nil || gotCluster != "foobar-1" { 706 t.Fatalf("unexpected EDS watch: %v, %v", gotCluster, err) 707 } 708 edsI := edsB.(*edsBalancer).edsImpl.(*fakeEDSBalancer) 709 if err := edsI.waitForClusterNameUpdate(ctx, "foobar-1"); err != nil { 710 t.Fatal(err) 711 } 712 713 // Update should trigger counter update with provided service name. 714 if err := edsB.UpdateClientConnState(balancer.ClientConnState{ 715 BalancerConfig: &EDSConfig{ 716 EDSServiceName: "foobar-2", 717 }, 718 }); err != nil { 719 t.Fatal(err) 720 } 721 if err := xdsC.WaitForCancelEDSWatch(ctx); err != nil { 722 t.Fatalf("failed to wait for EDS cancel: %v", err) 723 } 724 gotCluster2, err := xdsC.WaitForWatchEDS(ctx) 725 if err != nil || gotCluster2 != "foobar-2" { 726 t.Fatalf("unexpected EDS watch: %v, %v", gotCluster2, err) 727 } 728 if err := edsI.waitForClusterNameUpdate(ctx, "foobar-2"); err != nil { 729 t.Fatal(err) 730 } 731} 732 733func (s) TestBalancerConfigParsing(t *testing.T) { 734 const testEDSName = "eds.service" 735 var testLRSName = "lrs.server" 736 b := bytes.NewBuffer(nil) 737 if err := (&jsonpb.Marshaler{}).Marshal(b, &scpb.XdsConfig{ 738 ChildPolicy: []*scpb.LoadBalancingConfig{ 739 {Policy: &scpb.LoadBalancingConfig_Xds{}}, 740 {Policy: &scpb.LoadBalancingConfig_RoundRobin{ 741 RoundRobin: &scpb.RoundRobinConfig{}, 742 }}, 743 }, 744 FallbackPolicy: []*scpb.LoadBalancingConfig{ 745 {Policy: &scpb.LoadBalancingConfig_Xds{}}, 746 {Policy: &scpb.LoadBalancingConfig_PickFirst{ 747 PickFirst: &scpb.PickFirstConfig{}, 748 }}, 749 }, 750 EdsServiceName: testEDSName, 751 LrsLoadReportingServerName: &wrapperspb.StringValue{Value: testLRSName}, 752 }); err != nil { 753 t.Fatalf("%v", err) 754 } 755 756 var testMaxConcurrentRequests uint32 = 123 757 tests := []struct { 758 name string 759 js json.RawMessage 760 want serviceconfig.LoadBalancingConfig 761 wantErr bool 762 }{ 763 { 764 name: "bad json", 765 js: json.RawMessage(`i am not JSON`), 766 wantErr: true, 767 }, 768 { 769 name: "empty", 770 js: json.RawMessage(`{}`), 771 want: &EDSConfig{}, 772 }, 773 { 774 name: "jsonpb-generated", 775 js: b.Bytes(), 776 want: &EDSConfig{ 777 ChildPolicy: &loadBalancingConfig{ 778 Name: "round_robin", 779 Config: json.RawMessage("{}"), 780 }, 781 FallBackPolicy: &loadBalancingConfig{ 782 Name: "pick_first", 783 Config: json.RawMessage("{}"), 784 }, 785 EDSServiceName: testEDSName, 786 LrsLoadReportingServerName: &testLRSName, 787 }, 788 }, 789 { 790 // json with random balancers, and the first is not registered. 791 name: "manually-generated", 792 js: json.RawMessage(` 793{ 794 "childPolicy": [ 795 {"fake_balancer_C": {}}, 796 {"fake_balancer_A": {}}, 797 {"fake_balancer_B": {}} 798 ], 799 "fallbackPolicy": [ 800 {"fake_balancer_C": {}}, 801 {"fake_balancer_B": {}}, 802 {"fake_balancer_A": {}} 803 ], 804 "edsServiceName": "eds.service", 805 "maxConcurrentRequests": 123, 806 "lrsLoadReportingServerName": "lrs.server" 807}`), 808 want: &EDSConfig{ 809 ChildPolicy: &loadBalancingConfig{ 810 Name: "fake_balancer_A", 811 Config: json.RawMessage("{}"), 812 }, 813 FallBackPolicy: &loadBalancingConfig{ 814 Name: "fake_balancer_B", 815 Config: json.RawMessage("{}"), 816 }, 817 EDSServiceName: testEDSName, 818 MaxConcurrentRequests: &testMaxConcurrentRequests, 819 LrsLoadReportingServerName: &testLRSName, 820 }, 821 }, 822 { 823 // json with no lrs server name, LoadReportingServerName should 824 // be nil (not an empty string). 825 name: "no-lrs-server-name", 826 js: json.RawMessage(` 827{ 828 "edsServiceName": "eds.service" 829}`), 830 want: &EDSConfig{ 831 EDSServiceName: testEDSName, 832 LrsLoadReportingServerName: nil, 833 }, 834 }, 835 { 836 name: "good child policy", 837 js: json.RawMessage(`{"childPolicy":[{"pick_first":{}}]}`), 838 want: &EDSConfig{ 839 ChildPolicy: &loadBalancingConfig{ 840 Name: "pick_first", 841 Config: json.RawMessage(`{}`), 842 }, 843 }, 844 }, 845 { 846 name: "multiple good child policies", 847 js: json.RawMessage(`{"childPolicy":[{"round_robin":{}},{"pick_first":{}}]}`), 848 want: &EDSConfig{ 849 ChildPolicy: &loadBalancingConfig{ 850 Name: "round_robin", 851 Config: json.RawMessage(`{}`), 852 }, 853 }, 854 }, 855 } 856 for _, tt := range tests { 857 t.Run(tt.name, func(t *testing.T) { 858 b := &edsBalancerBuilder{} 859 got, err := b.ParseConfig(tt.js) 860 if (err != nil) != tt.wantErr { 861 t.Fatalf("edsBalancerBuilder.ParseConfig() error = %v, wantErr %v", err, tt.wantErr) 862 } 863 if tt.wantErr { 864 return 865 } 866 if !cmp.Equal(got, tt.want) { 867 t.Errorf(cmp.Diff(got, tt.want)) 868 } 869 }) 870 } 871} 872 873func (s) TestEqualStringPointers(t *testing.T) { 874 var ( 875 ta1 = "test-a" 876 ta2 = "test-a" 877 tb = "test-b" 878 ) 879 tests := []struct { 880 name string 881 a *string 882 b *string 883 want bool 884 }{ 885 {"both-nil", nil, nil, true}, 886 {"a-non-nil", &ta1, nil, false}, 887 {"b-non-nil", nil, &tb, false}, 888 {"equal", &ta1, &ta2, true}, 889 {"different", &ta1, &tb, false}, 890 } 891 for _, tt := range tests { 892 t.Run(tt.name, func(t *testing.T) { 893 if got := equalStringPointers(tt.a, tt.b); got != tt.want { 894 t.Errorf("equalStringPointers() = %v, want %v", got, tt.want) 895 } 896 }) 897 } 898} 899