1// +build go1.12 2 3/* 4 * 5 * Copyright 2020 gRPC authors. 6 * 7 * Licensed under the Apache License, Version 2.0 (the "License"); 8 * you may not use this file except in compliance with the License. 9 * You may obtain a copy of the License at 10 * 11 * http://www.apache.org/licenses/LICENSE-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, software 14 * distributed under the License is distributed on an "AS IS" BASIS, 15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 16 * See the License for the specific language governing permissions and 17 * limitations under the License. 18 * 19 */ 20 21package clusterimpl 22 23import ( 24 "context" 25 "fmt" 26 "strings" 27 "testing" 28 "time" 29 30 "github.com/google/go-cmp/cmp" 31 "github.com/google/go-cmp/cmp/cmpopts" 32 "google.golang.org/grpc/balancer" 33 "google.golang.org/grpc/balancer/roundrobin" 34 "google.golang.org/grpc/connectivity" 35 "google.golang.org/grpc/internal" 36 internalserviceconfig "google.golang.org/grpc/internal/serviceconfig" 37 "google.golang.org/grpc/resolver" 38 xdsinternal "google.golang.org/grpc/xds/internal" 39 "google.golang.org/grpc/xds/internal/testutils" 40 "google.golang.org/grpc/xds/internal/testutils/fakeclient" 41 "google.golang.org/grpc/xds/internal/xdsclient" 42 "google.golang.org/grpc/xds/internal/xdsclient/load" 43) 44 45const ( 46 defaultTestTimeout = 1 * time.Second 47 defaultShortTestTimeout = 100 * time.Microsecond 48 49 testClusterName = "test-cluster" 50 testServiceName = "test-eds-service" 51 testLRSServerName = "test-lrs-name" 52) 53 54var ( 55 testBackendAddrs = []resolver.Address{ 56 {Addr: "1.1.1.1:1"}, 57 } 58 59 cmpOpts = cmp.Options{ 60 cmpopts.EquateEmpty(), 61 cmpopts.IgnoreFields(load.Data{}, "ReportInterval"), 62 } 63) 64 65func subConnFromPicker(p balancer.Picker) func() balancer.SubConn { 66 return func() balancer.SubConn { 67 scst, _ := p.Pick(balancer.PickInfo{}) 68 return scst.SubConn 69 } 70} 71 72func init() { 73 NewRandomWRR = testutils.NewTestWRR 74} 75 76// TestDropByCategory verifies that the balancer correctly drops the picks, and 77// that the drops are reported. 78func TestDropByCategory(t *testing.T) { 79 defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName) 80 xdsC := fakeclient.NewClient() 81 defer xdsC.Close() 82 83 builder := balancer.Get(Name) 84 cc := testutils.NewTestClientConn(t) 85 b := builder.Build(cc, balancer.BuildOptions{}) 86 defer b.Close() 87 88 const ( 89 dropReason = "test-dropping-category" 90 dropNumerator = 1 91 dropDenominator = 2 92 ) 93 if err := b.UpdateClientConnState(balancer.ClientConnState{ 94 ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC), 95 BalancerConfig: &LBConfig{ 96 Cluster: testClusterName, 97 EDSServiceName: testServiceName, 98 LoadReportingServerName: newString(testLRSServerName), 99 DropCategories: []DropConfig{{ 100 Category: dropReason, 101 RequestsPerMillion: million * dropNumerator / dropDenominator, 102 }}, 103 ChildPolicy: &internalserviceconfig.BalancerConfig{ 104 Name: roundrobin.Name, 105 }, 106 }, 107 }); err != nil { 108 t.Fatalf("unexpected error from UpdateClientConnState: %v", err) 109 } 110 111 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) 112 defer cancel() 113 114 got, err := xdsC.WaitForReportLoad(ctx) 115 if err != nil { 116 t.Fatalf("xdsClient.ReportLoad failed with error: %v", err) 117 } 118 if got.Server != testLRSServerName { 119 t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerName) 120 } 121 122 sc1 := <-cc.NewSubConnCh 123 b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) 124 // This should get the connecting picker. 125 p0 := <-cc.NewPickerCh 126 for i := 0; i < 10; i++ { 127 _, err := p0.Pick(balancer.PickInfo{}) 128 if err != balancer.ErrNoSubConnAvailable { 129 t.Fatalf("picker.Pick, got _,%v, want Err=%v", err, balancer.ErrNoSubConnAvailable) 130 } 131 } 132 133 b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) 134 // Test pick with one backend. 135 p1 := <-cc.NewPickerCh 136 const rpcCount = 20 137 for i := 0; i < rpcCount; i++ { 138 gotSCSt, err := p1.Pick(balancer.PickInfo{}) 139 // Even RPCs are dropped. 140 if i%2 == 0 { 141 if err == nil || !strings.Contains(err.Error(), "dropped") { 142 t.Fatalf("pick.Pick, got %v, %v, want error RPC dropped", gotSCSt, err) 143 } 144 continue 145 } 146 if err != nil || !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) { 147 t.Fatalf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1) 148 } 149 if gotSCSt.Done != nil { 150 gotSCSt.Done(balancer.DoneInfo{}) 151 } 152 } 153 154 // Dump load data from the store and compare with expected counts. 155 loadStore := xdsC.LoadStore() 156 if loadStore == nil { 157 t.Fatal("loadStore is nil in xdsClient") 158 } 159 const dropCount = rpcCount * dropNumerator / dropDenominator 160 wantStatsData0 := []*load.Data{{ 161 Cluster: testClusterName, 162 Service: testServiceName, 163 TotalDrops: dropCount, 164 Drops: map[string]uint64{dropReason: dropCount}, 165 LocalityStats: map[string]load.LocalityData{ 166 assertString(xdsinternal.LocalityID{}.ToString): {RequestStats: load.RequestData{Succeeded: rpcCount - dropCount}}, 167 }, 168 }} 169 170 gotStatsData0 := loadStore.Stats([]string{testClusterName}) 171 if diff := cmp.Diff(gotStatsData0, wantStatsData0, cmpOpts); diff != "" { 172 t.Fatalf("got unexpected reports, diff (-got, +want): %v", diff) 173 } 174 175 // Send an update with new drop configs. 176 const ( 177 dropReason2 = "test-dropping-category-2" 178 dropNumerator2 = 1 179 dropDenominator2 = 4 180 ) 181 if err := b.UpdateClientConnState(balancer.ClientConnState{ 182 ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC), 183 BalancerConfig: &LBConfig{ 184 Cluster: testClusterName, 185 EDSServiceName: testServiceName, 186 LoadReportingServerName: newString(testLRSServerName), 187 DropCategories: []DropConfig{{ 188 Category: dropReason2, 189 RequestsPerMillion: million * dropNumerator2 / dropDenominator2, 190 }}, 191 ChildPolicy: &internalserviceconfig.BalancerConfig{ 192 Name: roundrobin.Name, 193 }, 194 }, 195 }); err != nil { 196 t.Fatalf("unexpected error from UpdateClientConnState: %v", err) 197 } 198 199 p2 := <-cc.NewPickerCh 200 for i := 0; i < rpcCount; i++ { 201 gotSCSt, err := p2.Pick(balancer.PickInfo{}) 202 // Even RPCs are dropped. 203 if i%4 == 0 { 204 if err == nil || !strings.Contains(err.Error(), "dropped") { 205 t.Fatalf("pick.Pick, got %v, %v, want error RPC dropped", gotSCSt, err) 206 } 207 continue 208 } 209 if err != nil || !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) { 210 t.Fatalf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1) 211 } 212 if gotSCSt.Done != nil { 213 gotSCSt.Done(balancer.DoneInfo{}) 214 } 215 } 216 217 const dropCount2 = rpcCount * dropNumerator2 / dropDenominator2 218 wantStatsData1 := []*load.Data{{ 219 Cluster: testClusterName, 220 Service: testServiceName, 221 TotalDrops: dropCount2, 222 Drops: map[string]uint64{dropReason2: dropCount2}, 223 LocalityStats: map[string]load.LocalityData{ 224 assertString(xdsinternal.LocalityID{}.ToString): {RequestStats: load.RequestData{Succeeded: rpcCount - dropCount2}}, 225 }, 226 }} 227 228 gotStatsData1 := loadStore.Stats([]string{testClusterName}) 229 if diff := cmp.Diff(gotStatsData1, wantStatsData1, cmpOpts); diff != "" { 230 t.Fatalf("got unexpected reports, diff (-got, +want): %v", diff) 231 } 232} 233 234// TestDropCircuitBreaking verifies that the balancer correctly drops the picks 235// due to circuit breaking, and that the drops are reported. 236func TestDropCircuitBreaking(t *testing.T) { 237 defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName) 238 xdsC := fakeclient.NewClient() 239 defer xdsC.Close() 240 241 builder := balancer.Get(Name) 242 cc := testutils.NewTestClientConn(t) 243 b := builder.Build(cc, balancer.BuildOptions{}) 244 defer b.Close() 245 246 var maxRequest uint32 = 50 247 if err := b.UpdateClientConnState(balancer.ClientConnState{ 248 ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC), 249 BalancerConfig: &LBConfig{ 250 Cluster: testClusterName, 251 EDSServiceName: testServiceName, 252 LoadReportingServerName: newString(testLRSServerName), 253 MaxConcurrentRequests: &maxRequest, 254 ChildPolicy: &internalserviceconfig.BalancerConfig{ 255 Name: roundrobin.Name, 256 }, 257 }, 258 }); err != nil { 259 t.Fatalf("unexpected error from UpdateClientConnState: %v", err) 260 } 261 262 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) 263 defer cancel() 264 265 got, err := xdsC.WaitForReportLoad(ctx) 266 if err != nil { 267 t.Fatalf("xdsClient.ReportLoad failed with error: %v", err) 268 } 269 if got.Server != testLRSServerName { 270 t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerName) 271 } 272 273 sc1 := <-cc.NewSubConnCh 274 b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) 275 // This should get the connecting picker. 276 p0 := <-cc.NewPickerCh 277 for i := 0; i < 10; i++ { 278 _, err := p0.Pick(balancer.PickInfo{}) 279 if err != balancer.ErrNoSubConnAvailable { 280 t.Fatalf("picker.Pick, got _,%v, want Err=%v", err, balancer.ErrNoSubConnAvailable) 281 } 282 } 283 284 b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) 285 // Test pick with one backend. 286 dones := []func(){} 287 p1 := <-cc.NewPickerCh 288 const rpcCount = 100 289 for i := 0; i < rpcCount; i++ { 290 gotSCSt, err := p1.Pick(balancer.PickInfo{}) 291 if i < 50 && err != nil { 292 t.Errorf("The first 50%% picks should be non-drops, got error %v", err) 293 } else if i > 50 && err == nil { 294 t.Errorf("The second 50%% picks should be drops, got error <nil>") 295 } 296 dones = append(dones, func() { 297 if gotSCSt.Done != nil { 298 gotSCSt.Done(balancer.DoneInfo{}) 299 } 300 }) 301 } 302 for _, done := range dones { 303 done() 304 } 305 306 dones = []func(){} 307 // Pick without drops. 308 for i := 0; i < 50; i++ { 309 gotSCSt, err := p1.Pick(balancer.PickInfo{}) 310 if err != nil { 311 t.Errorf("The third 50%% picks should be non-drops, got error %v", err) 312 } 313 dones = append(dones, func() { 314 if gotSCSt.Done != nil { 315 gotSCSt.Done(balancer.DoneInfo{}) 316 } 317 }) 318 } 319 for _, done := range dones { 320 done() 321 } 322 323 // Dump load data from the store and compare with expected counts. 324 loadStore := xdsC.LoadStore() 325 if loadStore == nil { 326 t.Fatal("loadStore is nil in xdsClient") 327 } 328 329 wantStatsData0 := []*load.Data{{ 330 Cluster: testClusterName, 331 Service: testServiceName, 332 TotalDrops: uint64(maxRequest), 333 LocalityStats: map[string]load.LocalityData{ 334 assertString(xdsinternal.LocalityID{}.ToString): {RequestStats: load.RequestData{Succeeded: uint64(rpcCount - maxRequest + 50)}}, 335 }, 336 }} 337 338 gotStatsData0 := loadStore.Stats([]string{testClusterName}) 339 if diff := cmp.Diff(gotStatsData0, wantStatsData0, cmpOpts); diff != "" { 340 t.Fatalf("got unexpected drop reports, diff (-got, +want): %v", diff) 341 } 342} 343 344// TestPickerUpdateAfterClose covers the case that cluster_impl wants to update 345// picker after it's closed. Because picker updates are sent in the run() 346// goroutine. 347func TestPickerUpdateAfterClose(t *testing.T) { 348 defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName) 349 xdsC := fakeclient.NewClient() 350 defer xdsC.Close() 351 352 builder := balancer.Get(Name) 353 cc := testutils.NewTestClientConn(t) 354 b := builder.Build(cc, balancer.BuildOptions{}) 355 356 var maxRequest uint32 = 50 357 if err := b.UpdateClientConnState(balancer.ClientConnState{ 358 ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC), 359 BalancerConfig: &LBConfig{ 360 Cluster: testClusterName, 361 EDSServiceName: testServiceName, 362 MaxConcurrentRequests: &maxRequest, 363 ChildPolicy: &internalserviceconfig.BalancerConfig{ 364 Name: roundrobin.Name, 365 }, 366 }, 367 }); err != nil { 368 b.Close() 369 t.Fatalf("unexpected error from UpdateClientConnState: %v", err) 370 } 371 372 // Send SubConn state changes to trigger picker updates. Balancer will 373 // closed in a defer. 374 sc1 := <-cc.NewSubConnCh 375 b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) 376 // This close will race with the SubConn state update. 377 b.Close() 378 379 select { 380 case <-cc.NewPickerCh: 381 t.Fatalf("unexpected picker update after balancer is closed") 382 case <-time.After(time.Millisecond * 10): 383 } 384} 385 386// TestClusterNameInAddressAttributes covers the case that cluster name is 387// attached to the subconn address attributes. 388func TestClusterNameInAddressAttributes(t *testing.T) { 389 defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName) 390 xdsC := fakeclient.NewClient() 391 defer xdsC.Close() 392 393 builder := balancer.Get(Name) 394 cc := testutils.NewTestClientConn(t) 395 b := builder.Build(cc, balancer.BuildOptions{}) 396 defer b.Close() 397 398 if err := b.UpdateClientConnState(balancer.ClientConnState{ 399 ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC), 400 BalancerConfig: &LBConfig{ 401 Cluster: testClusterName, 402 EDSServiceName: testServiceName, 403 ChildPolicy: &internalserviceconfig.BalancerConfig{ 404 Name: roundrobin.Name, 405 }, 406 }, 407 }); err != nil { 408 t.Fatalf("unexpected error from UpdateClientConnState: %v", err) 409 } 410 411 sc1 := <-cc.NewSubConnCh 412 b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) 413 // This should get the connecting picker. 414 p0 := <-cc.NewPickerCh 415 for i := 0; i < 10; i++ { 416 _, err := p0.Pick(balancer.PickInfo{}) 417 if err != balancer.ErrNoSubConnAvailable { 418 t.Fatalf("picker.Pick, got _,%v, want Err=%v", err, balancer.ErrNoSubConnAvailable) 419 } 420 } 421 422 addrs1 := <-cc.NewSubConnAddrsCh 423 if got, want := addrs1[0].Addr, testBackendAddrs[0].Addr; got != want { 424 t.Fatalf("sc is created with addr %v, want %v", got, want) 425 } 426 cn, ok := internal.GetXDSHandshakeClusterName(addrs1[0].Attributes) 427 if !ok || cn != testClusterName { 428 t.Fatalf("sc is created with addr with cluster name %v, %v, want cluster name %v", cn, ok, testClusterName) 429 } 430 431 b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) 432 // Test pick with one backend. 433 p1 := <-cc.NewPickerCh 434 const rpcCount = 20 435 for i := 0; i < rpcCount; i++ { 436 gotSCSt, err := p1.Pick(balancer.PickInfo{}) 437 if err != nil || !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) { 438 t.Fatalf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1) 439 } 440 if gotSCSt.Done != nil { 441 gotSCSt.Done(balancer.DoneInfo{}) 442 } 443 } 444 445 const testClusterName2 = "test-cluster-2" 446 var addr2 = resolver.Address{Addr: "2.2.2.2"} 447 if err := b.UpdateClientConnState(balancer.ClientConnState{ 448 ResolverState: xdsclient.SetClient(resolver.State{Addresses: []resolver.Address{addr2}}, xdsC), 449 BalancerConfig: &LBConfig{ 450 Cluster: testClusterName2, 451 EDSServiceName: testServiceName, 452 ChildPolicy: &internalserviceconfig.BalancerConfig{ 453 Name: roundrobin.Name, 454 }, 455 }, 456 }); err != nil { 457 t.Fatalf("unexpected error from UpdateClientConnState: %v", err) 458 } 459 460 addrs2 := <-cc.NewSubConnAddrsCh 461 if got, want := addrs2[0].Addr, addr2.Addr; got != want { 462 t.Fatalf("sc is created with addr %v, want %v", got, want) 463 } 464 // New addresses should have the new cluster name. 465 cn2, ok := internal.GetXDSHandshakeClusterName(addrs2[0].Attributes) 466 if !ok || cn2 != testClusterName2 { 467 t.Fatalf("sc is created with addr with cluster name %v, %v, want cluster name %v", cn2, ok, testClusterName2) 468 } 469} 470 471// TestReResolution verifies that when a SubConn turns transient failure, 472// re-resolution is triggered. 473func TestReResolution(t *testing.T) { 474 defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName) 475 xdsC := fakeclient.NewClient() 476 defer xdsC.Close() 477 478 builder := balancer.Get(Name) 479 cc := testutils.NewTestClientConn(t) 480 b := builder.Build(cc, balancer.BuildOptions{}) 481 defer b.Close() 482 483 if err := b.UpdateClientConnState(balancer.ClientConnState{ 484 ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC), 485 BalancerConfig: &LBConfig{ 486 Cluster: testClusterName, 487 EDSServiceName: testServiceName, 488 ChildPolicy: &internalserviceconfig.BalancerConfig{ 489 Name: roundrobin.Name, 490 }, 491 }, 492 }); err != nil { 493 t.Fatalf("unexpected error from UpdateClientConnState: %v", err) 494 } 495 496 sc1 := <-cc.NewSubConnCh 497 b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) 498 // This should get the connecting picker. 499 p0 := <-cc.NewPickerCh 500 for i := 0; i < 10; i++ { 501 _, err := p0.Pick(balancer.PickInfo{}) 502 if err != balancer.ErrNoSubConnAvailable { 503 t.Fatalf("picker.Pick, got _,%v, want Err=%v", err, balancer.ErrNoSubConnAvailable) 504 } 505 } 506 507 b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) 508 // This should get the transient failure picker. 509 p1 := <-cc.NewPickerCh 510 for i := 0; i < 10; i++ { 511 _, err := p1.Pick(balancer.PickInfo{}) 512 if err == nil { 513 t.Fatalf("picker.Pick, got _,%v, want not nil", err) 514 } 515 } 516 517 // The transient failure should trigger a re-resolution. 518 select { 519 case <-cc.ResolveNowCh: 520 case <-time.After(defaultTestTimeout): 521 t.Fatalf("timeout waiting for ResolveNow()") 522 } 523 524 b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) 525 // Test pick with one backend. 526 p2 := <-cc.NewPickerCh 527 want := []balancer.SubConn{sc1} 528 if err := testutils.IsRoundRobin(want, subConnFromPicker(p2)); err != nil { 529 t.Fatalf("want %v, got %v", want, err) 530 } 531 532 b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) 533 // This should get the transient failure picker. 534 p3 := <-cc.NewPickerCh 535 for i := 0; i < 10; i++ { 536 _, err := p3.Pick(balancer.PickInfo{}) 537 if err == nil { 538 t.Fatalf("picker.Pick, got _,%v, want not nil", err) 539 } 540 } 541 542 // The transient failure should trigger a re-resolution. 543 select { 544 case <-cc.ResolveNowCh: 545 case <-time.After(defaultTestTimeout): 546 t.Fatalf("timeout waiting for ResolveNow()") 547 } 548} 549 550func TestLoadReporting(t *testing.T) { 551 var testLocality = xdsinternal.LocalityID{ 552 Region: "test-region", 553 Zone: "test-zone", 554 SubZone: "test-sub-zone", 555 } 556 557 xdsC := fakeclient.NewClient() 558 defer xdsC.Close() 559 560 builder := balancer.Get(Name) 561 cc := testutils.NewTestClientConn(t) 562 b := builder.Build(cc, balancer.BuildOptions{}) 563 defer b.Close() 564 565 addrs := make([]resolver.Address, len(testBackendAddrs)) 566 for i, a := range testBackendAddrs { 567 addrs[i] = xdsinternal.SetLocalityID(a, testLocality) 568 } 569 if err := b.UpdateClientConnState(balancer.ClientConnState{ 570 ResolverState: xdsclient.SetClient(resolver.State{Addresses: addrs}, xdsC), 571 BalancerConfig: &LBConfig{ 572 Cluster: testClusterName, 573 EDSServiceName: testServiceName, 574 LoadReportingServerName: newString(testLRSServerName), 575 // Locality: testLocality, 576 ChildPolicy: &internalserviceconfig.BalancerConfig{ 577 Name: roundrobin.Name, 578 }, 579 }, 580 }); err != nil { 581 t.Fatalf("unexpected error from UpdateClientConnState: %v", err) 582 } 583 584 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) 585 defer cancel() 586 587 got, err := xdsC.WaitForReportLoad(ctx) 588 if err != nil { 589 t.Fatalf("xdsClient.ReportLoad failed with error: %v", err) 590 } 591 if got.Server != testLRSServerName { 592 t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerName) 593 } 594 595 sc1 := <-cc.NewSubConnCh 596 b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) 597 // This should get the connecting picker. 598 p0 := <-cc.NewPickerCh 599 for i := 0; i < 10; i++ { 600 _, err := p0.Pick(balancer.PickInfo{}) 601 if err != balancer.ErrNoSubConnAvailable { 602 t.Fatalf("picker.Pick, got _,%v, want Err=%v", err, balancer.ErrNoSubConnAvailable) 603 } 604 } 605 606 b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) 607 // Test pick with one backend. 608 p1 := <-cc.NewPickerCh 609 const successCount = 5 610 for i := 0; i < successCount; i++ { 611 gotSCSt, err := p1.Pick(balancer.PickInfo{}) 612 if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) { 613 t.Fatalf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1) 614 } 615 gotSCSt.Done(balancer.DoneInfo{}) 616 } 617 const errorCount = 5 618 for i := 0; i < errorCount; i++ { 619 gotSCSt, err := p1.Pick(balancer.PickInfo{}) 620 if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) { 621 t.Fatalf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1) 622 } 623 gotSCSt.Done(balancer.DoneInfo{Err: fmt.Errorf("error")}) 624 } 625 626 // Dump load data from the store and compare with expected counts. 627 loadStore := xdsC.LoadStore() 628 if loadStore == nil { 629 t.Fatal("loadStore is nil in xdsClient") 630 } 631 sds := loadStore.Stats([]string{testClusterName}) 632 if len(sds) == 0 { 633 t.Fatalf("loads for cluster %v not found in store", testClusterName) 634 } 635 sd := sds[0] 636 if sd.Cluster != testClusterName || sd.Service != testServiceName { 637 t.Fatalf("got unexpected load for %q, %q, want %q, %q", sd.Cluster, sd.Service, testClusterName, testServiceName) 638 } 639 testLocalityJSON, _ := testLocality.ToString() 640 localityData, ok := sd.LocalityStats[testLocalityJSON] 641 if !ok { 642 t.Fatalf("loads for %v not found in store", testLocality) 643 } 644 reqStats := localityData.RequestStats 645 if reqStats.Succeeded != successCount { 646 t.Errorf("got succeeded %v, want %v", reqStats.Succeeded, successCount) 647 } 648 if reqStats.Errored != errorCount { 649 t.Errorf("got errord %v, want %v", reqStats.Errored, errorCount) 650 } 651 if reqStats.InProgress != 0 { 652 t.Errorf("got inProgress %v, want %v", reqStats.InProgress, 0) 653 } 654 655 b.Close() 656 if err := xdsC.WaitForCancelReportLoad(ctx); err != nil { 657 t.Fatalf("unexpected error waiting form load report to be canceled: %v", err) 658 } 659} 660 661// TestUpdateLRSServer covers the cases 662// - the init config specifies "" as the LRS server 663// - config modifies LRS server to a different string 664// - config sets LRS server to nil to stop load reporting 665func TestUpdateLRSServer(t *testing.T) { 666 var testLocality = xdsinternal.LocalityID{ 667 Region: "test-region", 668 Zone: "test-zone", 669 SubZone: "test-sub-zone", 670 } 671 672 xdsC := fakeclient.NewClient() 673 defer xdsC.Close() 674 675 builder := balancer.Get(Name) 676 cc := testutils.NewTestClientConn(t) 677 b := builder.Build(cc, balancer.BuildOptions{}) 678 defer b.Close() 679 680 addrs := make([]resolver.Address, len(testBackendAddrs)) 681 for i, a := range testBackendAddrs { 682 addrs[i] = xdsinternal.SetLocalityID(a, testLocality) 683 } 684 if err := b.UpdateClientConnState(balancer.ClientConnState{ 685 ResolverState: xdsclient.SetClient(resolver.State{Addresses: addrs}, xdsC), 686 BalancerConfig: &LBConfig{ 687 Cluster: testClusterName, 688 EDSServiceName: testServiceName, 689 LoadReportingServerName: newString(""), 690 ChildPolicy: &internalserviceconfig.BalancerConfig{ 691 Name: roundrobin.Name, 692 }, 693 }, 694 }); err != nil { 695 t.Fatalf("unexpected error from UpdateClientConnState: %v", err) 696 } 697 698 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) 699 defer cancel() 700 701 got, err := xdsC.WaitForReportLoad(ctx) 702 if err != nil { 703 t.Fatalf("xdsClient.ReportLoad failed with error: %v", err) 704 } 705 if got.Server != "" { 706 t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, "") 707 } 708 709 // Update LRS server to a different name. 710 if err := b.UpdateClientConnState(balancer.ClientConnState{ 711 ResolverState: xdsclient.SetClient(resolver.State{Addresses: addrs}, xdsC), 712 BalancerConfig: &LBConfig{ 713 Cluster: testClusterName, 714 EDSServiceName: testServiceName, 715 LoadReportingServerName: newString(testLRSServerName), 716 ChildPolicy: &internalserviceconfig.BalancerConfig{ 717 Name: roundrobin.Name, 718 }, 719 }, 720 }); err != nil { 721 t.Fatalf("unexpected error from UpdateClientConnState: %v", err) 722 } 723 if err := xdsC.WaitForCancelReportLoad(ctx); err != nil { 724 t.Fatalf("unexpected error waiting form load report to be canceled: %v", err) 725 } 726 got2, err2 := xdsC.WaitForReportLoad(ctx) 727 if err2 != nil { 728 t.Fatalf("xdsClient.ReportLoad failed with error: %v", err2) 729 } 730 if got2.Server != testLRSServerName { 731 t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got2.Server, testLRSServerName) 732 } 733 734 // Update LRS server to nil, to disable LRS. 735 if err := b.UpdateClientConnState(balancer.ClientConnState{ 736 ResolverState: xdsclient.SetClient(resolver.State{Addresses: addrs}, xdsC), 737 BalancerConfig: &LBConfig{ 738 Cluster: testClusterName, 739 EDSServiceName: testServiceName, 740 LoadReportingServerName: nil, 741 ChildPolicy: &internalserviceconfig.BalancerConfig{ 742 Name: roundrobin.Name, 743 }, 744 }, 745 }); err != nil { 746 t.Fatalf("unexpected error from UpdateClientConnState: %v", err) 747 } 748 if err := xdsC.WaitForCancelReportLoad(ctx); err != nil { 749 t.Fatalf("unexpected error waiting form load report to be canceled: %v", err) 750 } 751 752 shortCtx, shortCancel := context.WithTimeout(context.Background(), defaultShortTestTimeout) 753 defer shortCancel() 754 if s, err := xdsC.WaitForReportLoad(shortCtx); err != context.DeadlineExceeded { 755 t.Fatalf("unexpected load report to server: %q", s) 756 } 757} 758 759func assertString(f func() (string, error)) string { 760 s, err := f() 761 if err != nil { 762 panic(err.Error()) 763 } 764 return s 765} 766