1/* 2 * 3 * Copyright 2019 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package edsbalancer 20 21import ( 22 "bytes" 23 "encoding/json" 24 "fmt" 25 "reflect" 26 "testing" 27 28 corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" 29 "github.com/golang/protobuf/jsonpb" 30 wrapperspb "github.com/golang/protobuf/ptypes/wrappers" 31 "github.com/google/go-cmp/cmp" 32 "google.golang.org/grpc" 33 "google.golang.org/grpc/balancer" 34 "google.golang.org/grpc/connectivity" 35 "google.golang.org/grpc/internal/grpctest" 36 "google.golang.org/grpc/internal/leakcheck" 37 scpb "google.golang.org/grpc/internal/proto/grpc_service_config" 38 "google.golang.org/grpc/resolver" 39 "google.golang.org/grpc/serviceconfig" 40 "google.golang.org/grpc/xds/internal/balancer/lrs" 41 xdsclient "google.golang.org/grpc/xds/internal/client" 42 "google.golang.org/grpc/xds/internal/client/bootstrap" 43 "google.golang.org/grpc/xds/internal/testutils" 44 "google.golang.org/grpc/xds/internal/testutils/fakeclient" 45) 46 47func init() { 48 balancer.Register(&edsBalancerBuilder{}) 49 50 bootstrapConfigNew = func() (*bootstrap.Config, error) { 51 return &bootstrap.Config{ 52 BalancerName: testBalancerNameFooBar, 53 Creds: grpc.WithInsecure(), 54 NodeProto: &corepb.Node{}, 55 }, nil 56 } 57} 58 59type s struct{} 60 61func (s) Teardown(t *testing.T) { 62 leakcheck.Check(t) 63} 64 65func Test(t *testing.T) { 66 grpctest.RunSubTests(t, s{}) 67} 68 69const testBalancerNameFooBar = "foo.bar" 70 71func newNoopTestClientConn() *noopTestClientConn { 72 return &noopTestClientConn{} 73} 74 75// noopTestClientConn is used in EDS balancer config update tests that only 76// cover the config update handling, but not SubConn/load-balancing. 77type noopTestClientConn struct { 78 balancer.ClientConn 79} 80 81func (t *noopTestClientConn) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { 82 return nil, nil 83} 84 85func (noopTestClientConn) Target() string { return testServiceName } 86 87type scStateChange struct { 88 sc balancer.SubConn 89 state connectivity.State 90} 91 92type fakeEDSBalancer struct { 93 cc balancer.ClientConn 94 childPolicy *testutils.Channel 95 subconnStateChange *testutils.Channel 96 loadStore lrs.Store 97} 98 99func (f *fakeEDSBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) { 100 f.subconnStateChange.Send(&scStateChange{sc: sc, state: state}) 101} 102 103func (f *fakeEDSBalancer) HandleChildPolicy(name string, config json.RawMessage) { 104 f.childPolicy.Send(&loadBalancingConfig{Name: name, Config: config}) 105} 106 107func (f *fakeEDSBalancer) Close() {} 108func (f *fakeEDSBalancer) HandleEDSResponse(edsResp *xdsclient.EDSUpdate) {} 109 110func (f *fakeEDSBalancer) waitForChildPolicy(wantPolicy *loadBalancingConfig) error { 111 val, err := f.childPolicy.Receive() 112 if err != nil { 113 return fmt.Errorf("error waiting for childPolicy: %v", err) 114 } 115 gotPolicy := val.(*loadBalancingConfig) 116 if !reflect.DeepEqual(gotPolicy, wantPolicy) { 117 return fmt.Errorf("got childPolicy %v, want %v", gotPolicy, wantPolicy) 118 } 119 return nil 120} 121 122func (f *fakeEDSBalancer) waitForSubConnStateChange(wantState *scStateChange) error { 123 val, err := f.subconnStateChange.Receive() 124 if err != nil { 125 return fmt.Errorf("error waiting for subconnStateChange: %v", err) 126 } 127 gotState := val.(*scStateChange) 128 if !reflect.DeepEqual(gotState, wantState) { 129 return fmt.Errorf("got subconnStateChange %v, want %v", gotState, wantState) 130 } 131 return nil 132} 133 134func newFakeEDSBalancer(cc balancer.ClientConn, loadStore lrs.Store) edsBalancerImplInterface { 135 return &fakeEDSBalancer{ 136 cc: cc, 137 childPolicy: testutils.NewChannelWithSize(10), 138 subconnStateChange: testutils.NewChannelWithSize(10), 139 loadStore: loadStore, 140 } 141} 142 143type fakeSubConn struct{} 144 145func (*fakeSubConn) UpdateAddresses([]resolver.Address) { panic("implement me") } 146func (*fakeSubConn) Connect() { panic("implement me") } 147 148// waitForNewXDSClientWithEDSWatch makes sure that a new xdsClient is created 149// with the provided name. It also make sure that the newly created client 150// registers an eds watcher. 151func waitForNewXDSClientWithEDSWatch(t *testing.T, ch *testutils.Channel, wantName string) *fakeclient.Client { 152 t.Helper() 153 154 val, err := ch.Receive() 155 if err != nil { 156 t.Fatalf("error when waiting for a new xds client: %v", err) 157 return nil 158 } 159 xdsC := val.(*fakeclient.Client) 160 if xdsC.Name() != wantName { 161 t.Fatalf("xdsClient created to balancer: %v, want %v", xdsC.Name(), wantName) 162 return nil 163 } 164 _, err = xdsC.WaitForWatchEDS() 165 if err != nil { 166 t.Fatalf("xdsClient.WatchEDS failed with error: %v", err) 167 return nil 168 } 169 return xdsC 170} 171 172// waitForNewEDSLB makes sure that a new edsLB is created by the top-level 173// edsBalancer. 174func waitForNewEDSLB(t *testing.T, ch *testutils.Channel) *fakeEDSBalancer { 175 t.Helper() 176 177 val, err := ch.Receive() 178 if err != nil { 179 t.Fatalf("error when waiting for a new edsLB: %v", err) 180 return nil 181 } 182 return val.(*fakeEDSBalancer) 183} 184 185// setup overrides the functions which are used to create the xdsClient and the 186// edsLB, creates fake version of them and makes them available on the provided 187// channels. The returned cancel function should be called by the test for 188// cleanup. 189func setup(edsLBCh *testutils.Channel, xdsClientCh *testutils.Channel) func() { 190 origNewEDSBalancer := newEDSBalancer 191 newEDSBalancer = func(cc balancer.ClientConn, loadStore lrs.Store) edsBalancerImplInterface { 192 edsLB := newFakeEDSBalancer(cc, loadStore) 193 defer func() { edsLBCh.Send(edsLB) }() 194 return edsLB 195 } 196 197 origXdsClientNew := xdsclientNew 198 xdsclientNew = func(opts xdsclient.Options) (xdsClientInterface, error) { 199 xdsC := fakeclient.NewClientWithName(opts.Config.BalancerName) 200 defer func() { xdsClientCh.Send(xdsC) }() 201 return xdsC, nil 202 } 203 return func() { 204 newEDSBalancer = origNewEDSBalancer 205 xdsclientNew = origXdsClientNew 206 } 207} 208 209// TestXDSConfigBalancerNameUpdate verifies different scenarios where the 210// balancer name in the lbConfig is updated. 211// 212// The test does the following: 213// * Builds a new xds balancer. 214// * Repeatedly pushes new ClientConnState which specifies different 215// balancerName in the lbConfig. We expect xdsClient objects to created 216// whenever the balancerName changes. 217func (s) TestXDSConfigBalancerNameUpdate(t *testing.T) { 218 oldBootstrapConfigNew := bootstrapConfigNew 219 bootstrapConfigNew = func() (*bootstrap.Config, error) { 220 // Return an error from bootstrap, so the eds balancer will use 221 // BalancerName from the config. 222 // 223 // TODO: remove this when deleting BalancerName from config. 224 return nil, fmt.Errorf("no bootstrap available") 225 } 226 defer func() { bootstrapConfigNew = oldBootstrapConfigNew }() 227 edsLBCh := testutils.NewChannel() 228 xdsClientCh := testutils.NewChannel() 229 cancel := setup(edsLBCh, xdsClientCh) 230 defer cancel() 231 232 builder := balancer.Get(edsName) 233 cc := newNoopTestClientConn() 234 edsB, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSClusterName}}).(*edsBalancer) 235 if !ok { 236 t.Fatalf("builder.Build(%s) returned type {%T}, want {*edsBalancer}", edsName, edsB) 237 } 238 defer edsB.Close() 239 240 addrs := []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, {Addr: "3.3.3.3:10003"}} 241 for i := 0; i < 2; i++ { 242 balancerName := fmt.Sprintf("balancer-%d", i) 243 edsB.UpdateClientConnState(balancer.ClientConnState{ 244 ResolverState: resolver.State{Addresses: addrs}, 245 BalancerConfig: &EDSConfig{ 246 BalancerName: balancerName, 247 EDSServiceName: testEDSClusterName, 248 }, 249 }) 250 251 xdsC := waitForNewXDSClientWithEDSWatch(t, xdsClientCh, balancerName) 252 xdsC.InvokeWatchEDSCallback(&xdsclient.EDSUpdate{}, nil) 253 } 254} 255 256const ( 257 fakeBalancerA = "fake_balancer_A" 258 fakeBalancerB = "fake_balancer_B" 259) 260 261// Install two fake balancers for service config update tests. 262// 263// ParseConfig only accepts the json if the balancer specified is registered. 264 265func init() { 266 balancer.Register(&fakeBalancerBuilder{name: fakeBalancerA}) 267 balancer.Register(&fakeBalancerBuilder{name: fakeBalancerB}) 268} 269 270type fakeBalancerBuilder struct { 271 name string 272} 273 274func (b *fakeBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { 275 return &fakeBalancer{cc: cc} 276} 277 278func (b *fakeBalancerBuilder) Name() string { 279 return b.name 280} 281 282type fakeBalancer struct { 283 cc balancer.ClientConn 284} 285 286func (b *fakeBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) { 287 panic("implement me") 288} 289 290func (b *fakeBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) { 291 panic("implement me") 292} 293 294func (b *fakeBalancer) Close() {} 295 296// TestXDSConnfigChildPolicyUpdate verifies scenarios where the childPolicy 297// section of the lbConfig is updated. 298// 299// The test does the following: 300// * Builds a new xds balancer. 301// * Pushes a new ClientConnState with a childPolicy set to fakeBalancerA. 302// Verifies that a new xdsClient is created. It then pushes a new edsUpdate 303// through the fakexds client. Verifies that a new edsLB is created and it 304// receives the expected childPolicy. 305// * Pushes a new ClientConnState with a childPolicy set to fakeBalancerB. 306// This time around, we expect no new xdsClient or edsLB to be created. 307// Instead, we expect the existing edsLB to receive the new child policy. 308func (s) TestXDSConnfigChildPolicyUpdate(t *testing.T) { 309 edsLBCh := testutils.NewChannel() 310 xdsClientCh := testutils.NewChannel() 311 cancel := setup(edsLBCh, xdsClientCh) 312 defer cancel() 313 314 builder := balancer.Get(edsName) 315 cc := newNoopTestClientConn() 316 edsB, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}).(*edsBalancer) 317 if !ok { 318 t.Fatalf("builder.Build(%s) returned type {%T}, want {*edsBalancer}", edsName, edsB) 319 } 320 defer edsB.Close() 321 322 edsB.UpdateClientConnState(balancer.ClientConnState{ 323 BalancerConfig: &EDSConfig{ 324 BalancerName: testBalancerNameFooBar, 325 ChildPolicy: &loadBalancingConfig{ 326 Name: fakeBalancerA, 327 Config: json.RawMessage("{}"), 328 }, 329 EDSServiceName: testEDSClusterName, 330 }, 331 }) 332 xdsC := waitForNewXDSClientWithEDSWatch(t, xdsClientCh, testBalancerNameFooBar) 333 xdsC.InvokeWatchEDSCallback(&xdsclient.EDSUpdate{}, nil) 334 edsLB := waitForNewEDSLB(t, edsLBCh) 335 edsLB.waitForChildPolicy(&loadBalancingConfig{ 336 Name: string(fakeBalancerA), 337 Config: json.RawMessage(`{}`), 338 }) 339 340 edsB.UpdateClientConnState(balancer.ClientConnState{ 341 BalancerConfig: &EDSConfig{ 342 BalancerName: testBalancerNameFooBar, 343 ChildPolicy: &loadBalancingConfig{ 344 Name: fakeBalancerB, 345 Config: json.RawMessage("{}"), 346 }, 347 EDSServiceName: testEDSClusterName, 348 }, 349 }) 350 edsLB.waitForChildPolicy(&loadBalancingConfig{ 351 Name: string(fakeBalancerA), 352 Config: json.RawMessage(`{}`), 353 }) 354} 355 356// TestXDSSubConnStateChange verifies if the top-level edsBalancer passes on 357// the subConnStateChange to appropriate child balancers. 358func (s) TestXDSSubConnStateChange(t *testing.T) { 359 edsLBCh := testutils.NewChannel() 360 xdsClientCh := testutils.NewChannel() 361 cancel := setup(edsLBCh, xdsClientCh) 362 defer cancel() 363 364 builder := balancer.Get(edsName) 365 cc := newNoopTestClientConn() 366 edsB, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSClusterName}}).(*edsBalancer) 367 if !ok { 368 t.Fatalf("builder.Build(%s) returned type {%T}, want {*edsBalancer}", edsName, edsB) 369 } 370 defer edsB.Close() 371 372 addrs := []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, {Addr: "3.3.3.3:10003"}} 373 edsB.UpdateClientConnState(balancer.ClientConnState{ 374 ResolverState: resolver.State{Addresses: addrs}, 375 BalancerConfig: &EDSConfig{ 376 BalancerName: testBalancerNameFooBar, 377 EDSServiceName: testEDSClusterName, 378 }, 379 }) 380 381 xdsC := waitForNewXDSClientWithEDSWatch(t, xdsClientCh, testBalancerNameFooBar) 382 xdsC.InvokeWatchEDSCallback(&xdsclient.EDSUpdate{}, nil) 383 edsLB := waitForNewEDSLB(t, edsLBCh) 384 385 fsc := &fakeSubConn{} 386 state := connectivity.Ready 387 edsB.UpdateSubConnState(fsc, balancer.SubConnState{ConnectivityState: state}) 388 edsLB.waitForSubConnStateChange(&scStateChange{sc: fsc, state: state}) 389} 390 391func TestXDSBalancerConfigParsing(t *testing.T) { 392 const testEDSName = "eds.service" 393 var testLRSName = "lrs.server" 394 b := bytes.NewBuffer(nil) 395 if err := (&jsonpb.Marshaler{}).Marshal(b, &scpb.XdsConfig{ 396 ChildPolicy: []*scpb.LoadBalancingConfig{ 397 {Policy: &scpb.LoadBalancingConfig_Xds{}}, 398 {Policy: &scpb.LoadBalancingConfig_RoundRobin{ 399 RoundRobin: &scpb.RoundRobinConfig{}, 400 }}, 401 }, 402 FallbackPolicy: []*scpb.LoadBalancingConfig{ 403 {Policy: &scpb.LoadBalancingConfig_Xds{}}, 404 {Policy: &scpb.LoadBalancingConfig_PickFirst{ 405 PickFirst: &scpb.PickFirstConfig{}, 406 }}, 407 }, 408 EdsServiceName: testEDSName, 409 LrsLoadReportingServerName: &wrapperspb.StringValue{Value: testLRSName}, 410 }); err != nil { 411 t.Fatalf("%v", err) 412 } 413 414 tests := []struct { 415 name string 416 js json.RawMessage 417 want serviceconfig.LoadBalancingConfig 418 wantErr bool 419 }{ 420 { 421 name: "jsonpb-generated", 422 js: b.Bytes(), 423 want: &EDSConfig{ 424 ChildPolicy: &loadBalancingConfig{ 425 Name: "round_robin", 426 Config: json.RawMessage("{}"), 427 }, 428 FallBackPolicy: &loadBalancingConfig{ 429 Name: "pick_first", 430 Config: json.RawMessage("{}"), 431 }, 432 EDSServiceName: testEDSName, 433 LrsLoadReportingServerName: &testLRSName, 434 }, 435 wantErr: false, 436 }, 437 { 438 // json with random balancers, and the first is not registered. 439 name: "manually-generated", 440 js: json.RawMessage(` 441{ 442 "balancerName": "fake.foo.bar", 443 "childPolicy": [ 444 {"fake_balancer_C": {}}, 445 {"fake_balancer_A": {}}, 446 {"fake_balancer_B": {}} 447 ], 448 "fallbackPolicy": [ 449 {"fake_balancer_C": {}}, 450 {"fake_balancer_B": {}}, 451 {"fake_balancer_A": {}} 452 ], 453 "edsServiceName": "eds.service", 454 "lrsLoadReportingServerName": "lrs.server" 455}`), 456 want: &EDSConfig{ 457 BalancerName: "fake.foo.bar", 458 ChildPolicy: &loadBalancingConfig{ 459 Name: "fake_balancer_A", 460 Config: json.RawMessage("{}"), 461 }, 462 FallBackPolicy: &loadBalancingConfig{ 463 Name: "fake_balancer_B", 464 Config: json.RawMessage("{}"), 465 }, 466 EDSServiceName: testEDSName, 467 LrsLoadReportingServerName: &testLRSName, 468 }, 469 wantErr: false, 470 }, 471 { 472 // json with no lrs server name, LrsLoadReportingServerName should 473 // be nil (not an empty string). 474 name: "no-lrs-server-name", 475 js: json.RawMessage(` 476{ 477 "balancerName": "fake.foo.bar", 478 "edsServiceName": "eds.service" 479}`), 480 want: &EDSConfig{ 481 BalancerName: "fake.foo.bar", 482 EDSServiceName: testEDSName, 483 LrsLoadReportingServerName: nil, 484 }, 485 wantErr: false, 486 }, 487 } 488 for _, tt := range tests { 489 t.Run(tt.name, func(t *testing.T) { 490 b := &edsBalancerBuilder{} 491 got, err := b.ParseConfig(tt.js) 492 if (err != nil) != tt.wantErr { 493 t.Errorf("edsBalancerBuilder.ParseConfig() error = %v, wantErr %v", err, tt.wantErr) 494 return 495 } 496 if !cmp.Equal(got, tt.want) { 497 t.Errorf(cmp.Diff(got, tt.want)) 498 } 499 }) 500 } 501} 502func TestLoadbalancingConfigParsing(t *testing.T) { 503 tests := []struct { 504 name string 505 s string 506 want *EDSConfig 507 }{ 508 { 509 name: "empty", 510 s: "{}", 511 want: &EDSConfig{}, 512 }, 513 { 514 name: "success1", 515 s: `{"childPolicy":[{"pick_first":{}}]}`, 516 want: &EDSConfig{ 517 ChildPolicy: &loadBalancingConfig{ 518 Name: "pick_first", 519 Config: json.RawMessage(`{}`), 520 }, 521 }, 522 }, 523 { 524 name: "success2", 525 s: `{"childPolicy":[{"round_robin":{}},{"pick_first":{}}]}`, 526 want: &EDSConfig{ 527 ChildPolicy: &loadBalancingConfig{ 528 Name: "round_robin", 529 Config: json.RawMessage(`{}`), 530 }, 531 }, 532 }, 533 } 534 for _, tt := range tests { 535 t.Run(tt.name, func(t *testing.T) { 536 var cfg EDSConfig 537 if err := json.Unmarshal([]byte(tt.s), &cfg); err != nil || !reflect.DeepEqual(&cfg, tt.want) { 538 t.Errorf("test name: %s, parseFullServiceConfig() = %+v, err: %v, want %+v, <nil>", tt.name, cfg, err, tt.want) 539 } 540 }) 541 } 542} 543 544func TestEqualStringPointers(t *testing.T) { 545 var ( 546 ta1 = "test-a" 547 ta2 = "test-a" 548 tb = "test-b" 549 ) 550 tests := []struct { 551 name string 552 a *string 553 b *string 554 want bool 555 }{ 556 {"both-nil", nil, nil, true}, 557 {"a-non-nil", &ta1, nil, false}, 558 {"b-non-nil", nil, &tb, false}, 559 {"equal", &ta1, &ta2, true}, 560 {"different", &ta1, &tb, false}, 561 } 562 for _, tt := range tests { 563 t.Run(tt.name, func(t *testing.T) { 564 if got := equalStringPointers(tt.a, tt.b); got != tt.want { 565 t.Errorf("equalStringPointers() = %v, want %v", got, tt.want) 566 } 567 }) 568 } 569} 570