1/* 2 * 3 * Copyright 2019 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package edsbalancer 20 21import ( 22 "bytes" 23 "encoding/json" 24 "fmt" 25 "testing" 26 27 corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" 28 "github.com/golang/protobuf/jsonpb" 29 wrapperspb "github.com/golang/protobuf/ptypes/wrappers" 30 "github.com/google/go-cmp/cmp" 31 "google.golang.org/grpc" 32 "google.golang.org/grpc/balancer" 33 "google.golang.org/grpc/connectivity" 34 "google.golang.org/grpc/internal/grpclog" 35 "google.golang.org/grpc/internal/grpctest" 36 scpb "google.golang.org/grpc/internal/proto/grpc_service_config" 37 "google.golang.org/grpc/resolver" 38 "google.golang.org/grpc/serviceconfig" 39 "google.golang.org/grpc/xds/internal/balancer/lrs" 40 xdsclient "google.golang.org/grpc/xds/internal/client" 41 "google.golang.org/grpc/xds/internal/client/bootstrap" 42 "google.golang.org/grpc/xds/internal/testutils" 43 "google.golang.org/grpc/xds/internal/testutils/fakeclient" 44) 45 46func init() { 47 balancer.Register(&edsBalancerBuilder{}) 48 49 bootstrapConfigNew = func() (*bootstrap.Config, error) { 50 return &bootstrap.Config{ 51 BalancerName: testBalancerNameFooBar, 52 Creds: grpc.WithInsecure(), 53 NodeProto: &corepb.Node{}, 54 }, nil 55 } 56} 57 58func subConnFromPicker(p balancer.V2Picker) func() balancer.SubConn { 59 return func() balancer.SubConn { 60 scst, _ := p.Pick(balancer.PickInfo{}) 61 return scst.SubConn 62 } 63} 64 65type s struct { 66 grpctest.Tester 67} 68 69func Test(t *testing.T) { 70 grpctest.RunSubTests(t, s{}) 71} 72 73const testBalancerNameFooBar = "foo.bar" 74 75func newNoopTestClientConn() *noopTestClientConn { 76 return &noopTestClientConn{} 77} 78 79// noopTestClientConn is used in EDS balancer config update tests that only 80// cover the config update handling, but not SubConn/load-balancing. 81type noopTestClientConn struct { 82 balancer.ClientConn 83} 84 85func (t *noopTestClientConn) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { 86 return nil, nil 87} 88 89func (noopTestClientConn) Target() string { return testServiceName } 90 91type scStateChange struct { 92 sc balancer.SubConn 93 state connectivity.State 94} 95 96type fakeEDSBalancer struct { 97 cc balancer.ClientConn 98 childPolicy *testutils.Channel 99 subconnStateChange *testutils.Channel 100 loadStore lrs.Store 101} 102 103func (f *fakeEDSBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) { 104 f.subconnStateChange.Send(&scStateChange{sc: sc, state: state}) 105} 106 107func (f *fakeEDSBalancer) HandleChildPolicy(name string, config json.RawMessage) { 108 f.childPolicy.Send(&loadBalancingConfig{Name: name, Config: config}) 109} 110 111func (f *fakeEDSBalancer) Close() {} 112func (f *fakeEDSBalancer) HandleEDSResponse(edsResp *xdsclient.EDSUpdate) {} 113func (f *fakeEDSBalancer) updateState(priority priorityType, s balancer.State) {} 114 115func (f *fakeEDSBalancer) waitForChildPolicy(wantPolicy *loadBalancingConfig) error { 116 val, err := f.childPolicy.Receive() 117 if err != nil { 118 return fmt.Errorf("error waiting for childPolicy: %v", err) 119 } 120 gotPolicy := val.(*loadBalancingConfig) 121 if !cmp.Equal(gotPolicy, wantPolicy) { 122 return fmt.Errorf("got childPolicy %v, want %v", gotPolicy, wantPolicy) 123 } 124 return nil 125} 126 127func (f *fakeEDSBalancer) waitForSubConnStateChange(wantState *scStateChange) error { 128 val, err := f.subconnStateChange.Receive() 129 if err != nil { 130 return fmt.Errorf("error waiting for subconnStateChange: %v", err) 131 } 132 gotState := val.(*scStateChange) 133 if !cmp.Equal(gotState, wantState, cmp.AllowUnexported(scStateChange{})) { 134 return fmt.Errorf("got subconnStateChange %v, want %v", gotState, wantState) 135 } 136 return nil 137} 138 139func newFakeEDSBalancer(cc balancer.ClientConn, loadStore lrs.Store) edsBalancerImplInterface { 140 return &fakeEDSBalancer{ 141 cc: cc, 142 childPolicy: testutils.NewChannelWithSize(10), 143 subconnStateChange: testutils.NewChannelWithSize(10), 144 loadStore: loadStore, 145 } 146} 147 148type fakeSubConn struct{} 149 150func (*fakeSubConn) UpdateAddresses([]resolver.Address) { panic("implement me") } 151func (*fakeSubConn) Connect() { panic("implement me") } 152 153// waitForNewXDSClientWithEDSWatch makes sure that a new xdsClient is created 154// with the provided name. It also make sure that the newly created client 155// registers an eds watcher. 156func waitForNewXDSClientWithEDSWatch(t *testing.T, ch *testutils.Channel, wantName string) *fakeclient.Client { 157 t.Helper() 158 159 val, err := ch.Receive() 160 if err != nil { 161 t.Fatalf("error when waiting for a new xds client: %v", err) 162 return nil 163 } 164 xdsC := val.(*fakeclient.Client) 165 if xdsC.Name() != wantName { 166 t.Fatalf("xdsClient created to balancer: %v, want %v", xdsC.Name(), wantName) 167 return nil 168 } 169 _, err = xdsC.WaitForWatchEDS() 170 if err != nil { 171 t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err) 172 return nil 173 } 174 return xdsC 175} 176 177// waitForNewEDSLB makes sure that a new edsLB is created by the top-level 178// edsBalancer. 179func waitForNewEDSLB(t *testing.T, ch *testutils.Channel) *fakeEDSBalancer { 180 t.Helper() 181 182 val, err := ch.Receive() 183 if err != nil { 184 t.Fatalf("error when waiting for a new edsLB: %v", err) 185 return nil 186 } 187 return val.(*fakeEDSBalancer) 188} 189 190// setup overrides the functions which are used to create the xdsClient and the 191// edsLB, creates fake version of them and makes them available on the provided 192// channels. The returned cancel function should be called by the test for 193// cleanup. 194func setup(edsLBCh *testutils.Channel, xdsClientCh *testutils.Channel) func() { 195 origNewEDSBalancer := newEDSBalancer 196 newEDSBalancer = func(cc balancer.ClientConn, enqueue func(priorityType, balancer.State), loadStore lrs.Store, logger *grpclog.PrefixLogger) edsBalancerImplInterface { 197 edsLB := newFakeEDSBalancer(cc, loadStore) 198 defer func() { edsLBCh.Send(edsLB) }() 199 return edsLB 200 } 201 202 origXdsClientNew := xdsclientNew 203 xdsclientNew = func(opts xdsclient.Options) (xdsClientInterface, error) { 204 xdsC := fakeclient.NewClientWithName(opts.Config.BalancerName) 205 defer func() { xdsClientCh.Send(xdsC) }() 206 return xdsC, nil 207 } 208 return func() { 209 newEDSBalancer = origNewEDSBalancer 210 xdsclientNew = origXdsClientNew 211 } 212} 213 214// TestXDSConfigBalancerNameUpdate verifies different scenarios where the 215// balancer name in the lbConfig is updated. 216// 217// The test does the following: 218// * Builds a new xds balancer. 219// * Repeatedly pushes new ClientConnState which specifies different 220// balancerName in the lbConfig. We expect xdsClient objects to created 221// whenever the balancerName changes. 222func (s) TestXDSConfigBalancerNameUpdate(t *testing.T) { 223 oldBootstrapConfigNew := bootstrapConfigNew 224 bootstrapConfigNew = func() (*bootstrap.Config, error) { 225 // Return an error from bootstrap, so the eds balancer will use 226 // BalancerName from the config. 227 // 228 // TODO: remove this when deleting BalancerName from config. 229 return nil, fmt.Errorf("no bootstrap available") 230 } 231 defer func() { bootstrapConfigNew = oldBootstrapConfigNew }() 232 edsLBCh := testutils.NewChannel() 233 xdsClientCh := testutils.NewChannel() 234 cancel := setup(edsLBCh, xdsClientCh) 235 defer cancel() 236 237 builder := balancer.Get(edsName) 238 cc := newNoopTestClientConn() 239 edsB, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSClusterName}}).(*edsBalancer) 240 if !ok { 241 t.Fatalf("builder.Build(%s) returned type {%T}, want {*edsBalancer}", edsName, edsB) 242 } 243 defer edsB.Close() 244 245 addrs := []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, {Addr: "3.3.3.3:10003"}} 246 for i := 0; i < 2; i++ { 247 balancerName := fmt.Sprintf("balancer-%d", i) 248 edsB.UpdateClientConnState(balancer.ClientConnState{ 249 ResolverState: resolver.State{Addresses: addrs}, 250 BalancerConfig: &EDSConfig{ 251 BalancerName: balancerName, 252 EDSServiceName: testEDSClusterName, 253 }, 254 }) 255 256 xdsC := waitForNewXDSClientWithEDSWatch(t, xdsClientCh, balancerName) 257 xdsC.InvokeWatchEDSCallback(&xdsclient.EDSUpdate{}, nil) 258 } 259} 260 261const ( 262 fakeBalancerA = "fake_balancer_A" 263 fakeBalancerB = "fake_balancer_B" 264) 265 266// Install two fake balancers for service config update tests. 267// 268// ParseConfig only accepts the json if the balancer specified is registered. 269 270func init() { 271 balancer.Register(&fakeBalancerBuilder{name: fakeBalancerA}) 272 balancer.Register(&fakeBalancerBuilder{name: fakeBalancerB}) 273} 274 275type fakeBalancerBuilder struct { 276 name string 277} 278 279func (b *fakeBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { 280 return &fakeBalancer{cc: cc} 281} 282 283func (b *fakeBalancerBuilder) Name() string { 284 return b.name 285} 286 287type fakeBalancer struct { 288 cc balancer.ClientConn 289} 290 291func (b *fakeBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) { 292 panic("implement me") 293} 294 295func (b *fakeBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) { 296 panic("implement me") 297} 298 299func (b *fakeBalancer) Close() {} 300 301// TestXDSConnfigChildPolicyUpdate verifies scenarios where the childPolicy 302// section of the lbConfig is updated. 303// 304// The test does the following: 305// * Builds a new xds balancer. 306// * Pushes a new ClientConnState with a childPolicy set to fakeBalancerA. 307// Verifies that a new xdsClient is created. It then pushes a new edsUpdate 308// through the fakexds client. Verifies that a new edsLB is created and it 309// receives the expected childPolicy. 310// * Pushes a new ClientConnState with a childPolicy set to fakeBalancerB. 311// This time around, we expect no new xdsClient or edsLB to be created. 312// Instead, we expect the existing edsLB to receive the new child policy. 313func (s) TestXDSConnfigChildPolicyUpdate(t *testing.T) { 314 edsLBCh := testutils.NewChannel() 315 xdsClientCh := testutils.NewChannel() 316 cancel := setup(edsLBCh, xdsClientCh) 317 defer cancel() 318 319 builder := balancer.Get(edsName) 320 cc := newNoopTestClientConn() 321 edsB, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}).(*edsBalancer) 322 if !ok { 323 t.Fatalf("builder.Build(%s) returned type {%T}, want {*edsBalancer}", edsName, edsB) 324 } 325 defer edsB.Close() 326 327 edsB.UpdateClientConnState(balancer.ClientConnState{ 328 BalancerConfig: &EDSConfig{ 329 BalancerName: testBalancerNameFooBar, 330 ChildPolicy: &loadBalancingConfig{ 331 Name: fakeBalancerA, 332 Config: json.RawMessage("{}"), 333 }, 334 EDSServiceName: testEDSClusterName, 335 }, 336 }) 337 xdsC := waitForNewXDSClientWithEDSWatch(t, xdsClientCh, testBalancerNameFooBar) 338 xdsC.InvokeWatchEDSCallback(&xdsclient.EDSUpdate{}, nil) 339 edsLB := waitForNewEDSLB(t, edsLBCh) 340 edsLB.waitForChildPolicy(&loadBalancingConfig{ 341 Name: string(fakeBalancerA), 342 Config: json.RawMessage(`{}`), 343 }) 344 345 edsB.UpdateClientConnState(balancer.ClientConnState{ 346 BalancerConfig: &EDSConfig{ 347 BalancerName: testBalancerNameFooBar, 348 ChildPolicy: &loadBalancingConfig{ 349 Name: fakeBalancerB, 350 Config: json.RawMessage("{}"), 351 }, 352 EDSServiceName: testEDSClusterName, 353 }, 354 }) 355 edsLB.waitForChildPolicy(&loadBalancingConfig{ 356 Name: string(fakeBalancerA), 357 Config: json.RawMessage(`{}`), 358 }) 359} 360 361// TestXDSSubConnStateChange verifies if the top-level edsBalancer passes on 362// the subConnStateChange to appropriate child balancers. 363func (s) TestXDSSubConnStateChange(t *testing.T) { 364 edsLBCh := testutils.NewChannel() 365 xdsClientCh := testutils.NewChannel() 366 cancel := setup(edsLBCh, xdsClientCh) 367 defer cancel() 368 369 builder := balancer.Get(edsName) 370 cc := newNoopTestClientConn() 371 edsB, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSClusterName}}).(*edsBalancer) 372 if !ok { 373 t.Fatalf("builder.Build(%s) returned type {%T}, want {*edsBalancer}", edsName, edsB) 374 } 375 defer edsB.Close() 376 377 addrs := []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, {Addr: "3.3.3.3:10003"}} 378 edsB.UpdateClientConnState(balancer.ClientConnState{ 379 ResolverState: resolver.State{Addresses: addrs}, 380 BalancerConfig: &EDSConfig{ 381 BalancerName: testBalancerNameFooBar, 382 EDSServiceName: testEDSClusterName, 383 }, 384 }) 385 386 xdsC := waitForNewXDSClientWithEDSWatch(t, xdsClientCh, testBalancerNameFooBar) 387 xdsC.InvokeWatchEDSCallback(&xdsclient.EDSUpdate{}, nil) 388 edsLB := waitForNewEDSLB(t, edsLBCh) 389 390 fsc := &fakeSubConn{} 391 state := connectivity.Ready 392 edsB.UpdateSubConnState(fsc, balancer.SubConnState{ConnectivityState: state}) 393 edsLB.waitForSubConnStateChange(&scStateChange{sc: fsc, state: state}) 394} 395 396func (s) TestXDSBalancerConfigParsing(t *testing.T) { 397 const testEDSName = "eds.service" 398 var testLRSName = "lrs.server" 399 b := bytes.NewBuffer(nil) 400 if err := (&jsonpb.Marshaler{}).Marshal(b, &scpb.XdsConfig{ 401 ChildPolicy: []*scpb.LoadBalancingConfig{ 402 {Policy: &scpb.LoadBalancingConfig_Xds{}}, 403 {Policy: &scpb.LoadBalancingConfig_RoundRobin{ 404 RoundRobin: &scpb.RoundRobinConfig{}, 405 }}, 406 }, 407 FallbackPolicy: []*scpb.LoadBalancingConfig{ 408 {Policy: &scpb.LoadBalancingConfig_Xds{}}, 409 {Policy: &scpb.LoadBalancingConfig_PickFirst{ 410 PickFirst: &scpb.PickFirstConfig{}, 411 }}, 412 }, 413 EdsServiceName: testEDSName, 414 LrsLoadReportingServerName: &wrapperspb.StringValue{Value: testLRSName}, 415 }); err != nil { 416 t.Fatalf("%v", err) 417 } 418 419 tests := []struct { 420 name string 421 js json.RawMessage 422 want serviceconfig.LoadBalancingConfig 423 wantErr bool 424 }{ 425 { 426 name: "jsonpb-generated", 427 js: b.Bytes(), 428 want: &EDSConfig{ 429 ChildPolicy: &loadBalancingConfig{ 430 Name: "round_robin", 431 Config: json.RawMessage("{}"), 432 }, 433 FallBackPolicy: &loadBalancingConfig{ 434 Name: "pick_first", 435 Config: json.RawMessage("{}"), 436 }, 437 EDSServiceName: testEDSName, 438 LrsLoadReportingServerName: &testLRSName, 439 }, 440 wantErr: false, 441 }, 442 { 443 // json with random balancers, and the first is not registered. 444 name: "manually-generated", 445 js: json.RawMessage(` 446{ 447 "balancerName": "fake.foo.bar", 448 "childPolicy": [ 449 {"fake_balancer_C": {}}, 450 {"fake_balancer_A": {}}, 451 {"fake_balancer_B": {}} 452 ], 453 "fallbackPolicy": [ 454 {"fake_balancer_C": {}}, 455 {"fake_balancer_B": {}}, 456 {"fake_balancer_A": {}} 457 ], 458 "edsServiceName": "eds.service", 459 "lrsLoadReportingServerName": "lrs.server" 460}`), 461 want: &EDSConfig{ 462 BalancerName: "fake.foo.bar", 463 ChildPolicy: &loadBalancingConfig{ 464 Name: "fake_balancer_A", 465 Config: json.RawMessage("{}"), 466 }, 467 FallBackPolicy: &loadBalancingConfig{ 468 Name: "fake_balancer_B", 469 Config: json.RawMessage("{}"), 470 }, 471 EDSServiceName: testEDSName, 472 LrsLoadReportingServerName: &testLRSName, 473 }, 474 wantErr: false, 475 }, 476 { 477 // json with no lrs server name, LrsLoadReportingServerName should 478 // be nil (not an empty string). 479 name: "no-lrs-server-name", 480 js: json.RawMessage(` 481{ 482 "balancerName": "fake.foo.bar", 483 "edsServiceName": "eds.service" 484}`), 485 want: &EDSConfig{ 486 BalancerName: "fake.foo.bar", 487 EDSServiceName: testEDSName, 488 LrsLoadReportingServerName: nil, 489 }, 490 wantErr: false, 491 }, 492 } 493 for _, tt := range tests { 494 t.Run(tt.name, func(t *testing.T) { 495 b := &edsBalancerBuilder{} 496 got, err := b.ParseConfig(tt.js) 497 if (err != nil) != tt.wantErr { 498 t.Errorf("edsBalancerBuilder.ParseConfig() error = %v, wantErr %v", err, tt.wantErr) 499 return 500 } 501 if !cmp.Equal(got, tt.want) { 502 t.Errorf(cmp.Diff(got, tt.want)) 503 } 504 }) 505 } 506} 507func (s) TestLoadbalancingConfigParsing(t *testing.T) { 508 tests := []struct { 509 name string 510 s string 511 want *EDSConfig 512 }{ 513 { 514 name: "empty", 515 s: "{}", 516 want: &EDSConfig{}, 517 }, 518 { 519 name: "success1", 520 s: `{"childPolicy":[{"pick_first":{}}]}`, 521 want: &EDSConfig{ 522 ChildPolicy: &loadBalancingConfig{ 523 Name: "pick_first", 524 Config: json.RawMessage(`{}`), 525 }, 526 }, 527 }, 528 { 529 name: "success2", 530 s: `{"childPolicy":[{"round_robin":{}},{"pick_first":{}}]}`, 531 want: &EDSConfig{ 532 ChildPolicy: &loadBalancingConfig{ 533 Name: "round_robin", 534 Config: json.RawMessage(`{}`), 535 }, 536 }, 537 }, 538 } 539 for _, tt := range tests { 540 t.Run(tt.name, func(t *testing.T) { 541 var cfg EDSConfig 542 if err := json.Unmarshal([]byte(tt.s), &cfg); err != nil || !cmp.Equal(&cfg, tt.want) { 543 t.Errorf("test name: %s, parseFullServiceConfig() = %+v, err: %v, want %+v, <nil>", tt.name, cfg, err, tt.want) 544 } 545 }) 546 } 547} 548 549func (s) TestEqualStringPointers(t *testing.T) { 550 var ( 551 ta1 = "test-a" 552 ta2 = "test-a" 553 tb = "test-b" 554 ) 555 tests := []struct { 556 name string 557 a *string 558 b *string 559 want bool 560 }{ 561 {"both-nil", nil, nil, true}, 562 {"a-non-nil", &ta1, nil, false}, 563 {"b-non-nil", nil, &tb, false}, 564 {"equal", &ta1, &ta2, true}, 565 {"different", &ta1, &tb, false}, 566 } 567 for _, tt := range tests { 568 t.Run(tt.name, func(t *testing.T) { 569 if got := equalStringPointers(tt.a, tt.b); got != tt.want { 570 t.Errorf("equalStringPointers() = %v, want %v", got, tt.want) 571 } 572 }) 573 } 574} 575