1/*
2 *
3 * Copyright 2020 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package clusterimpl
20
21import (
22	"context"
23	"errors"
24	"fmt"
25	"strings"
26	"testing"
27	"time"
28
29	"github.com/google/go-cmp/cmp"
30	"github.com/google/go-cmp/cmp/cmpopts"
31	"google.golang.org/grpc/balancer"
32	"google.golang.org/grpc/balancer/base"
33	"google.golang.org/grpc/balancer/roundrobin"
34	"google.golang.org/grpc/connectivity"
35	"google.golang.org/grpc/internal"
36	"google.golang.org/grpc/internal/balancer/stub"
37	"google.golang.org/grpc/internal/grpctest"
38	internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
39	"google.golang.org/grpc/resolver"
40	xdsinternal "google.golang.org/grpc/xds/internal"
41	"google.golang.org/grpc/xds/internal/testutils"
42	"google.golang.org/grpc/xds/internal/testutils/fakeclient"
43	"google.golang.org/grpc/xds/internal/xdsclient"
44	"google.golang.org/grpc/xds/internal/xdsclient/load"
45)
46
47const (
48	defaultTestTimeout      = 1 * time.Second
49	defaultShortTestTimeout = 100 * time.Microsecond
50
51	testClusterName   = "test-cluster"
52	testServiceName   = "test-eds-service"
53	testLRSServerName = "test-lrs-name"
54)
55
56var (
57	testBackendAddrs = []resolver.Address{
58		{Addr: "1.1.1.1:1"},
59	}
60
61	cmpOpts = cmp.Options{
62		cmpopts.EquateEmpty(),
63		cmpopts.IgnoreFields(load.Data{}, "ReportInterval"),
64	}
65)
66
67type s struct {
68	grpctest.Tester
69}
70
71func Test(t *testing.T) {
72	grpctest.RunSubTests(t, s{})
73}
74
75func subConnFromPicker(p balancer.Picker) func() balancer.SubConn {
76	return func() balancer.SubConn {
77		scst, _ := p.Pick(balancer.PickInfo{})
78		return scst.SubConn
79	}
80}
81
82func init() {
83	NewRandomWRR = testutils.NewTestWRR
84}
85
86// TestDropByCategory verifies that the balancer correctly drops the picks, and
87// that the drops are reported.
88func (s) TestDropByCategory(t *testing.T) {
89	defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
90	xdsC := fakeclient.NewClient()
91	defer xdsC.Close()
92
93	builder := balancer.Get(Name)
94	cc := testutils.NewTestClientConn(t)
95	b := builder.Build(cc, balancer.BuildOptions{})
96	defer b.Close()
97
98	const (
99		dropReason      = "test-dropping-category"
100		dropNumerator   = 1
101		dropDenominator = 2
102	)
103	if err := b.UpdateClientConnState(balancer.ClientConnState{
104		ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC),
105		BalancerConfig: &LBConfig{
106			Cluster:                 testClusterName,
107			EDSServiceName:          testServiceName,
108			LoadReportingServerName: newString(testLRSServerName),
109			DropCategories: []DropConfig{{
110				Category:           dropReason,
111				RequestsPerMillion: million * dropNumerator / dropDenominator,
112			}},
113			ChildPolicy: &internalserviceconfig.BalancerConfig{
114				Name: roundrobin.Name,
115			},
116		},
117	}); err != nil {
118		t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
119	}
120
121	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
122	defer cancel()
123
124	got, err := xdsC.WaitForReportLoad(ctx)
125	if err != nil {
126		t.Fatalf("xdsClient.ReportLoad failed with error: %v", err)
127	}
128	if got.Server != testLRSServerName {
129		t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerName)
130	}
131
132	sc1 := <-cc.NewSubConnCh
133	b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
134	// This should get the connecting picker.
135	p0 := <-cc.NewPickerCh
136	for i := 0; i < 10; i++ {
137		_, err := p0.Pick(balancer.PickInfo{})
138		if err != balancer.ErrNoSubConnAvailable {
139			t.Fatalf("picker.Pick, got _,%v, want Err=%v", err, balancer.ErrNoSubConnAvailable)
140		}
141	}
142
143	b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
144	// Test pick with one backend.
145	p1 := <-cc.NewPickerCh
146	const rpcCount = 20
147	for i := 0; i < rpcCount; i++ {
148		gotSCSt, err := p1.Pick(balancer.PickInfo{})
149		// Even RPCs are dropped.
150		if i%2 == 0 {
151			if err == nil || !strings.Contains(err.Error(), "dropped") {
152				t.Fatalf("pick.Pick, got %v, %v, want error RPC dropped", gotSCSt, err)
153			}
154			continue
155		}
156		if err != nil || !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
157			t.Fatalf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1)
158		}
159		if gotSCSt.Done != nil {
160			gotSCSt.Done(balancer.DoneInfo{})
161		}
162	}
163
164	// Dump load data from the store and compare with expected counts.
165	loadStore := xdsC.LoadStore()
166	if loadStore == nil {
167		t.Fatal("loadStore is nil in xdsClient")
168	}
169	const dropCount = rpcCount * dropNumerator / dropDenominator
170	wantStatsData0 := []*load.Data{{
171		Cluster:    testClusterName,
172		Service:    testServiceName,
173		TotalDrops: dropCount,
174		Drops:      map[string]uint64{dropReason: dropCount},
175		LocalityStats: map[string]load.LocalityData{
176			assertString(xdsinternal.LocalityID{}.ToString): {RequestStats: load.RequestData{Succeeded: rpcCount - dropCount}},
177		},
178	}}
179
180	gotStatsData0 := loadStore.Stats([]string{testClusterName})
181	if diff := cmp.Diff(gotStatsData0, wantStatsData0, cmpOpts); diff != "" {
182		t.Fatalf("got unexpected reports, diff (-got, +want): %v", diff)
183	}
184
185	// Send an update with new drop configs.
186	const (
187		dropReason2      = "test-dropping-category-2"
188		dropNumerator2   = 1
189		dropDenominator2 = 4
190	)
191	if err := b.UpdateClientConnState(balancer.ClientConnState{
192		ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC),
193		BalancerConfig: &LBConfig{
194			Cluster:                 testClusterName,
195			EDSServiceName:          testServiceName,
196			LoadReportingServerName: newString(testLRSServerName),
197			DropCategories: []DropConfig{{
198				Category:           dropReason2,
199				RequestsPerMillion: million * dropNumerator2 / dropDenominator2,
200			}},
201			ChildPolicy: &internalserviceconfig.BalancerConfig{
202				Name: roundrobin.Name,
203			},
204		},
205	}); err != nil {
206		t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
207	}
208
209	p2 := <-cc.NewPickerCh
210	for i := 0; i < rpcCount; i++ {
211		gotSCSt, err := p2.Pick(balancer.PickInfo{})
212		// Even RPCs are dropped.
213		if i%4 == 0 {
214			if err == nil || !strings.Contains(err.Error(), "dropped") {
215				t.Fatalf("pick.Pick, got %v, %v, want error RPC dropped", gotSCSt, err)
216			}
217			continue
218		}
219		if err != nil || !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
220			t.Fatalf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1)
221		}
222		if gotSCSt.Done != nil {
223			gotSCSt.Done(balancer.DoneInfo{})
224		}
225	}
226
227	const dropCount2 = rpcCount * dropNumerator2 / dropDenominator2
228	wantStatsData1 := []*load.Data{{
229		Cluster:    testClusterName,
230		Service:    testServiceName,
231		TotalDrops: dropCount2,
232		Drops:      map[string]uint64{dropReason2: dropCount2},
233		LocalityStats: map[string]load.LocalityData{
234			assertString(xdsinternal.LocalityID{}.ToString): {RequestStats: load.RequestData{Succeeded: rpcCount - dropCount2}},
235		},
236	}}
237
238	gotStatsData1 := loadStore.Stats([]string{testClusterName})
239	if diff := cmp.Diff(gotStatsData1, wantStatsData1, cmpOpts); diff != "" {
240		t.Fatalf("got unexpected reports, diff (-got, +want): %v", diff)
241	}
242}
243
244// TestDropCircuitBreaking verifies that the balancer correctly drops the picks
245// due to circuit breaking, and that the drops are reported.
246func (s) TestDropCircuitBreaking(t *testing.T) {
247	defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
248	xdsC := fakeclient.NewClient()
249	defer xdsC.Close()
250
251	builder := balancer.Get(Name)
252	cc := testutils.NewTestClientConn(t)
253	b := builder.Build(cc, balancer.BuildOptions{})
254	defer b.Close()
255
256	var maxRequest uint32 = 50
257	if err := b.UpdateClientConnState(balancer.ClientConnState{
258		ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC),
259		BalancerConfig: &LBConfig{
260			Cluster:                 testClusterName,
261			EDSServiceName:          testServiceName,
262			LoadReportingServerName: newString(testLRSServerName),
263			MaxConcurrentRequests:   &maxRequest,
264			ChildPolicy: &internalserviceconfig.BalancerConfig{
265				Name: roundrobin.Name,
266			},
267		},
268	}); err != nil {
269		t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
270	}
271
272	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
273	defer cancel()
274
275	got, err := xdsC.WaitForReportLoad(ctx)
276	if err != nil {
277		t.Fatalf("xdsClient.ReportLoad failed with error: %v", err)
278	}
279	if got.Server != testLRSServerName {
280		t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerName)
281	}
282
283	sc1 := <-cc.NewSubConnCh
284	b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
285	// This should get the connecting picker.
286	p0 := <-cc.NewPickerCh
287	for i := 0; i < 10; i++ {
288		_, err := p0.Pick(balancer.PickInfo{})
289		if err != balancer.ErrNoSubConnAvailable {
290			t.Fatalf("picker.Pick, got _,%v, want Err=%v", err, balancer.ErrNoSubConnAvailable)
291		}
292	}
293
294	b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
295	// Test pick with one backend.
296	dones := []func(){}
297	p1 := <-cc.NewPickerCh
298	const rpcCount = 100
299	for i := 0; i < rpcCount; i++ {
300		gotSCSt, err := p1.Pick(balancer.PickInfo{})
301		if i < 50 && err != nil {
302			t.Errorf("The first 50%% picks should be non-drops, got error %v", err)
303		} else if i > 50 && err == nil {
304			t.Errorf("The second 50%% picks should be drops, got error <nil>")
305		}
306		dones = append(dones, func() {
307			if gotSCSt.Done != nil {
308				gotSCSt.Done(balancer.DoneInfo{})
309			}
310		})
311	}
312	for _, done := range dones {
313		done()
314	}
315
316	dones = []func(){}
317	// Pick without drops.
318	for i := 0; i < 50; i++ {
319		gotSCSt, err := p1.Pick(balancer.PickInfo{})
320		if err != nil {
321			t.Errorf("The third 50%% picks should be non-drops, got error %v", err)
322		}
323		dones = append(dones, func() {
324			if gotSCSt.Done != nil {
325				gotSCSt.Done(balancer.DoneInfo{})
326			}
327		})
328	}
329	for _, done := range dones {
330		done()
331	}
332
333	// Dump load data from the store and compare with expected counts.
334	loadStore := xdsC.LoadStore()
335	if loadStore == nil {
336		t.Fatal("loadStore is nil in xdsClient")
337	}
338
339	wantStatsData0 := []*load.Data{{
340		Cluster:    testClusterName,
341		Service:    testServiceName,
342		TotalDrops: uint64(maxRequest),
343		LocalityStats: map[string]load.LocalityData{
344			assertString(xdsinternal.LocalityID{}.ToString): {RequestStats: load.RequestData{Succeeded: uint64(rpcCount - maxRequest + 50)}},
345		},
346	}}
347
348	gotStatsData0 := loadStore.Stats([]string{testClusterName})
349	if diff := cmp.Diff(gotStatsData0, wantStatsData0, cmpOpts); diff != "" {
350		t.Fatalf("got unexpected drop reports, diff (-got, +want): %v", diff)
351	}
352}
353
354// TestPickerUpdateAfterClose covers the case where a child policy sends a
355// picker update after the cluster_impl policy is closed. Because picker updates
356// are handled in the run() goroutine, which exits before Close() returns, we
357// expect the above picker update to be dropped.
358func (s) TestPickerUpdateAfterClose(t *testing.T) {
359	defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
360	xdsC := fakeclient.NewClient()
361	defer xdsC.Close()
362
363	builder := balancer.Get(Name)
364	cc := testutils.NewTestClientConn(t)
365	b := builder.Build(cc, balancer.BuildOptions{})
366
367	// Create a stub balancer which waits for the cluster_impl policy to be
368	// closed before sending a picker update (upon receipt of a subConn state
369	// change).
370	closeCh := make(chan struct{})
371	const childPolicyName = "stubBalancer-TestPickerUpdateAfterClose"
372	stub.Register(childPolicyName, stub.BalancerFuncs{
373		UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
374			// Create a subConn which will be used later on to test the race
375			// between UpdateSubConnState() and Close().
376			bd.ClientConn.NewSubConn(ccs.ResolverState.Addresses, balancer.NewSubConnOptions{})
377			return nil
378		},
379		UpdateSubConnState: func(bd *stub.BalancerData, _ balancer.SubConn, _ balancer.SubConnState) {
380			go func() {
381				// Wait for Close() to be called on the parent policy before
382				// sending the picker update.
383				<-closeCh
384				bd.ClientConn.UpdateState(balancer.State{
385					Picker: base.NewErrPicker(errors.New("dummy error picker")),
386				})
387			}()
388		},
389	})
390
391	var maxRequest uint32 = 50
392	if err := b.UpdateClientConnState(balancer.ClientConnState{
393		ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC),
394		BalancerConfig: &LBConfig{
395			Cluster:               testClusterName,
396			EDSServiceName:        testServiceName,
397			MaxConcurrentRequests: &maxRequest,
398			ChildPolicy: &internalserviceconfig.BalancerConfig{
399				Name: childPolicyName,
400			},
401		},
402	}); err != nil {
403		b.Close()
404		t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
405	}
406
407	// Send a subConn state change to trigger a picker update. The stub balancer
408	// that we use as the child policy will not send a picker update until the
409	// parent policy is closed.
410	sc1 := <-cc.NewSubConnCh
411	b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
412	b.Close()
413	close(closeCh)
414
415	select {
416	case <-cc.NewPickerCh:
417		t.Fatalf("unexpected picker update after balancer is closed")
418	case <-time.After(defaultShortTestTimeout):
419	}
420}
421
422// TestClusterNameInAddressAttributes covers the case that cluster name is
423// attached to the subconn address attributes.
424func (s) TestClusterNameInAddressAttributes(t *testing.T) {
425	defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
426	xdsC := fakeclient.NewClient()
427	defer xdsC.Close()
428
429	builder := balancer.Get(Name)
430	cc := testutils.NewTestClientConn(t)
431	b := builder.Build(cc, balancer.BuildOptions{})
432	defer b.Close()
433
434	if err := b.UpdateClientConnState(balancer.ClientConnState{
435		ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC),
436		BalancerConfig: &LBConfig{
437			Cluster:        testClusterName,
438			EDSServiceName: testServiceName,
439			ChildPolicy: &internalserviceconfig.BalancerConfig{
440				Name: roundrobin.Name,
441			},
442		},
443	}); err != nil {
444		t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
445	}
446
447	sc1 := <-cc.NewSubConnCh
448	b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
449	// This should get the connecting picker.
450	p0 := <-cc.NewPickerCh
451	for i := 0; i < 10; i++ {
452		_, err := p0.Pick(balancer.PickInfo{})
453		if err != balancer.ErrNoSubConnAvailable {
454			t.Fatalf("picker.Pick, got _,%v, want Err=%v", err, balancer.ErrNoSubConnAvailable)
455		}
456	}
457
458	addrs1 := <-cc.NewSubConnAddrsCh
459	if got, want := addrs1[0].Addr, testBackendAddrs[0].Addr; got != want {
460		t.Fatalf("sc is created with addr %v, want %v", got, want)
461	}
462	cn, ok := internal.GetXDSHandshakeClusterName(addrs1[0].Attributes)
463	if !ok || cn != testClusterName {
464		t.Fatalf("sc is created with addr with cluster name %v, %v, want cluster name %v", cn, ok, testClusterName)
465	}
466
467	b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
468	// Test pick with one backend.
469	p1 := <-cc.NewPickerCh
470	const rpcCount = 20
471	for i := 0; i < rpcCount; i++ {
472		gotSCSt, err := p1.Pick(balancer.PickInfo{})
473		if err != nil || !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
474			t.Fatalf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1)
475		}
476		if gotSCSt.Done != nil {
477			gotSCSt.Done(balancer.DoneInfo{})
478		}
479	}
480
481	const testClusterName2 = "test-cluster-2"
482	var addr2 = resolver.Address{Addr: "2.2.2.2"}
483	if err := b.UpdateClientConnState(balancer.ClientConnState{
484		ResolverState: xdsclient.SetClient(resolver.State{Addresses: []resolver.Address{addr2}}, xdsC),
485		BalancerConfig: &LBConfig{
486			Cluster:        testClusterName2,
487			EDSServiceName: testServiceName,
488			ChildPolicy: &internalserviceconfig.BalancerConfig{
489				Name: roundrobin.Name,
490			},
491		},
492	}); err != nil {
493		t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
494	}
495
496	addrs2 := <-cc.NewSubConnAddrsCh
497	if got, want := addrs2[0].Addr, addr2.Addr; got != want {
498		t.Fatalf("sc is created with addr %v, want %v", got, want)
499	}
500	// New addresses should have the new cluster name.
501	cn2, ok := internal.GetXDSHandshakeClusterName(addrs2[0].Attributes)
502	if !ok || cn2 != testClusterName2 {
503		t.Fatalf("sc is created with addr with cluster name %v, %v, want cluster name %v", cn2, ok, testClusterName2)
504	}
505}
506
507// TestReResolution verifies that when a SubConn turns transient failure,
508// re-resolution is triggered.
509func (s) TestReResolution(t *testing.T) {
510	defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
511	xdsC := fakeclient.NewClient()
512	defer xdsC.Close()
513
514	builder := balancer.Get(Name)
515	cc := testutils.NewTestClientConn(t)
516	b := builder.Build(cc, balancer.BuildOptions{})
517	defer b.Close()
518
519	if err := b.UpdateClientConnState(balancer.ClientConnState{
520		ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC),
521		BalancerConfig: &LBConfig{
522			Cluster:        testClusterName,
523			EDSServiceName: testServiceName,
524			ChildPolicy: &internalserviceconfig.BalancerConfig{
525				Name: roundrobin.Name,
526			},
527		},
528	}); err != nil {
529		t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
530	}
531
532	sc1 := <-cc.NewSubConnCh
533	b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
534	// This should get the connecting picker.
535	p0 := <-cc.NewPickerCh
536	for i := 0; i < 10; i++ {
537		_, err := p0.Pick(balancer.PickInfo{})
538		if err != balancer.ErrNoSubConnAvailable {
539			t.Fatalf("picker.Pick, got _,%v, want Err=%v", err, balancer.ErrNoSubConnAvailable)
540		}
541	}
542
543	b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
544	// This should get the transient failure picker.
545	p1 := <-cc.NewPickerCh
546	for i := 0; i < 10; i++ {
547		_, err := p1.Pick(balancer.PickInfo{})
548		if err == nil {
549			t.Fatalf("picker.Pick, got _,%v, want not nil", err)
550		}
551	}
552
553	// The transient failure should trigger a re-resolution.
554	select {
555	case <-cc.ResolveNowCh:
556	case <-time.After(defaultTestTimeout):
557		t.Fatalf("timeout waiting for ResolveNow()")
558	}
559
560	b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
561	// Test pick with one backend.
562	p2 := <-cc.NewPickerCh
563	want := []balancer.SubConn{sc1}
564	if err := testutils.IsRoundRobin(want, subConnFromPicker(p2)); err != nil {
565		t.Fatalf("want %v, got %v", want, err)
566	}
567
568	b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
569	// This should get the transient failure picker.
570	p3 := <-cc.NewPickerCh
571	for i := 0; i < 10; i++ {
572		_, err := p3.Pick(balancer.PickInfo{})
573		if err == nil {
574			t.Fatalf("picker.Pick, got _,%v, want not nil", err)
575		}
576	}
577
578	// The transient failure should trigger a re-resolution.
579	select {
580	case <-cc.ResolveNowCh:
581	case <-time.After(defaultTestTimeout):
582		t.Fatalf("timeout waiting for ResolveNow()")
583	}
584}
585
586func (s) TestLoadReporting(t *testing.T) {
587	var testLocality = xdsinternal.LocalityID{
588		Region:  "test-region",
589		Zone:    "test-zone",
590		SubZone: "test-sub-zone",
591	}
592
593	xdsC := fakeclient.NewClient()
594	defer xdsC.Close()
595
596	builder := balancer.Get(Name)
597	cc := testutils.NewTestClientConn(t)
598	b := builder.Build(cc, balancer.BuildOptions{})
599	defer b.Close()
600
601	addrs := make([]resolver.Address, len(testBackendAddrs))
602	for i, a := range testBackendAddrs {
603		addrs[i] = xdsinternal.SetLocalityID(a, testLocality)
604	}
605	if err := b.UpdateClientConnState(balancer.ClientConnState{
606		ResolverState: xdsclient.SetClient(resolver.State{Addresses: addrs}, xdsC),
607		BalancerConfig: &LBConfig{
608			Cluster:                 testClusterName,
609			EDSServiceName:          testServiceName,
610			LoadReportingServerName: newString(testLRSServerName),
611			// Locality:                testLocality,
612			ChildPolicy: &internalserviceconfig.BalancerConfig{
613				Name: roundrobin.Name,
614			},
615		},
616	}); err != nil {
617		t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
618	}
619
620	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
621	defer cancel()
622
623	got, err := xdsC.WaitForReportLoad(ctx)
624	if err != nil {
625		t.Fatalf("xdsClient.ReportLoad failed with error: %v", err)
626	}
627	if got.Server != testLRSServerName {
628		t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerName)
629	}
630
631	sc1 := <-cc.NewSubConnCh
632	b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
633	// This should get the connecting picker.
634	p0 := <-cc.NewPickerCh
635	for i := 0; i < 10; i++ {
636		_, err := p0.Pick(balancer.PickInfo{})
637		if err != balancer.ErrNoSubConnAvailable {
638			t.Fatalf("picker.Pick, got _,%v, want Err=%v", err, balancer.ErrNoSubConnAvailable)
639		}
640	}
641
642	b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
643	// Test pick with one backend.
644	p1 := <-cc.NewPickerCh
645	const successCount = 5
646	for i := 0; i < successCount; i++ {
647		gotSCSt, err := p1.Pick(balancer.PickInfo{})
648		if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
649			t.Fatalf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1)
650		}
651		gotSCSt.Done(balancer.DoneInfo{})
652	}
653	const errorCount = 5
654	for i := 0; i < errorCount; i++ {
655		gotSCSt, err := p1.Pick(balancer.PickInfo{})
656		if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
657			t.Fatalf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1)
658		}
659		gotSCSt.Done(balancer.DoneInfo{Err: fmt.Errorf("error")})
660	}
661
662	// Dump load data from the store and compare with expected counts.
663	loadStore := xdsC.LoadStore()
664	if loadStore == nil {
665		t.Fatal("loadStore is nil in xdsClient")
666	}
667	sds := loadStore.Stats([]string{testClusterName})
668	if len(sds) == 0 {
669		t.Fatalf("loads for cluster %v not found in store", testClusterName)
670	}
671	sd := sds[0]
672	if sd.Cluster != testClusterName || sd.Service != testServiceName {
673		t.Fatalf("got unexpected load for %q, %q, want %q, %q", sd.Cluster, sd.Service, testClusterName, testServiceName)
674	}
675	testLocalityJSON, _ := testLocality.ToString()
676	localityData, ok := sd.LocalityStats[testLocalityJSON]
677	if !ok {
678		t.Fatalf("loads for %v not found in store", testLocality)
679	}
680	reqStats := localityData.RequestStats
681	if reqStats.Succeeded != successCount {
682		t.Errorf("got succeeded %v, want %v", reqStats.Succeeded, successCount)
683	}
684	if reqStats.Errored != errorCount {
685		t.Errorf("got errord %v, want %v", reqStats.Errored, errorCount)
686	}
687	if reqStats.InProgress != 0 {
688		t.Errorf("got inProgress %v, want %v", reqStats.InProgress, 0)
689	}
690
691	b.Close()
692	if err := xdsC.WaitForCancelReportLoad(ctx); err != nil {
693		t.Fatalf("unexpected error waiting form load report to be canceled: %v", err)
694	}
695}
696
697// TestUpdateLRSServer covers the cases
698// - the init config specifies "" as the LRS server
699// - config modifies LRS server to a different string
700// - config sets LRS server to nil to stop load reporting
701func (s) TestUpdateLRSServer(t *testing.T) {
702	var testLocality = xdsinternal.LocalityID{
703		Region:  "test-region",
704		Zone:    "test-zone",
705		SubZone: "test-sub-zone",
706	}
707
708	xdsC := fakeclient.NewClient()
709	defer xdsC.Close()
710
711	builder := balancer.Get(Name)
712	cc := testutils.NewTestClientConn(t)
713	b := builder.Build(cc, balancer.BuildOptions{})
714	defer b.Close()
715
716	addrs := make([]resolver.Address, len(testBackendAddrs))
717	for i, a := range testBackendAddrs {
718		addrs[i] = xdsinternal.SetLocalityID(a, testLocality)
719	}
720	if err := b.UpdateClientConnState(balancer.ClientConnState{
721		ResolverState: xdsclient.SetClient(resolver.State{Addresses: addrs}, xdsC),
722		BalancerConfig: &LBConfig{
723			Cluster:                 testClusterName,
724			EDSServiceName:          testServiceName,
725			LoadReportingServerName: newString(""),
726			ChildPolicy: &internalserviceconfig.BalancerConfig{
727				Name: roundrobin.Name,
728			},
729		},
730	}); err != nil {
731		t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
732	}
733
734	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
735	defer cancel()
736
737	got, err := xdsC.WaitForReportLoad(ctx)
738	if err != nil {
739		t.Fatalf("xdsClient.ReportLoad failed with error: %v", err)
740	}
741	if got.Server != "" {
742		t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, "")
743	}
744
745	// Update LRS server to a different name.
746	if err := b.UpdateClientConnState(balancer.ClientConnState{
747		ResolverState: xdsclient.SetClient(resolver.State{Addresses: addrs}, xdsC),
748		BalancerConfig: &LBConfig{
749			Cluster:                 testClusterName,
750			EDSServiceName:          testServiceName,
751			LoadReportingServerName: newString(testLRSServerName),
752			ChildPolicy: &internalserviceconfig.BalancerConfig{
753				Name: roundrobin.Name,
754			},
755		},
756	}); err != nil {
757		t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
758	}
759	if err := xdsC.WaitForCancelReportLoad(ctx); err != nil {
760		t.Fatalf("unexpected error waiting form load report to be canceled: %v", err)
761	}
762	got2, err2 := xdsC.WaitForReportLoad(ctx)
763	if err2 != nil {
764		t.Fatalf("xdsClient.ReportLoad failed with error: %v", err2)
765	}
766	if got2.Server != testLRSServerName {
767		t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got2.Server, testLRSServerName)
768	}
769
770	// Update LRS server to nil, to disable LRS.
771	if err := b.UpdateClientConnState(balancer.ClientConnState{
772		ResolverState: xdsclient.SetClient(resolver.State{Addresses: addrs}, xdsC),
773		BalancerConfig: &LBConfig{
774			Cluster:                 testClusterName,
775			EDSServiceName:          testServiceName,
776			LoadReportingServerName: nil,
777			ChildPolicy: &internalserviceconfig.BalancerConfig{
778				Name: roundrobin.Name,
779			},
780		},
781	}); err != nil {
782		t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
783	}
784	if err := xdsC.WaitForCancelReportLoad(ctx); err != nil {
785		t.Fatalf("unexpected error waiting form load report to be canceled: %v", err)
786	}
787
788	shortCtx, shortCancel := context.WithTimeout(context.Background(), defaultShortTestTimeout)
789	defer shortCancel()
790	if s, err := xdsC.WaitForReportLoad(shortCtx); err != context.DeadlineExceeded {
791		t.Fatalf("unexpected load report to server: %q", s)
792	}
793}
794
795func assertString(f func() (string, error)) string {
796	s, err := f()
797	if err != nil {
798		panic(err.Error())
799	}
800	return s
801}
802