1/* 2 * 3 * Copyright 2019 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package edsbalancer 20 21import ( 22 "bytes" 23 "encoding/json" 24 "fmt" 25 "testing" 26 27 corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" 28 "github.com/golang/protobuf/jsonpb" 29 wrapperspb "github.com/golang/protobuf/ptypes/wrappers" 30 "github.com/google/go-cmp/cmp" 31 "google.golang.org/grpc" 32 "google.golang.org/grpc/balancer" 33 "google.golang.org/grpc/connectivity" 34 "google.golang.org/grpc/internal/grpclog" 35 "google.golang.org/grpc/internal/grpctest" 36 scpb "google.golang.org/grpc/internal/proto/grpc_service_config" 37 "google.golang.org/grpc/resolver" 38 "google.golang.org/grpc/serviceconfig" 39 "google.golang.org/grpc/xds/internal/balancer/lrs" 40 xdsclient "google.golang.org/grpc/xds/internal/client" 41 "google.golang.org/grpc/xds/internal/client/bootstrap" 42 "google.golang.org/grpc/xds/internal/testutils" 43 "google.golang.org/grpc/xds/internal/testutils/fakeclient" 44) 45 46func init() { 47 balancer.Register(&edsBalancerBuilder{}) 48 49 bootstrapConfigNew = func() (*bootstrap.Config, error) { 50 return &bootstrap.Config{ 51 BalancerName: testBalancerNameFooBar, 52 Creds: grpc.WithInsecure(), 53 NodeProto: &corepb.Node{}, 54 }, nil 55 } 56} 57 58type s struct { 59 grpctest.Tester 60} 61 62func Test(t *testing.T) { 63 grpctest.RunSubTests(t, s{}) 64} 65 66const testBalancerNameFooBar = "foo.bar" 67 68func newNoopTestClientConn() *noopTestClientConn { 69 return &noopTestClientConn{} 70} 71 72// noopTestClientConn is used in EDS balancer config update tests that only 73// cover the config update handling, but not SubConn/load-balancing. 74type noopTestClientConn struct { 75 balancer.ClientConn 76} 77 78func (t *noopTestClientConn) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { 79 return nil, nil 80} 81 82func (noopTestClientConn) Target() string { return testServiceName } 83 84type scStateChange struct { 85 sc balancer.SubConn 86 state connectivity.State 87} 88 89type fakeEDSBalancer struct { 90 cc balancer.ClientConn 91 childPolicy *testutils.Channel 92 subconnStateChange *testutils.Channel 93 loadStore lrs.Store 94} 95 96func (f *fakeEDSBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) { 97 f.subconnStateChange.Send(&scStateChange{sc: sc, state: state}) 98} 99 100func (f *fakeEDSBalancer) HandleChildPolicy(name string, config json.RawMessage) { 101 f.childPolicy.Send(&loadBalancingConfig{Name: name, Config: config}) 102} 103 104func (f *fakeEDSBalancer) Close() {} 105func (f *fakeEDSBalancer) HandleEDSResponse(edsResp *xdsclient.EDSUpdate) {} 106func (f *fakeEDSBalancer) updateState(priority priorityType, s balancer.State) {} 107 108func (f *fakeEDSBalancer) waitForChildPolicy(wantPolicy *loadBalancingConfig) error { 109 val, err := f.childPolicy.Receive() 110 if err != nil { 111 return fmt.Errorf("error waiting for childPolicy: %v", err) 112 } 113 gotPolicy := val.(*loadBalancingConfig) 114 if !cmp.Equal(gotPolicy, wantPolicy) { 115 return fmt.Errorf("got childPolicy %v, want %v", gotPolicy, wantPolicy) 116 } 117 return nil 118} 119 120func (f *fakeEDSBalancer) waitForSubConnStateChange(wantState *scStateChange) error { 121 val, err := f.subconnStateChange.Receive() 122 if err != nil { 123 return fmt.Errorf("error waiting for subconnStateChange: %v", err) 124 } 125 gotState := val.(*scStateChange) 126 if !cmp.Equal(gotState, wantState, cmp.AllowUnexported(scStateChange{})) { 127 return fmt.Errorf("got subconnStateChange %v, want %v", gotState, wantState) 128 } 129 return nil 130} 131 132func newFakeEDSBalancer(cc balancer.ClientConn, loadStore lrs.Store) edsBalancerImplInterface { 133 return &fakeEDSBalancer{ 134 cc: cc, 135 childPolicy: testutils.NewChannelWithSize(10), 136 subconnStateChange: testutils.NewChannelWithSize(10), 137 loadStore: loadStore, 138 } 139} 140 141type fakeSubConn struct{} 142 143func (*fakeSubConn) UpdateAddresses([]resolver.Address) { panic("implement me") } 144func (*fakeSubConn) Connect() { panic("implement me") } 145 146// waitForNewXDSClientWithEDSWatch makes sure that a new xdsClient is created 147// with the provided name. It also make sure that the newly created client 148// registers an eds watcher. 149func waitForNewXDSClientWithEDSWatch(t *testing.T, ch *testutils.Channel, wantName string) *fakeclient.Client { 150 t.Helper() 151 152 val, err := ch.Receive() 153 if err != nil { 154 t.Fatalf("error when waiting for a new xds client: %v", err) 155 return nil 156 } 157 xdsC := val.(*fakeclient.Client) 158 if xdsC.Name() != wantName { 159 t.Fatalf("xdsClient created to balancer: %v, want %v", xdsC.Name(), wantName) 160 return nil 161 } 162 _, err = xdsC.WaitForWatchEDS() 163 if err != nil { 164 t.Fatalf("xdsClient.WatchEDS failed with error: %v", err) 165 return nil 166 } 167 return xdsC 168} 169 170// waitForNewEDSLB makes sure that a new edsLB is created by the top-level 171// edsBalancer. 172func waitForNewEDSLB(t *testing.T, ch *testutils.Channel) *fakeEDSBalancer { 173 t.Helper() 174 175 val, err := ch.Receive() 176 if err != nil { 177 t.Fatalf("error when waiting for a new edsLB: %v", err) 178 return nil 179 } 180 return val.(*fakeEDSBalancer) 181} 182 183// setup overrides the functions which are used to create the xdsClient and the 184// edsLB, creates fake version of them and makes them available on the provided 185// channels. The returned cancel function should be called by the test for 186// cleanup. 187func setup(edsLBCh *testutils.Channel, xdsClientCh *testutils.Channel) func() { 188 origNewEDSBalancer := newEDSBalancer 189 newEDSBalancer = func(cc balancer.ClientConn, enqueue func(priorityType, balancer.State), loadStore lrs.Store, logger *grpclog.PrefixLogger) edsBalancerImplInterface { 190 edsLB := newFakeEDSBalancer(cc, loadStore) 191 defer func() { edsLBCh.Send(edsLB) }() 192 return edsLB 193 } 194 195 origXdsClientNew := xdsclientNew 196 xdsclientNew = func(opts xdsclient.Options) (xdsClientInterface, error) { 197 xdsC := fakeclient.NewClientWithName(opts.Config.BalancerName) 198 defer func() { xdsClientCh.Send(xdsC) }() 199 return xdsC, nil 200 } 201 return func() { 202 newEDSBalancer = origNewEDSBalancer 203 xdsclientNew = origXdsClientNew 204 } 205} 206 207// TestXDSConfigBalancerNameUpdate verifies different scenarios where the 208// balancer name in the lbConfig is updated. 209// 210// The test does the following: 211// * Builds a new xds balancer. 212// * Repeatedly pushes new ClientConnState which specifies different 213// balancerName in the lbConfig. We expect xdsClient objects to created 214// whenever the balancerName changes. 215func (s) TestXDSConfigBalancerNameUpdate(t *testing.T) { 216 oldBootstrapConfigNew := bootstrapConfigNew 217 bootstrapConfigNew = func() (*bootstrap.Config, error) { 218 // Return an error from bootstrap, so the eds balancer will use 219 // BalancerName from the config. 220 // 221 // TODO: remove this when deleting BalancerName from config. 222 return nil, fmt.Errorf("no bootstrap available") 223 } 224 defer func() { bootstrapConfigNew = oldBootstrapConfigNew }() 225 edsLBCh := testutils.NewChannel() 226 xdsClientCh := testutils.NewChannel() 227 cancel := setup(edsLBCh, xdsClientCh) 228 defer cancel() 229 230 builder := balancer.Get(edsName) 231 cc := newNoopTestClientConn() 232 edsB, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSClusterName}}).(*edsBalancer) 233 if !ok { 234 t.Fatalf("builder.Build(%s) returned type {%T}, want {*edsBalancer}", edsName, edsB) 235 } 236 defer edsB.Close() 237 238 addrs := []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, {Addr: "3.3.3.3:10003"}} 239 for i := 0; i < 2; i++ { 240 balancerName := fmt.Sprintf("balancer-%d", i) 241 edsB.UpdateClientConnState(balancer.ClientConnState{ 242 ResolverState: resolver.State{Addresses: addrs}, 243 BalancerConfig: &EDSConfig{ 244 BalancerName: balancerName, 245 EDSServiceName: testEDSClusterName, 246 }, 247 }) 248 249 xdsC := waitForNewXDSClientWithEDSWatch(t, xdsClientCh, balancerName) 250 xdsC.InvokeWatchEDSCallback(&xdsclient.EDSUpdate{}, nil) 251 } 252} 253 254const ( 255 fakeBalancerA = "fake_balancer_A" 256 fakeBalancerB = "fake_balancer_B" 257) 258 259// Install two fake balancers for service config update tests. 260// 261// ParseConfig only accepts the json if the balancer specified is registered. 262 263func init() { 264 balancer.Register(&fakeBalancerBuilder{name: fakeBalancerA}) 265 balancer.Register(&fakeBalancerBuilder{name: fakeBalancerB}) 266} 267 268type fakeBalancerBuilder struct { 269 name string 270} 271 272func (b *fakeBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { 273 return &fakeBalancer{cc: cc} 274} 275 276func (b *fakeBalancerBuilder) Name() string { 277 return b.name 278} 279 280type fakeBalancer struct { 281 cc balancer.ClientConn 282} 283 284func (b *fakeBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) { 285 panic("implement me") 286} 287 288func (b *fakeBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) { 289 panic("implement me") 290} 291 292func (b *fakeBalancer) Close() {} 293 294// TestXDSConnfigChildPolicyUpdate verifies scenarios where the childPolicy 295// section of the lbConfig is updated. 296// 297// The test does the following: 298// * Builds a new xds balancer. 299// * Pushes a new ClientConnState with a childPolicy set to fakeBalancerA. 300// Verifies that a new xdsClient is created. It then pushes a new edsUpdate 301// through the fakexds client. Verifies that a new edsLB is created and it 302// receives the expected childPolicy. 303// * Pushes a new ClientConnState with a childPolicy set to fakeBalancerB. 304// This time around, we expect no new xdsClient or edsLB to be created. 305// Instead, we expect the existing edsLB to receive the new child policy. 306func (s) TestXDSConnfigChildPolicyUpdate(t *testing.T) { 307 edsLBCh := testutils.NewChannel() 308 xdsClientCh := testutils.NewChannel() 309 cancel := setup(edsLBCh, xdsClientCh) 310 defer cancel() 311 312 builder := balancer.Get(edsName) 313 cc := newNoopTestClientConn() 314 edsB, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}).(*edsBalancer) 315 if !ok { 316 t.Fatalf("builder.Build(%s) returned type {%T}, want {*edsBalancer}", edsName, edsB) 317 } 318 defer edsB.Close() 319 320 edsB.UpdateClientConnState(balancer.ClientConnState{ 321 BalancerConfig: &EDSConfig{ 322 BalancerName: testBalancerNameFooBar, 323 ChildPolicy: &loadBalancingConfig{ 324 Name: fakeBalancerA, 325 Config: json.RawMessage("{}"), 326 }, 327 EDSServiceName: testEDSClusterName, 328 }, 329 }) 330 xdsC := waitForNewXDSClientWithEDSWatch(t, xdsClientCh, testBalancerNameFooBar) 331 xdsC.InvokeWatchEDSCallback(&xdsclient.EDSUpdate{}, nil) 332 edsLB := waitForNewEDSLB(t, edsLBCh) 333 edsLB.waitForChildPolicy(&loadBalancingConfig{ 334 Name: string(fakeBalancerA), 335 Config: json.RawMessage(`{}`), 336 }) 337 338 edsB.UpdateClientConnState(balancer.ClientConnState{ 339 BalancerConfig: &EDSConfig{ 340 BalancerName: testBalancerNameFooBar, 341 ChildPolicy: &loadBalancingConfig{ 342 Name: fakeBalancerB, 343 Config: json.RawMessage("{}"), 344 }, 345 EDSServiceName: testEDSClusterName, 346 }, 347 }) 348 edsLB.waitForChildPolicy(&loadBalancingConfig{ 349 Name: string(fakeBalancerA), 350 Config: json.RawMessage(`{}`), 351 }) 352} 353 354// TestXDSSubConnStateChange verifies if the top-level edsBalancer passes on 355// the subConnStateChange to appropriate child balancers. 356func (s) TestXDSSubConnStateChange(t *testing.T) { 357 edsLBCh := testutils.NewChannel() 358 xdsClientCh := testutils.NewChannel() 359 cancel := setup(edsLBCh, xdsClientCh) 360 defer cancel() 361 362 builder := balancer.Get(edsName) 363 cc := newNoopTestClientConn() 364 edsB, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSClusterName}}).(*edsBalancer) 365 if !ok { 366 t.Fatalf("builder.Build(%s) returned type {%T}, want {*edsBalancer}", edsName, edsB) 367 } 368 defer edsB.Close() 369 370 addrs := []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, {Addr: "3.3.3.3:10003"}} 371 edsB.UpdateClientConnState(balancer.ClientConnState{ 372 ResolverState: resolver.State{Addresses: addrs}, 373 BalancerConfig: &EDSConfig{ 374 BalancerName: testBalancerNameFooBar, 375 EDSServiceName: testEDSClusterName, 376 }, 377 }) 378 379 xdsC := waitForNewXDSClientWithEDSWatch(t, xdsClientCh, testBalancerNameFooBar) 380 xdsC.InvokeWatchEDSCallback(&xdsclient.EDSUpdate{}, nil) 381 edsLB := waitForNewEDSLB(t, edsLBCh) 382 383 fsc := &fakeSubConn{} 384 state := connectivity.Ready 385 edsB.UpdateSubConnState(fsc, balancer.SubConnState{ConnectivityState: state}) 386 edsLB.waitForSubConnStateChange(&scStateChange{sc: fsc, state: state}) 387} 388 389func (s) TestXDSBalancerConfigParsing(t *testing.T) { 390 const testEDSName = "eds.service" 391 var testLRSName = "lrs.server" 392 b := bytes.NewBuffer(nil) 393 if err := (&jsonpb.Marshaler{}).Marshal(b, &scpb.XdsConfig{ 394 ChildPolicy: []*scpb.LoadBalancingConfig{ 395 {Policy: &scpb.LoadBalancingConfig_Xds{}}, 396 {Policy: &scpb.LoadBalancingConfig_RoundRobin{ 397 RoundRobin: &scpb.RoundRobinConfig{}, 398 }}, 399 }, 400 FallbackPolicy: []*scpb.LoadBalancingConfig{ 401 {Policy: &scpb.LoadBalancingConfig_Xds{}}, 402 {Policy: &scpb.LoadBalancingConfig_PickFirst{ 403 PickFirst: &scpb.PickFirstConfig{}, 404 }}, 405 }, 406 EdsServiceName: testEDSName, 407 LrsLoadReportingServerName: &wrapperspb.StringValue{Value: testLRSName}, 408 }); err != nil { 409 t.Fatalf("%v", err) 410 } 411 412 tests := []struct { 413 name string 414 js json.RawMessage 415 want serviceconfig.LoadBalancingConfig 416 wantErr bool 417 }{ 418 { 419 name: "jsonpb-generated", 420 js: b.Bytes(), 421 want: &EDSConfig{ 422 ChildPolicy: &loadBalancingConfig{ 423 Name: "round_robin", 424 Config: json.RawMessage("{}"), 425 }, 426 FallBackPolicy: &loadBalancingConfig{ 427 Name: "pick_first", 428 Config: json.RawMessage("{}"), 429 }, 430 EDSServiceName: testEDSName, 431 LrsLoadReportingServerName: &testLRSName, 432 }, 433 wantErr: false, 434 }, 435 { 436 // json with random balancers, and the first is not registered. 437 name: "manually-generated", 438 js: json.RawMessage(` 439{ 440 "balancerName": "fake.foo.bar", 441 "childPolicy": [ 442 {"fake_balancer_C": {}}, 443 {"fake_balancer_A": {}}, 444 {"fake_balancer_B": {}} 445 ], 446 "fallbackPolicy": [ 447 {"fake_balancer_C": {}}, 448 {"fake_balancer_B": {}}, 449 {"fake_balancer_A": {}} 450 ], 451 "edsServiceName": "eds.service", 452 "lrsLoadReportingServerName": "lrs.server" 453}`), 454 want: &EDSConfig{ 455 BalancerName: "fake.foo.bar", 456 ChildPolicy: &loadBalancingConfig{ 457 Name: "fake_balancer_A", 458 Config: json.RawMessage("{}"), 459 }, 460 FallBackPolicy: &loadBalancingConfig{ 461 Name: "fake_balancer_B", 462 Config: json.RawMessage("{}"), 463 }, 464 EDSServiceName: testEDSName, 465 LrsLoadReportingServerName: &testLRSName, 466 }, 467 wantErr: false, 468 }, 469 { 470 // json with no lrs server name, LrsLoadReportingServerName should 471 // be nil (not an empty string). 472 name: "no-lrs-server-name", 473 js: json.RawMessage(` 474{ 475 "balancerName": "fake.foo.bar", 476 "edsServiceName": "eds.service" 477}`), 478 want: &EDSConfig{ 479 BalancerName: "fake.foo.bar", 480 EDSServiceName: testEDSName, 481 LrsLoadReportingServerName: nil, 482 }, 483 wantErr: false, 484 }, 485 } 486 for _, tt := range tests { 487 t.Run(tt.name, func(t *testing.T) { 488 b := &edsBalancerBuilder{} 489 got, err := b.ParseConfig(tt.js) 490 if (err != nil) != tt.wantErr { 491 t.Errorf("edsBalancerBuilder.ParseConfig() error = %v, wantErr %v", err, tt.wantErr) 492 return 493 } 494 if !cmp.Equal(got, tt.want) { 495 t.Errorf(cmp.Diff(got, tt.want)) 496 } 497 }) 498 } 499} 500func (s) TestLoadbalancingConfigParsing(t *testing.T) { 501 tests := []struct { 502 name string 503 s string 504 want *EDSConfig 505 }{ 506 { 507 name: "empty", 508 s: "{}", 509 want: &EDSConfig{}, 510 }, 511 { 512 name: "success1", 513 s: `{"childPolicy":[{"pick_first":{}}]}`, 514 want: &EDSConfig{ 515 ChildPolicy: &loadBalancingConfig{ 516 Name: "pick_first", 517 Config: json.RawMessage(`{}`), 518 }, 519 }, 520 }, 521 { 522 name: "success2", 523 s: `{"childPolicy":[{"round_robin":{}},{"pick_first":{}}]}`, 524 want: &EDSConfig{ 525 ChildPolicy: &loadBalancingConfig{ 526 Name: "round_robin", 527 Config: json.RawMessage(`{}`), 528 }, 529 }, 530 }, 531 } 532 for _, tt := range tests { 533 t.Run(tt.name, func(t *testing.T) { 534 var cfg EDSConfig 535 if err := json.Unmarshal([]byte(tt.s), &cfg); err != nil || !cmp.Equal(&cfg, tt.want) { 536 t.Errorf("test name: %s, parseFullServiceConfig() = %+v, err: %v, want %+v, <nil>", tt.name, cfg, err, tt.want) 537 } 538 }) 539 } 540} 541 542func (s) TestEqualStringPointers(t *testing.T) { 543 var ( 544 ta1 = "test-a" 545 ta2 = "test-a" 546 tb = "test-b" 547 ) 548 tests := []struct { 549 name string 550 a *string 551 b *string 552 want bool 553 }{ 554 {"both-nil", nil, nil, true}, 555 {"a-non-nil", &ta1, nil, false}, 556 {"b-non-nil", nil, &tb, false}, 557 {"equal", &ta1, &ta2, true}, 558 {"different", &ta1, &tb, false}, 559 } 560 for _, tt := range tests { 561 t.Run(tt.name, func(t *testing.T) { 562 if got := equalStringPointers(tt.a, tt.b); got != tt.want { 563 t.Errorf("equalStringPointers() = %v, want %v", got, tt.want) 564 } 565 }) 566 } 567} 568