1// +build go1.12
2
3/*
4 *
5 * Copyright 2019 gRPC authors.
6 *
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 *     http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 *
19 */
20
21package edsbalancer
22
23import (
24	"bytes"
25	"context"
26	"encoding/json"
27	"fmt"
28	"reflect"
29	"testing"
30	"time"
31
32	"github.com/golang/protobuf/jsonpb"
33	wrapperspb "github.com/golang/protobuf/ptypes/wrappers"
34	"github.com/google/go-cmp/cmp"
35	"google.golang.org/grpc/balancer"
36	"google.golang.org/grpc/connectivity"
37	"google.golang.org/grpc/internal/grpclog"
38	"google.golang.org/grpc/internal/grpctest"
39	scpb "google.golang.org/grpc/internal/proto/grpc_service_config"
40	"google.golang.org/grpc/internal/testutils"
41	"google.golang.org/grpc/resolver"
42	"google.golang.org/grpc/serviceconfig"
43	"google.golang.org/grpc/xds/internal"
44	xdsclient "google.golang.org/grpc/xds/internal/client"
45	"google.golang.org/grpc/xds/internal/client/load"
46	"google.golang.org/grpc/xds/internal/testutils/fakeclient"
47
48	_ "google.golang.org/grpc/xds/internal/client/v2" // V2 client registration.
49)
50
// Constants shared by the tests in this file.
const (
	// defaultTestTimeout bounds the waits for events which the tests expect
	// to happen.
	defaultTestTimeout = time.Second

	// defaultTestShortTimeout bounds the waits for events which the tests
	// expect NOT to happen (i.e. the wait is expected to time out).
	defaultTestShortTimeout = 10 * time.Millisecond

	// testServiceName is used as the target endpoint, and as the EDS service
	// name in most balancer configs pushed by the tests.
	testServiceName = "test/foo"

	// testEDSClusterName is not referenced in this part of the file;
	// presumably it is used by other tests in the package.
	testEDSClusterName = "test/service/eds"
)
57
58var (
59	// A non-empty endpoints update which is expected to be accepted by the EDS
60	// LB policy.
61	defaultEndpointsUpdate = xdsclient.EndpointsUpdate{
62		Localities: []xdsclient.Locality{
63			{
64				Endpoints: []xdsclient.Endpoint{{Address: "endpoint1"}},
65				ID:        internal.LocalityID{Zone: "zone"},
66				Priority:  1,
67				Weight:    100,
68			},
69		},
70	}
71)
72
73func init() {
74	balancer.Register(&edsBalancerBuilder{})
75}
76
77func subConnFromPicker(p balancer.Picker) func() balancer.SubConn {
78	return func() balancer.SubConn {
79		scst, _ := p.Pick(balancer.PickInfo{})
80		return scst.SubConn
81	}
82}
83
84type s struct {
85	grpctest.Tester
86}
87
88func Test(t *testing.T) {
89	grpctest.RunSubTests(t, s{})
90}
91
92const testBalancerNameFooBar = "foo.bar"
93
94func newNoopTestClientConn() *noopTestClientConn {
95	return &noopTestClientConn{}
96}
97
98// noopTestClientConn is used in EDS balancer config update tests that only
99// cover the config update handling, but not SubConn/load-balancing.
100type noopTestClientConn struct {
101	balancer.ClientConn
102}
103
104func (t *noopTestClientConn) NewSubConn([]resolver.Address, balancer.NewSubConnOptions) (balancer.SubConn, error) {
105	return nil, nil
106}
107
108func (noopTestClientConn) Target() string { return testServiceName }
109
// scStateChange pairs a SubConn with a connectivity state. The fake EDS
// balancer sends one of these on its subconnStateChange channel for every
// handleSubConnStateChange call it receives.
type scStateChange struct {
	sc    balancer.SubConn
	state connectivity.State
}
114
115type fakeEDSBalancer struct {
116	cc                 balancer.ClientConn
117	childPolicy        *testutils.Channel
118	subconnStateChange *testutils.Channel
119	edsUpdate          *testutils.Channel
120	serviceName        *testutils.Channel
121	serviceRequestMax  *testutils.Channel
122	clusterName        *testutils.Channel
123}
124
125func (f *fakeEDSBalancer) handleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
126	f.subconnStateChange.Send(&scStateChange{sc: sc, state: state})
127}
128
129func (f *fakeEDSBalancer) handleChildPolicy(name string, config json.RawMessage) {
130	f.childPolicy.Send(&loadBalancingConfig{Name: name, Config: config})
131}
132
133func (f *fakeEDSBalancer) handleEDSResponse(edsResp xdsclient.EndpointsUpdate) {
134	f.edsUpdate.Send(edsResp)
135}
136
137func (f *fakeEDSBalancer) updateState(priority priorityType, s balancer.State) {}
138
139func (f *fakeEDSBalancer) updateServiceRequestsConfig(serviceName string, max *uint32) {
140	f.serviceName.Send(serviceName)
141	f.serviceRequestMax.Send(max)
142}
143
144func (f *fakeEDSBalancer) updateClusterName(name string) {
145	f.clusterName.Send(name)
146}
147
148func (f *fakeEDSBalancer) close() {}
149
150func (f *fakeEDSBalancer) waitForChildPolicy(ctx context.Context, wantPolicy *loadBalancingConfig) error {
151	val, err := f.childPolicy.Receive(ctx)
152	if err != nil {
153		return err
154	}
155	gotPolicy := val.(*loadBalancingConfig)
156	if !cmp.Equal(gotPolicy, wantPolicy) {
157		return fmt.Errorf("got childPolicy %v, want %v", gotPolicy, wantPolicy)
158	}
159	return nil
160}
161
162func (f *fakeEDSBalancer) waitForSubConnStateChange(ctx context.Context, wantState *scStateChange) error {
163	val, err := f.subconnStateChange.Receive(ctx)
164	if err != nil {
165		return err
166	}
167	gotState := val.(*scStateChange)
168	if !cmp.Equal(gotState, wantState, cmp.AllowUnexported(scStateChange{})) {
169		return fmt.Errorf("got subconnStateChange %v, want %v", gotState, wantState)
170	}
171	return nil
172}
173
174func (f *fakeEDSBalancer) waitForEDSResponse(ctx context.Context, wantUpdate xdsclient.EndpointsUpdate) error {
175	val, err := f.edsUpdate.Receive(ctx)
176	if err != nil {
177		return err
178	}
179	gotUpdate := val.(xdsclient.EndpointsUpdate)
180	if !reflect.DeepEqual(gotUpdate, wantUpdate) {
181		return fmt.Errorf("got edsUpdate %+v, want %+v", gotUpdate, wantUpdate)
182	}
183	return nil
184}
185
186func (f *fakeEDSBalancer) waitForCounterUpdate(ctx context.Context, wantServiceName string) error {
187	val, err := f.serviceName.Receive(ctx)
188	if err != nil {
189		return err
190	}
191	gotServiceName := val.(string)
192	if gotServiceName != wantServiceName {
193		return fmt.Errorf("got serviceName %v, want %v", gotServiceName, wantServiceName)
194	}
195	return nil
196}
197
198func (f *fakeEDSBalancer) waitForCountMaxUpdate(ctx context.Context, want *uint32) error {
199	val, err := f.serviceRequestMax.Receive(ctx)
200	if err != nil {
201		return err
202	}
203	got := val.(*uint32)
204
205	if got == nil && want == nil {
206		return nil
207	}
208	if got != nil && want != nil {
209		if *got != *want {
210			return fmt.Errorf("got countMax %v, want %v", *got, *want)
211		}
212		return nil
213	}
214	return fmt.Errorf("got countMax %+v, want %+v", got, want)
215}
216
217func (f *fakeEDSBalancer) waitForClusterNameUpdate(ctx context.Context, wantClusterName string) error {
218	val, err := f.clusterName.Receive(ctx)
219	if err != nil {
220		return err
221	}
222	gotServiceName := val.(string)
223	if gotServiceName != wantClusterName {
224		return fmt.Errorf("got clusterName %v, want %v", gotServiceName, wantClusterName)
225	}
226	return nil
227}
228
229func newFakeEDSBalancer(cc balancer.ClientConn) edsBalancerImplInterface {
230	return &fakeEDSBalancer{
231		cc:                 cc,
232		childPolicy:        testutils.NewChannelWithSize(10),
233		subconnStateChange: testutils.NewChannelWithSize(10),
234		edsUpdate:          testutils.NewChannelWithSize(10),
235		serviceName:        testutils.NewChannelWithSize(10),
236		serviceRequestMax:  testutils.NewChannelWithSize(10),
237		clusterName:        testutils.NewChannelWithSize(10),
238	}
239}
240
241type fakeSubConn struct{}
242
243func (*fakeSubConn) UpdateAddresses([]resolver.Address) { panic("implement me") }
244func (*fakeSubConn) Connect()                           { panic("implement me") }
245
246// waitForNewEDSLB makes sure that a new edsLB is created by the top-level
247// edsBalancer.
248func waitForNewEDSLB(ctx context.Context, ch *testutils.Channel) (*fakeEDSBalancer, error) {
249	val, err := ch.Receive(ctx)
250	if err != nil {
251		return nil, fmt.Errorf("error when waiting for a new edsLB: %v", err)
252	}
253	return val.(*fakeEDSBalancer), nil
254}
255
256// setup overrides the functions which are used to create the xdsClient and the
257// edsLB, creates fake version of them and makes them available on the provided
258// channels. The returned cancel function should be called by the test for
259// cleanup.
260func setup(edsLBCh *testutils.Channel) (*fakeclient.Client, func()) {
261	xdsC := fakeclient.NewClientWithName(testBalancerNameFooBar)
262	oldNewXDSClient := newXDSClient
263	newXDSClient = func() (xdsClientInterface, error) { return xdsC, nil }
264
265	origNewEDSBalancer := newEDSBalancer
266	newEDSBalancer = func(cc balancer.ClientConn, _ balancer.BuildOptions, _ func(priorityType, balancer.State), _ load.PerClusterReporter, _ *grpclog.PrefixLogger) edsBalancerImplInterface {
267		edsLB := newFakeEDSBalancer(cc)
268		defer func() { edsLBCh.Send(edsLB) }()
269		return edsLB
270	}
271	return xdsC, func() {
272		newEDSBalancer = origNewEDSBalancer
273		newXDSClient = oldNewXDSClient
274	}
275}
276
277const (
278	fakeBalancerA = "fake_balancer_A"
279	fakeBalancerB = "fake_balancer_B"
280)
281
282// Install two fake balancers for service config update tests.
283//
284// ParseConfig only accepts the json if the balancer specified is registered.
285func init() {
286	balancer.Register(&fakeBalancerBuilder{name: fakeBalancerA})
287	balancer.Register(&fakeBalancerBuilder{name: fakeBalancerB})
288}
289
290type fakeBalancerBuilder struct {
291	name string
292}
293
294func (b *fakeBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
295	return &fakeBalancer{cc: cc}
296}
297
298func (b *fakeBalancerBuilder) Name() string {
299	return b.name
300}
301
302type fakeBalancer struct {
303	cc balancer.ClientConn
304}
305
306func (b *fakeBalancer) ResolverError(error) {
307	panic("implement me")
308}
309
310func (b *fakeBalancer) UpdateClientConnState(balancer.ClientConnState) error {
311	panic("implement me")
312}
313
314func (b *fakeBalancer) UpdateSubConnState(balancer.SubConn, balancer.SubConnState) {
315	panic("implement me")
316}
317
318func (b *fakeBalancer) Close() {}
319
320// TestConfigChildPolicyUpdate verifies scenarios where the childPolicy
321// section of the lbConfig is updated.
322//
323// The test does the following:
324// * Builds a new EDS balancer.
325// * Pushes a new ClientConnState with a childPolicy set to fakeBalancerA.
326//   Verifies that an EDS watch is registered. It then pushes a new edsUpdate
327//   through the fakexds client. Verifies that a new edsLB is created and it
328//   receives the expected childPolicy.
329// * Pushes a new ClientConnState with a childPolicy set to fakeBalancerB.
330//   Verifies that the existing edsLB receives the new child policy.
331func (s) TestConfigChildPolicyUpdate(t *testing.T) {
332	edsLBCh := testutils.NewChannel()
333	xdsC, cleanup := setup(edsLBCh)
334	defer cleanup()
335
336	builder := balancer.Get(edsName)
337	edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}})
338	if edsB == nil {
339		t.Fatalf("builder.Build(%s) failed and returned nil", edsName)
340	}
341	defer edsB.Close()
342
343	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
344	defer cancel()
345	edsLB, err := waitForNewEDSLB(ctx, edsLBCh)
346	if err != nil {
347		t.Fatal(err)
348	}
349
350	lbCfgA := &loadBalancingConfig{
351		Name:   fakeBalancerA,
352		Config: json.RawMessage("{}"),
353	}
354	if err := edsB.UpdateClientConnState(balancer.ClientConnState{
355		BalancerConfig: &EDSConfig{
356			ChildPolicy:    lbCfgA,
357			EDSServiceName: testServiceName,
358		},
359	}); err != nil {
360		t.Fatalf("edsB.UpdateClientConnState() failed: %v", err)
361	}
362
363	if _, err := xdsC.WaitForWatchEDS(ctx); err != nil {
364		t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err)
365	}
366	xdsC.InvokeWatchEDSCallback(defaultEndpointsUpdate, nil)
367	if err := edsLB.waitForChildPolicy(ctx, lbCfgA); err != nil {
368		t.Fatal(err)
369	}
370	if err := edsLB.waitForCounterUpdate(ctx, testServiceName); err != nil {
371		t.Fatal(err)
372	}
373	if err := edsLB.waitForCountMaxUpdate(ctx, nil); err != nil {
374		t.Fatal(err)
375	}
376
377	var testCountMax uint32 = 100
378	lbCfgB := &loadBalancingConfig{
379		Name:   fakeBalancerB,
380		Config: json.RawMessage("{}"),
381	}
382	if err := edsB.UpdateClientConnState(balancer.ClientConnState{
383		BalancerConfig: &EDSConfig{
384			ChildPolicy:           lbCfgB,
385			EDSServiceName:        testServiceName,
386			MaxConcurrentRequests: &testCountMax,
387		},
388	}); err != nil {
389		t.Fatalf("edsB.UpdateClientConnState() failed: %v", err)
390	}
391	if err := edsLB.waitForChildPolicy(ctx, lbCfgB); err != nil {
392		t.Fatal(err)
393	}
394	if err := edsLB.waitForCounterUpdate(ctx, testServiceName); err != nil {
395		// Counter is updated even though the service name didn't change. The
396		// eds_impl will compare the service names, and skip if it didn't change.
397		t.Fatal(err)
398	}
399	if err := edsLB.waitForCountMaxUpdate(ctx, &testCountMax); err != nil {
400		t.Fatal(err)
401	}
402}
403
404// TestSubConnStateChange verifies if the top-level edsBalancer passes on
405// the subConnStateChange to appropriate child balancer.
406func (s) TestSubConnStateChange(t *testing.T) {
407	edsLBCh := testutils.NewChannel()
408	xdsC, cleanup := setup(edsLBCh)
409	defer cleanup()
410
411	builder := balancer.Get(edsName)
412	edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}})
413	if edsB == nil {
414		t.Fatalf("builder.Build(%s) failed and returned nil", edsName)
415	}
416	defer edsB.Close()
417
418	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
419	defer cancel()
420	edsLB, err := waitForNewEDSLB(ctx, edsLBCh)
421	if err != nil {
422		t.Fatal(err)
423	}
424
425	if err := edsB.UpdateClientConnState(balancer.ClientConnState{
426		BalancerConfig: &EDSConfig{EDSServiceName: testServiceName},
427	}); err != nil {
428		t.Fatalf("edsB.UpdateClientConnState() failed: %v", err)
429	}
430
431	if _, err := xdsC.WaitForWatchEDS(ctx); err != nil {
432		t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err)
433	}
434	xdsC.InvokeWatchEDSCallback(defaultEndpointsUpdate, nil)
435
436	fsc := &fakeSubConn{}
437	state := connectivity.Ready
438	edsB.UpdateSubConnState(fsc, balancer.SubConnState{ConnectivityState: state})
439	if err := edsLB.waitForSubConnStateChange(ctx, &scStateChange{sc: fsc, state: state}); err != nil {
440		t.Fatal(err)
441	}
442}
443
444// TestErrorFromXDSClientUpdate verifies that an error from xdsClient update is
445// handled correctly.
446//
447// If it's resource-not-found, watch will NOT be canceled, the EDS impl will
448// receive an empty EDS update, and new RPCs will fail.
449//
450// If it's connection error, nothing will happen. This will need to change to
451// handle fallback.
452func (s) TestErrorFromXDSClientUpdate(t *testing.T) {
453	edsLBCh := testutils.NewChannel()
454	xdsC, cleanup := setup(edsLBCh)
455	defer cleanup()
456
457	builder := balancer.Get(edsName)
458	edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}})
459	if edsB == nil {
460		t.Fatalf("builder.Build(%s) failed and returned nil", edsName)
461	}
462	defer edsB.Close()
463
464	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
465	defer cancel()
466	edsLB, err := waitForNewEDSLB(ctx, edsLBCh)
467	if err != nil {
468		t.Fatal(err)
469	}
470
471	if err := edsB.UpdateClientConnState(balancer.ClientConnState{
472		BalancerConfig: &EDSConfig{EDSServiceName: testServiceName},
473	}); err != nil {
474		t.Fatal(err)
475	}
476
477	if _, err := xdsC.WaitForWatchEDS(ctx); err != nil {
478		t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err)
479	}
480	xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, nil)
481	if err := edsLB.waitForEDSResponse(ctx, xdsclient.EndpointsUpdate{}); err != nil {
482		t.Fatalf("EDS impl got unexpected EDS response: %v", err)
483	}
484
485	connectionErr := xdsclient.NewErrorf(xdsclient.ErrorTypeConnection, "connection error")
486	xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, connectionErr)
487
488	sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
489	defer sCancel()
490	if err := xdsC.WaitForCancelEDSWatch(sCtx); err != context.DeadlineExceeded {
491		t.Fatal("watch was canceled, want not canceled (timeout error)")
492	}
493
494	sCtx, sCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
495	defer sCancel()
496	if err := edsLB.waitForEDSResponse(sCtx, xdsclient.EndpointsUpdate{}); err != context.DeadlineExceeded {
497		t.Fatal(err)
498	}
499
500	resourceErr := xdsclient.NewErrorf(xdsclient.ErrorTypeResourceNotFound, "edsBalancer resource not found error")
501	xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, resourceErr)
502	// Even if error is resource not found, watch shouldn't be canceled, because
503	// this is an EDS resource removed (and xds client actually never sends this
504	// error, but we still handles it).
505	sCtx, sCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
506	defer sCancel()
507	if err := xdsC.WaitForCancelEDSWatch(sCtx); err != context.DeadlineExceeded {
508		t.Fatal("watch was canceled, want not canceled (timeout error)")
509	}
510	if err := edsLB.waitForEDSResponse(ctx, xdsclient.EndpointsUpdate{}); err != nil {
511		t.Fatalf("eds impl expecting empty update, got %v", err)
512	}
513}
514
515// TestErrorFromResolver verifies that resolver errors are handled correctly.
516//
517// If it's resource-not-found, watch will be canceled, the EDS impl will receive
518// an empty EDS update, and new RPCs will fail.
519//
520// If it's connection error, nothing will happen. This will need to change to
521// handle fallback.
522func (s) TestErrorFromResolver(t *testing.T) {
523	edsLBCh := testutils.NewChannel()
524	xdsC, cleanup := setup(edsLBCh)
525	defer cleanup()
526
527	builder := balancer.Get(edsName)
528	edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}})
529	if edsB == nil {
530		t.Fatalf("builder.Build(%s) failed and returned nil", edsName)
531	}
532	defer edsB.Close()
533
534	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
535	defer cancel()
536	edsLB, err := waitForNewEDSLB(ctx, edsLBCh)
537	if err != nil {
538		t.Fatal(err)
539	}
540
541	if err := edsB.UpdateClientConnState(balancer.ClientConnState{
542		BalancerConfig: &EDSConfig{EDSServiceName: testServiceName},
543	}); err != nil {
544		t.Fatal(err)
545	}
546
547	if _, err := xdsC.WaitForWatchEDS(ctx); err != nil {
548		t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err)
549	}
550	xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, nil)
551	if err := edsLB.waitForEDSResponse(ctx, xdsclient.EndpointsUpdate{}); err != nil {
552		t.Fatalf("EDS impl got unexpected EDS response: %v", err)
553	}
554
555	connectionErr := xdsclient.NewErrorf(xdsclient.ErrorTypeConnection, "connection error")
556	edsB.ResolverError(connectionErr)
557
558	sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
559	defer sCancel()
560	if err := xdsC.WaitForCancelEDSWatch(sCtx); err != context.DeadlineExceeded {
561		t.Fatal("watch was canceled, want not canceled (timeout error)")
562	}
563
564	sCtx, sCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
565	defer sCancel()
566	if err := edsLB.waitForEDSResponse(sCtx, xdsclient.EndpointsUpdate{}); err != context.DeadlineExceeded {
567		t.Fatal("eds impl got EDS resp, want timeout error")
568	}
569
570	resourceErr := xdsclient.NewErrorf(xdsclient.ErrorTypeResourceNotFound, "edsBalancer resource not found error")
571	edsB.ResolverError(resourceErr)
572	if err := xdsC.WaitForCancelEDSWatch(ctx); err != nil {
573		t.Fatalf("want watch to be canceled, waitForCancel failed: %v", err)
574	}
575	if err := edsLB.waitForEDSResponse(ctx, xdsclient.EndpointsUpdate{}); err != nil {
576		t.Fatalf("EDS impl got unexpected EDS response: %v", err)
577	}
578}
579
580// Given a list of resource names, verifies that EDS requests for the same are
581// sent by the EDS balancer, through the fake xDS client.
582func verifyExpectedRequests(ctx context.Context, fc *fakeclient.Client, resourceNames ...string) error {
583	for _, name := range resourceNames {
584		if name == "" {
585			// ResourceName empty string indicates a cancel.
586			if err := fc.WaitForCancelEDSWatch(ctx); err != nil {
587				return fmt.Errorf("timed out when expecting resource %q", name)
588			}
589			return nil
590		}
591
592		resName, err := fc.WaitForWatchEDS(ctx)
593		if err != nil {
594			return fmt.Errorf("timed out when expecting resource %q, %p", name, fc)
595		}
596		if resName != name {
597			return fmt.Errorf("got EDS request for resource %q, expected: %q", resName, name)
598		}
599	}
600	return nil
601}
602
603// TestClientWatchEDS verifies that the xdsClient inside the top-level EDS LB
604// policy registers an EDS watch for expected resource upon receiving an update
605// from gRPC.
606func (s) TestClientWatchEDS(t *testing.T) {
607	edsLBCh := testutils.NewChannel()
608	xdsC, cleanup := setup(edsLBCh)
609	defer cleanup()
610
611	builder := balancer.Get(edsName)
612	edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}})
613	if edsB == nil {
614		t.Fatalf("builder.Build(%s) failed and returned nil", edsName)
615	}
616	defer edsB.Close()
617
618	// Update with an non-empty edsServiceName should trigger an EDS watch for
619	// the same.
620	if err := edsB.UpdateClientConnState(balancer.ClientConnState{
621		BalancerConfig: &EDSConfig{EDSServiceName: "foobar-1"},
622	}); err != nil {
623		t.Fatal(err)
624	}
625	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
626	defer cancel()
627	if err := verifyExpectedRequests(ctx, xdsC, "foobar-1"); err != nil {
628		t.Fatal(err)
629	}
630
631	// Also test the case where the edsServerName changes from one non-empty
632	// name to another, and make sure a new watch is registered. The previously
633	// registered watch will be cancelled, which will result in an EDS request
634	// with no resource names being sent to the server.
635	if err := edsB.UpdateClientConnState(balancer.ClientConnState{
636		BalancerConfig: &EDSConfig{EDSServiceName: "foobar-2"},
637	}); err != nil {
638		t.Fatal(err)
639	}
640	if err := verifyExpectedRequests(ctx, xdsC, "", "foobar-2"); err != nil {
641		t.Fatal(err)
642	}
643}
644
645// TestCounterUpdate verifies that the counter update is triggered with the
646// service name from an update's config.
647func (s) TestCounterUpdate(t *testing.T) {
648	edsLBCh := testutils.NewChannel()
649	_, cleanup := setup(edsLBCh)
650	defer cleanup()
651
652	builder := balancer.Get(edsName)
653	edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}})
654	if edsB == nil {
655		t.Fatalf("builder.Build(%s) failed and returned nil", edsName)
656	}
657	defer edsB.Close()
658
659	var testCountMax uint32 = 100
660	// Update should trigger counter update with provided service name.
661	if err := edsB.UpdateClientConnState(balancer.ClientConnState{
662		BalancerConfig: &EDSConfig{
663			EDSServiceName:        "foobar-1",
664			MaxConcurrentRequests: &testCountMax,
665		},
666	}); err != nil {
667		t.Fatal(err)
668	}
669	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
670	defer cancel()
671	edsI := edsB.(*edsBalancer).edsImpl.(*fakeEDSBalancer)
672	if err := edsI.waitForCounterUpdate(ctx, "foobar-1"); err != nil {
673		t.Fatal(err)
674	}
675	if err := edsI.waitForCountMaxUpdate(ctx, &testCountMax); err != nil {
676		t.Fatal(err)
677	}
678}
679
680// TestClusterNameUpdateInAddressAttributes verifies that cluster name update in
681// edsImpl is triggered with the update from a new service config.
682func (s) TestClusterNameUpdateInAddressAttributes(t *testing.T) {
683	edsLBCh := testutils.NewChannel()
684	xdsC, cleanup := setup(edsLBCh)
685	defer cleanup()
686
687	builder := balancer.Get(edsName)
688	edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}})
689	if edsB == nil {
690		t.Fatalf("builder.Build(%s) failed and returned nil", edsName)
691	}
692	defer edsB.Close()
693
694	// Update should trigger counter update with provided service name.
695	if err := edsB.UpdateClientConnState(balancer.ClientConnState{
696		BalancerConfig: &EDSConfig{
697			EDSServiceName: "foobar-1",
698		},
699	}); err != nil {
700		t.Fatal(err)
701	}
702	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
703	defer cancel()
704	gotCluster, err := xdsC.WaitForWatchEDS(ctx)
705	if err != nil || gotCluster != "foobar-1" {
706		t.Fatalf("unexpected EDS watch: %v, %v", gotCluster, err)
707	}
708	edsI := edsB.(*edsBalancer).edsImpl.(*fakeEDSBalancer)
709	if err := edsI.waitForClusterNameUpdate(ctx, "foobar-1"); err != nil {
710		t.Fatal(err)
711	}
712
713	// Update should trigger counter update with provided service name.
714	if err := edsB.UpdateClientConnState(balancer.ClientConnState{
715		BalancerConfig: &EDSConfig{
716			EDSServiceName: "foobar-2",
717		},
718	}); err != nil {
719		t.Fatal(err)
720	}
721	if err := xdsC.WaitForCancelEDSWatch(ctx); err != nil {
722		t.Fatalf("failed to wait for EDS cancel: %v", err)
723	}
724	gotCluster2, err := xdsC.WaitForWatchEDS(ctx)
725	if err != nil || gotCluster2 != "foobar-2" {
726		t.Fatalf("unexpected EDS watch: %v, %v", gotCluster2, err)
727	}
728	if err := edsI.waitForClusterNameUpdate(ctx, "foobar-2"); err != nil {
729		t.Fatal(err)
730	}
731}
732
733func (s) TestBalancerConfigParsing(t *testing.T) {
734	const testEDSName = "eds.service"
735	var testLRSName = "lrs.server"
736	b := bytes.NewBuffer(nil)
737	if err := (&jsonpb.Marshaler{}).Marshal(b, &scpb.XdsConfig{
738		ChildPolicy: []*scpb.LoadBalancingConfig{
739			{Policy: &scpb.LoadBalancingConfig_Xds{}},
740			{Policy: &scpb.LoadBalancingConfig_RoundRobin{
741				RoundRobin: &scpb.RoundRobinConfig{},
742			}},
743		},
744		FallbackPolicy: []*scpb.LoadBalancingConfig{
745			{Policy: &scpb.LoadBalancingConfig_Xds{}},
746			{Policy: &scpb.LoadBalancingConfig_PickFirst{
747				PickFirst: &scpb.PickFirstConfig{},
748			}},
749		},
750		EdsServiceName:             testEDSName,
751		LrsLoadReportingServerName: &wrapperspb.StringValue{Value: testLRSName},
752	}); err != nil {
753		t.Fatalf("%v", err)
754	}
755
756	var testMaxConcurrentRequests uint32 = 123
757	tests := []struct {
758		name    string
759		js      json.RawMessage
760		want    serviceconfig.LoadBalancingConfig
761		wantErr bool
762	}{
763		{
764			name:    "bad json",
765			js:      json.RawMessage(`i am not JSON`),
766			wantErr: true,
767		},
768		{
769			name: "empty",
770			js:   json.RawMessage(`{}`),
771			want: &EDSConfig{},
772		},
773		{
774			name: "jsonpb-generated",
775			js:   b.Bytes(),
776			want: &EDSConfig{
777				ChildPolicy: &loadBalancingConfig{
778					Name:   "round_robin",
779					Config: json.RawMessage("{}"),
780				},
781				FallBackPolicy: &loadBalancingConfig{
782					Name:   "pick_first",
783					Config: json.RawMessage("{}"),
784				},
785				EDSServiceName:             testEDSName,
786				LrsLoadReportingServerName: &testLRSName,
787			},
788		},
789		{
790			// json with random balancers, and the first is not registered.
791			name: "manually-generated",
792			js: json.RawMessage(`
793{
794  "childPolicy": [
795    {"fake_balancer_C": {}},
796    {"fake_balancer_A": {}},
797    {"fake_balancer_B": {}}
798  ],
799  "fallbackPolicy": [
800    {"fake_balancer_C": {}},
801    {"fake_balancer_B": {}},
802    {"fake_balancer_A": {}}
803  ],
804  "edsServiceName": "eds.service",
805  "maxConcurrentRequests": 123,
806  "lrsLoadReportingServerName": "lrs.server"
807}`),
808			want: &EDSConfig{
809				ChildPolicy: &loadBalancingConfig{
810					Name:   "fake_balancer_A",
811					Config: json.RawMessage("{}"),
812				},
813				FallBackPolicy: &loadBalancingConfig{
814					Name:   "fake_balancer_B",
815					Config: json.RawMessage("{}"),
816				},
817				EDSServiceName:             testEDSName,
818				MaxConcurrentRequests:      &testMaxConcurrentRequests,
819				LrsLoadReportingServerName: &testLRSName,
820			},
821		},
822		{
823			// json with no lrs server name, LoadReportingServerName should
824			// be nil (not an empty string).
825			name: "no-lrs-server-name",
826			js: json.RawMessage(`
827{
828  "edsServiceName": "eds.service"
829}`),
830			want: &EDSConfig{
831				EDSServiceName:             testEDSName,
832				LrsLoadReportingServerName: nil,
833			},
834		},
835		{
836			name: "good child policy",
837			js:   json.RawMessage(`{"childPolicy":[{"pick_first":{}}]}`),
838			want: &EDSConfig{
839				ChildPolicy: &loadBalancingConfig{
840					Name:   "pick_first",
841					Config: json.RawMessage(`{}`),
842				},
843			},
844		},
845		{
846			name: "multiple good child policies",
847			js:   json.RawMessage(`{"childPolicy":[{"round_robin":{}},{"pick_first":{}}]}`),
848			want: &EDSConfig{
849				ChildPolicy: &loadBalancingConfig{
850					Name:   "round_robin",
851					Config: json.RawMessage(`{}`),
852				},
853			},
854		},
855	}
856	for _, tt := range tests {
857		t.Run(tt.name, func(t *testing.T) {
858			b := &edsBalancerBuilder{}
859			got, err := b.ParseConfig(tt.js)
860			if (err != nil) != tt.wantErr {
861				t.Fatalf("edsBalancerBuilder.ParseConfig() error = %v, wantErr %v", err, tt.wantErr)
862			}
863			if tt.wantErr {
864				return
865			}
866			if !cmp.Equal(got, tt.want) {
867				t.Errorf(cmp.Diff(got, tt.want))
868			}
869		})
870	}
871}
872
873func (s) TestEqualStringPointers(t *testing.T) {
874	var (
875		ta1 = "test-a"
876		ta2 = "test-a"
877		tb  = "test-b"
878	)
879	tests := []struct {
880		name string
881		a    *string
882		b    *string
883		want bool
884	}{
885		{"both-nil", nil, nil, true},
886		{"a-non-nil", &ta1, nil, false},
887		{"b-non-nil", nil, &tb, false},
888		{"equal", &ta1, &ta2, true},
889		{"different", &ta1, &tb, false},
890	}
891	for _, tt := range tests {
892		t.Run(tt.name, func(t *testing.T) {
893			if got := equalStringPointers(tt.a, tt.b); got != tt.want {
894				t.Errorf("equalStringPointers() = %v, want %v", got, tt.want)
895			}
896		})
897	}
898}
899