1// +build go1.12
2
3/*
4 *
5 * Copyright 2020 gRPC authors.
6 *
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 *     http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 *
19 */
20
21package clusterimpl
22
23import (
24	"context"
25	"fmt"
26	"strings"
27	"testing"
28	"time"
29
30	"github.com/google/go-cmp/cmp"
31	"github.com/google/go-cmp/cmp/cmpopts"
32	"google.golang.org/grpc/balancer"
33	"google.golang.org/grpc/balancer/roundrobin"
34	"google.golang.org/grpc/connectivity"
35	"google.golang.org/grpc/internal"
36	internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
37	"google.golang.org/grpc/resolver"
38	xdsinternal "google.golang.org/grpc/xds/internal"
39	"google.golang.org/grpc/xds/internal/testutils"
40	"google.golang.org/grpc/xds/internal/testutils/fakeclient"
41	"google.golang.org/grpc/xds/internal/xdsclient"
42	"google.golang.org/grpc/xds/internal/xdsclient/load"
43)
44
45const (
46	defaultTestTimeout      = 1 * time.Second
47	defaultShortTestTimeout = 100 * time.Microsecond
48
49	testClusterName   = "test-cluster"
50	testServiceName   = "test-eds-service"
51	testLRSServerName = "test-lrs-name"
52)
53
54var (
55	testBackendAddrs = []resolver.Address{
56		{Addr: "1.1.1.1:1"},
57	}
58
59	cmpOpts = cmp.Options{
60		cmpopts.EquateEmpty(),
61		cmpopts.IgnoreFields(load.Data{}, "ReportInterval"),
62	}
63)
64
65func subConnFromPicker(p balancer.Picker) func() balancer.SubConn {
66	return func() balancer.SubConn {
67		scst, _ := p.Pick(balancer.PickInfo{})
68		return scst.SubConn
69	}
70}
71
72func init() {
73	NewRandomWRR = testutils.NewTestWRR
74}
75
76// TestDropByCategory verifies that the balancer correctly drops the picks, and
77// that the drops are reported.
78func TestDropByCategory(t *testing.T) {
79	defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
80	xdsC := fakeclient.NewClient()
81	defer xdsC.Close()
82
83	builder := balancer.Get(Name)
84	cc := testutils.NewTestClientConn(t)
85	b := builder.Build(cc, balancer.BuildOptions{})
86	defer b.Close()
87
88	const (
89		dropReason      = "test-dropping-category"
90		dropNumerator   = 1
91		dropDenominator = 2
92	)
93	if err := b.UpdateClientConnState(balancer.ClientConnState{
94		ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC),
95		BalancerConfig: &LBConfig{
96			Cluster:                 testClusterName,
97			EDSServiceName:          testServiceName,
98			LoadReportingServerName: newString(testLRSServerName),
99			DropCategories: []DropConfig{{
100				Category:           dropReason,
101				RequestsPerMillion: million * dropNumerator / dropDenominator,
102			}},
103			ChildPolicy: &internalserviceconfig.BalancerConfig{
104				Name: roundrobin.Name,
105			},
106		},
107	}); err != nil {
108		t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
109	}
110
111	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
112	defer cancel()
113
114	got, err := xdsC.WaitForReportLoad(ctx)
115	if err != nil {
116		t.Fatalf("xdsClient.ReportLoad failed with error: %v", err)
117	}
118	if got.Server != testLRSServerName {
119		t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerName)
120	}
121
122	sc1 := <-cc.NewSubConnCh
123	b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
124	// This should get the connecting picker.
125	p0 := <-cc.NewPickerCh
126	for i := 0; i < 10; i++ {
127		_, err := p0.Pick(balancer.PickInfo{})
128		if err != balancer.ErrNoSubConnAvailable {
129			t.Fatalf("picker.Pick, got _,%v, want Err=%v", err, balancer.ErrNoSubConnAvailable)
130		}
131	}
132
133	b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
134	// Test pick with one backend.
135	p1 := <-cc.NewPickerCh
136	const rpcCount = 20
137	for i := 0; i < rpcCount; i++ {
138		gotSCSt, err := p1.Pick(balancer.PickInfo{})
139		// Even RPCs are dropped.
140		if i%2 == 0 {
141			if err == nil || !strings.Contains(err.Error(), "dropped") {
142				t.Fatalf("pick.Pick, got %v, %v, want error RPC dropped", gotSCSt, err)
143			}
144			continue
145		}
146		if err != nil || !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
147			t.Fatalf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1)
148		}
149		if gotSCSt.Done != nil {
150			gotSCSt.Done(balancer.DoneInfo{})
151		}
152	}
153
154	// Dump load data from the store and compare with expected counts.
155	loadStore := xdsC.LoadStore()
156	if loadStore == nil {
157		t.Fatal("loadStore is nil in xdsClient")
158	}
159	const dropCount = rpcCount * dropNumerator / dropDenominator
160	wantStatsData0 := []*load.Data{{
161		Cluster:    testClusterName,
162		Service:    testServiceName,
163		TotalDrops: dropCount,
164		Drops:      map[string]uint64{dropReason: dropCount},
165		LocalityStats: map[string]load.LocalityData{
166			assertString(xdsinternal.LocalityID{}.ToString): {RequestStats: load.RequestData{Succeeded: rpcCount - dropCount}},
167		},
168	}}
169
170	gotStatsData0 := loadStore.Stats([]string{testClusterName})
171	if diff := cmp.Diff(gotStatsData0, wantStatsData0, cmpOpts); diff != "" {
172		t.Fatalf("got unexpected reports, diff (-got, +want): %v", diff)
173	}
174
175	// Send an update with new drop configs.
176	const (
177		dropReason2      = "test-dropping-category-2"
178		dropNumerator2   = 1
179		dropDenominator2 = 4
180	)
181	if err := b.UpdateClientConnState(balancer.ClientConnState{
182		ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC),
183		BalancerConfig: &LBConfig{
184			Cluster:                 testClusterName,
185			EDSServiceName:          testServiceName,
186			LoadReportingServerName: newString(testLRSServerName),
187			DropCategories: []DropConfig{{
188				Category:           dropReason2,
189				RequestsPerMillion: million * dropNumerator2 / dropDenominator2,
190			}},
191			ChildPolicy: &internalserviceconfig.BalancerConfig{
192				Name: roundrobin.Name,
193			},
194		},
195	}); err != nil {
196		t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
197	}
198
199	p2 := <-cc.NewPickerCh
200	for i := 0; i < rpcCount; i++ {
201		gotSCSt, err := p2.Pick(balancer.PickInfo{})
202		// Even RPCs are dropped.
203		if i%4 == 0 {
204			if err == nil || !strings.Contains(err.Error(), "dropped") {
205				t.Fatalf("pick.Pick, got %v, %v, want error RPC dropped", gotSCSt, err)
206			}
207			continue
208		}
209		if err != nil || !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
210			t.Fatalf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1)
211		}
212		if gotSCSt.Done != nil {
213			gotSCSt.Done(balancer.DoneInfo{})
214		}
215	}
216
217	const dropCount2 = rpcCount * dropNumerator2 / dropDenominator2
218	wantStatsData1 := []*load.Data{{
219		Cluster:    testClusterName,
220		Service:    testServiceName,
221		TotalDrops: dropCount2,
222		Drops:      map[string]uint64{dropReason2: dropCount2},
223		LocalityStats: map[string]load.LocalityData{
224			assertString(xdsinternal.LocalityID{}.ToString): {RequestStats: load.RequestData{Succeeded: rpcCount - dropCount2}},
225		},
226	}}
227
228	gotStatsData1 := loadStore.Stats([]string{testClusterName})
229	if diff := cmp.Diff(gotStatsData1, wantStatsData1, cmpOpts); diff != "" {
230		t.Fatalf("got unexpected reports, diff (-got, +want): %v", diff)
231	}
232}
233
234// TestDropCircuitBreaking verifies that the balancer correctly drops the picks
235// due to circuit breaking, and that the drops are reported.
236func TestDropCircuitBreaking(t *testing.T) {
237	defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
238	xdsC := fakeclient.NewClient()
239	defer xdsC.Close()
240
241	builder := balancer.Get(Name)
242	cc := testutils.NewTestClientConn(t)
243	b := builder.Build(cc, balancer.BuildOptions{})
244	defer b.Close()
245
246	var maxRequest uint32 = 50
247	if err := b.UpdateClientConnState(balancer.ClientConnState{
248		ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC),
249		BalancerConfig: &LBConfig{
250			Cluster:                 testClusterName,
251			EDSServiceName:          testServiceName,
252			LoadReportingServerName: newString(testLRSServerName),
253			MaxConcurrentRequests:   &maxRequest,
254			ChildPolicy: &internalserviceconfig.BalancerConfig{
255				Name: roundrobin.Name,
256			},
257		},
258	}); err != nil {
259		t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
260	}
261
262	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
263	defer cancel()
264
265	got, err := xdsC.WaitForReportLoad(ctx)
266	if err != nil {
267		t.Fatalf("xdsClient.ReportLoad failed with error: %v", err)
268	}
269	if got.Server != testLRSServerName {
270		t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerName)
271	}
272
273	sc1 := <-cc.NewSubConnCh
274	b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
275	// This should get the connecting picker.
276	p0 := <-cc.NewPickerCh
277	for i := 0; i < 10; i++ {
278		_, err := p0.Pick(balancer.PickInfo{})
279		if err != balancer.ErrNoSubConnAvailable {
280			t.Fatalf("picker.Pick, got _,%v, want Err=%v", err, balancer.ErrNoSubConnAvailable)
281		}
282	}
283
284	b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
285	// Test pick with one backend.
286	dones := []func(){}
287	p1 := <-cc.NewPickerCh
288	const rpcCount = 100
289	for i := 0; i < rpcCount; i++ {
290		gotSCSt, err := p1.Pick(balancer.PickInfo{})
291		if i < 50 && err != nil {
292			t.Errorf("The first 50%% picks should be non-drops, got error %v", err)
293		} else if i > 50 && err == nil {
294			t.Errorf("The second 50%% picks should be drops, got error <nil>")
295		}
296		dones = append(dones, func() {
297			if gotSCSt.Done != nil {
298				gotSCSt.Done(balancer.DoneInfo{})
299			}
300		})
301	}
302	for _, done := range dones {
303		done()
304	}
305
306	dones = []func(){}
307	// Pick without drops.
308	for i := 0; i < 50; i++ {
309		gotSCSt, err := p1.Pick(balancer.PickInfo{})
310		if err != nil {
311			t.Errorf("The third 50%% picks should be non-drops, got error %v", err)
312		}
313		dones = append(dones, func() {
314			if gotSCSt.Done != nil {
315				gotSCSt.Done(balancer.DoneInfo{})
316			}
317		})
318	}
319	for _, done := range dones {
320		done()
321	}
322
323	// Dump load data from the store and compare with expected counts.
324	loadStore := xdsC.LoadStore()
325	if loadStore == nil {
326		t.Fatal("loadStore is nil in xdsClient")
327	}
328
329	wantStatsData0 := []*load.Data{{
330		Cluster:    testClusterName,
331		Service:    testServiceName,
332		TotalDrops: uint64(maxRequest),
333		LocalityStats: map[string]load.LocalityData{
334			assertString(xdsinternal.LocalityID{}.ToString): {RequestStats: load.RequestData{Succeeded: uint64(rpcCount - maxRequest + 50)}},
335		},
336	}}
337
338	gotStatsData0 := loadStore.Stats([]string{testClusterName})
339	if diff := cmp.Diff(gotStatsData0, wantStatsData0, cmpOpts); diff != "" {
340		t.Fatalf("got unexpected drop reports, diff (-got, +want): %v", diff)
341	}
342}
343
344// TestPickerUpdateAfterClose covers the case that cluster_impl wants to update
345// picker after it's closed. Because picker updates are sent in the run()
346// goroutine.
347func TestPickerUpdateAfterClose(t *testing.T) {
348	defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
349	xdsC := fakeclient.NewClient()
350	defer xdsC.Close()
351
352	builder := balancer.Get(Name)
353	cc := testutils.NewTestClientConn(t)
354	b := builder.Build(cc, balancer.BuildOptions{})
355
356	var maxRequest uint32 = 50
357	if err := b.UpdateClientConnState(balancer.ClientConnState{
358		ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC),
359		BalancerConfig: &LBConfig{
360			Cluster:               testClusterName,
361			EDSServiceName:        testServiceName,
362			MaxConcurrentRequests: &maxRequest,
363			ChildPolicy: &internalserviceconfig.BalancerConfig{
364				Name: roundrobin.Name,
365			},
366		},
367	}); err != nil {
368		b.Close()
369		t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
370	}
371
372	// Send SubConn state changes to trigger picker updates. Balancer will
373	// closed in a defer.
374	sc1 := <-cc.NewSubConnCh
375	b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
376	// This close will race with the SubConn state update.
377	b.Close()
378
379	select {
380	case <-cc.NewPickerCh:
381		t.Fatalf("unexpected picker update after balancer is closed")
382	case <-time.After(time.Millisecond * 10):
383	}
384}
385
386// TestClusterNameInAddressAttributes covers the case that cluster name is
387// attached to the subconn address attributes.
388func TestClusterNameInAddressAttributes(t *testing.T) {
389	defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
390	xdsC := fakeclient.NewClient()
391	defer xdsC.Close()
392
393	builder := balancer.Get(Name)
394	cc := testutils.NewTestClientConn(t)
395	b := builder.Build(cc, balancer.BuildOptions{})
396	defer b.Close()
397
398	if err := b.UpdateClientConnState(balancer.ClientConnState{
399		ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC),
400		BalancerConfig: &LBConfig{
401			Cluster:        testClusterName,
402			EDSServiceName: testServiceName,
403			ChildPolicy: &internalserviceconfig.BalancerConfig{
404				Name: roundrobin.Name,
405			},
406		},
407	}); err != nil {
408		t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
409	}
410
411	sc1 := <-cc.NewSubConnCh
412	b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
413	// This should get the connecting picker.
414	p0 := <-cc.NewPickerCh
415	for i := 0; i < 10; i++ {
416		_, err := p0.Pick(balancer.PickInfo{})
417		if err != balancer.ErrNoSubConnAvailable {
418			t.Fatalf("picker.Pick, got _,%v, want Err=%v", err, balancer.ErrNoSubConnAvailable)
419		}
420	}
421
422	addrs1 := <-cc.NewSubConnAddrsCh
423	if got, want := addrs1[0].Addr, testBackendAddrs[0].Addr; got != want {
424		t.Fatalf("sc is created with addr %v, want %v", got, want)
425	}
426	cn, ok := internal.GetXDSHandshakeClusterName(addrs1[0].Attributes)
427	if !ok || cn != testClusterName {
428		t.Fatalf("sc is created with addr with cluster name %v, %v, want cluster name %v", cn, ok, testClusterName)
429	}
430
431	b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
432	// Test pick with one backend.
433	p1 := <-cc.NewPickerCh
434	const rpcCount = 20
435	for i := 0; i < rpcCount; i++ {
436		gotSCSt, err := p1.Pick(balancer.PickInfo{})
437		if err != nil || !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
438			t.Fatalf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1)
439		}
440		if gotSCSt.Done != nil {
441			gotSCSt.Done(balancer.DoneInfo{})
442		}
443	}
444
445	const testClusterName2 = "test-cluster-2"
446	var addr2 = resolver.Address{Addr: "2.2.2.2"}
447	if err := b.UpdateClientConnState(balancer.ClientConnState{
448		ResolverState: xdsclient.SetClient(resolver.State{Addresses: []resolver.Address{addr2}}, xdsC),
449		BalancerConfig: &LBConfig{
450			Cluster:        testClusterName2,
451			EDSServiceName: testServiceName,
452			ChildPolicy: &internalserviceconfig.BalancerConfig{
453				Name: roundrobin.Name,
454			},
455		},
456	}); err != nil {
457		t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
458	}
459
460	addrs2 := <-cc.NewSubConnAddrsCh
461	if got, want := addrs2[0].Addr, addr2.Addr; got != want {
462		t.Fatalf("sc is created with addr %v, want %v", got, want)
463	}
464	// New addresses should have the new cluster name.
465	cn2, ok := internal.GetXDSHandshakeClusterName(addrs2[0].Attributes)
466	if !ok || cn2 != testClusterName2 {
467		t.Fatalf("sc is created with addr with cluster name %v, %v, want cluster name %v", cn2, ok, testClusterName2)
468	}
469}
470
471// TestReResolution verifies that when a SubConn turns transient failure,
472// re-resolution is triggered.
473func TestReResolution(t *testing.T) {
474	defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
475	xdsC := fakeclient.NewClient()
476	defer xdsC.Close()
477
478	builder := balancer.Get(Name)
479	cc := testutils.NewTestClientConn(t)
480	b := builder.Build(cc, balancer.BuildOptions{})
481	defer b.Close()
482
483	if err := b.UpdateClientConnState(balancer.ClientConnState{
484		ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC),
485		BalancerConfig: &LBConfig{
486			Cluster:        testClusterName,
487			EDSServiceName: testServiceName,
488			ChildPolicy: &internalserviceconfig.BalancerConfig{
489				Name: roundrobin.Name,
490			},
491		},
492	}); err != nil {
493		t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
494	}
495
496	sc1 := <-cc.NewSubConnCh
497	b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
498	// This should get the connecting picker.
499	p0 := <-cc.NewPickerCh
500	for i := 0; i < 10; i++ {
501		_, err := p0.Pick(balancer.PickInfo{})
502		if err != balancer.ErrNoSubConnAvailable {
503			t.Fatalf("picker.Pick, got _,%v, want Err=%v", err, balancer.ErrNoSubConnAvailable)
504		}
505	}
506
507	b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
508	// This should get the transient failure picker.
509	p1 := <-cc.NewPickerCh
510	for i := 0; i < 10; i++ {
511		_, err := p1.Pick(balancer.PickInfo{})
512		if err == nil {
513			t.Fatalf("picker.Pick, got _,%v, want not nil", err)
514		}
515	}
516
517	// The transient failure should trigger a re-resolution.
518	select {
519	case <-cc.ResolveNowCh:
520	case <-time.After(defaultTestTimeout):
521		t.Fatalf("timeout waiting for ResolveNow()")
522	}
523
524	b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
525	// Test pick with one backend.
526	p2 := <-cc.NewPickerCh
527	want := []balancer.SubConn{sc1}
528	if err := testutils.IsRoundRobin(want, subConnFromPicker(p2)); err != nil {
529		t.Fatalf("want %v, got %v", want, err)
530	}
531
532	b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
533	// This should get the transient failure picker.
534	p3 := <-cc.NewPickerCh
535	for i := 0; i < 10; i++ {
536		_, err := p3.Pick(balancer.PickInfo{})
537		if err == nil {
538			t.Fatalf("picker.Pick, got _,%v, want not nil", err)
539		}
540	}
541
542	// The transient failure should trigger a re-resolution.
543	select {
544	case <-cc.ResolveNowCh:
545	case <-time.After(defaultTestTimeout):
546		t.Fatalf("timeout waiting for ResolveNow()")
547	}
548}
549
550func TestLoadReporting(t *testing.T) {
551	var testLocality = xdsinternal.LocalityID{
552		Region:  "test-region",
553		Zone:    "test-zone",
554		SubZone: "test-sub-zone",
555	}
556
557	xdsC := fakeclient.NewClient()
558	defer xdsC.Close()
559
560	builder := balancer.Get(Name)
561	cc := testutils.NewTestClientConn(t)
562	b := builder.Build(cc, balancer.BuildOptions{})
563	defer b.Close()
564
565	addrs := make([]resolver.Address, len(testBackendAddrs))
566	for i, a := range testBackendAddrs {
567		addrs[i] = xdsinternal.SetLocalityID(a, testLocality)
568	}
569	if err := b.UpdateClientConnState(balancer.ClientConnState{
570		ResolverState: xdsclient.SetClient(resolver.State{Addresses: addrs}, xdsC),
571		BalancerConfig: &LBConfig{
572			Cluster:                 testClusterName,
573			EDSServiceName:          testServiceName,
574			LoadReportingServerName: newString(testLRSServerName),
575			// Locality:                testLocality,
576			ChildPolicy: &internalserviceconfig.BalancerConfig{
577				Name: roundrobin.Name,
578			},
579		},
580	}); err != nil {
581		t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
582	}
583
584	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
585	defer cancel()
586
587	got, err := xdsC.WaitForReportLoad(ctx)
588	if err != nil {
589		t.Fatalf("xdsClient.ReportLoad failed with error: %v", err)
590	}
591	if got.Server != testLRSServerName {
592		t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerName)
593	}
594
595	sc1 := <-cc.NewSubConnCh
596	b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
597	// This should get the connecting picker.
598	p0 := <-cc.NewPickerCh
599	for i := 0; i < 10; i++ {
600		_, err := p0.Pick(balancer.PickInfo{})
601		if err != balancer.ErrNoSubConnAvailable {
602			t.Fatalf("picker.Pick, got _,%v, want Err=%v", err, balancer.ErrNoSubConnAvailable)
603		}
604	}
605
606	b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
607	// Test pick with one backend.
608	p1 := <-cc.NewPickerCh
609	const successCount = 5
610	for i := 0; i < successCount; i++ {
611		gotSCSt, err := p1.Pick(balancer.PickInfo{})
612		if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
613			t.Fatalf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1)
614		}
615		gotSCSt.Done(balancer.DoneInfo{})
616	}
617	const errorCount = 5
618	for i := 0; i < errorCount; i++ {
619		gotSCSt, err := p1.Pick(balancer.PickInfo{})
620		if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
621			t.Fatalf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1)
622		}
623		gotSCSt.Done(balancer.DoneInfo{Err: fmt.Errorf("error")})
624	}
625
626	// Dump load data from the store and compare with expected counts.
627	loadStore := xdsC.LoadStore()
628	if loadStore == nil {
629		t.Fatal("loadStore is nil in xdsClient")
630	}
631	sds := loadStore.Stats([]string{testClusterName})
632	if len(sds) == 0 {
633		t.Fatalf("loads for cluster %v not found in store", testClusterName)
634	}
635	sd := sds[0]
636	if sd.Cluster != testClusterName || sd.Service != testServiceName {
637		t.Fatalf("got unexpected load for %q, %q, want %q, %q", sd.Cluster, sd.Service, testClusterName, testServiceName)
638	}
639	testLocalityJSON, _ := testLocality.ToString()
640	localityData, ok := sd.LocalityStats[testLocalityJSON]
641	if !ok {
642		t.Fatalf("loads for %v not found in store", testLocality)
643	}
644	reqStats := localityData.RequestStats
645	if reqStats.Succeeded != successCount {
646		t.Errorf("got succeeded %v, want %v", reqStats.Succeeded, successCount)
647	}
648	if reqStats.Errored != errorCount {
649		t.Errorf("got errord %v, want %v", reqStats.Errored, errorCount)
650	}
651	if reqStats.InProgress != 0 {
652		t.Errorf("got inProgress %v, want %v", reqStats.InProgress, 0)
653	}
654
655	b.Close()
656	if err := xdsC.WaitForCancelReportLoad(ctx); err != nil {
657		t.Fatalf("unexpected error waiting form load report to be canceled: %v", err)
658	}
659}
660
661// TestUpdateLRSServer covers the cases
662// - the init config specifies "" as the LRS server
663// - config modifies LRS server to a different string
664// - config sets LRS server to nil to stop load reporting
665func TestUpdateLRSServer(t *testing.T) {
666	var testLocality = xdsinternal.LocalityID{
667		Region:  "test-region",
668		Zone:    "test-zone",
669		SubZone: "test-sub-zone",
670	}
671
672	xdsC := fakeclient.NewClient()
673	defer xdsC.Close()
674
675	builder := balancer.Get(Name)
676	cc := testutils.NewTestClientConn(t)
677	b := builder.Build(cc, balancer.BuildOptions{})
678	defer b.Close()
679
680	addrs := make([]resolver.Address, len(testBackendAddrs))
681	for i, a := range testBackendAddrs {
682		addrs[i] = xdsinternal.SetLocalityID(a, testLocality)
683	}
684	if err := b.UpdateClientConnState(balancer.ClientConnState{
685		ResolverState: xdsclient.SetClient(resolver.State{Addresses: addrs}, xdsC),
686		BalancerConfig: &LBConfig{
687			Cluster:                 testClusterName,
688			EDSServiceName:          testServiceName,
689			LoadReportingServerName: newString(""),
690			ChildPolicy: &internalserviceconfig.BalancerConfig{
691				Name: roundrobin.Name,
692			},
693		},
694	}); err != nil {
695		t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
696	}
697
698	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
699	defer cancel()
700
701	got, err := xdsC.WaitForReportLoad(ctx)
702	if err != nil {
703		t.Fatalf("xdsClient.ReportLoad failed with error: %v", err)
704	}
705	if got.Server != "" {
706		t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, "")
707	}
708
709	// Update LRS server to a different name.
710	if err := b.UpdateClientConnState(balancer.ClientConnState{
711		ResolverState: xdsclient.SetClient(resolver.State{Addresses: addrs}, xdsC),
712		BalancerConfig: &LBConfig{
713			Cluster:                 testClusterName,
714			EDSServiceName:          testServiceName,
715			LoadReportingServerName: newString(testLRSServerName),
716			ChildPolicy: &internalserviceconfig.BalancerConfig{
717				Name: roundrobin.Name,
718			},
719		},
720	}); err != nil {
721		t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
722	}
723	if err := xdsC.WaitForCancelReportLoad(ctx); err != nil {
724		t.Fatalf("unexpected error waiting form load report to be canceled: %v", err)
725	}
726	got2, err2 := xdsC.WaitForReportLoad(ctx)
727	if err2 != nil {
728		t.Fatalf("xdsClient.ReportLoad failed with error: %v", err2)
729	}
730	if got2.Server != testLRSServerName {
731		t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got2.Server, testLRSServerName)
732	}
733
734	// Update LRS server to nil, to disable LRS.
735	if err := b.UpdateClientConnState(balancer.ClientConnState{
736		ResolverState: xdsclient.SetClient(resolver.State{Addresses: addrs}, xdsC),
737		BalancerConfig: &LBConfig{
738			Cluster:                 testClusterName,
739			EDSServiceName:          testServiceName,
740			LoadReportingServerName: nil,
741			ChildPolicy: &internalserviceconfig.BalancerConfig{
742				Name: roundrobin.Name,
743			},
744		},
745	}); err != nil {
746		t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
747	}
748	if err := xdsC.WaitForCancelReportLoad(ctx); err != nil {
749		t.Fatalf("unexpected error waiting form load report to be canceled: %v", err)
750	}
751
752	shortCtx, shortCancel := context.WithTimeout(context.Background(), defaultShortTestTimeout)
753	defer shortCancel()
754	if s, err := xdsC.WaitForReportLoad(shortCtx); err != context.DeadlineExceeded {
755		t.Fatalf("unexpected load report to server: %q", s)
756	}
757}
758
759func assertString(f func() (string, error)) string {
760	s, err := f()
761	if err != nil {
762		panic(err.Error())
763	}
764	return s
765}
766