1/*
2 *
3 * Copyright 2019 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package balancer
20
21import (
22	"bytes"
23	"encoding/json"
24	"fmt"
25	"reflect"
26	"sync"
27	"testing"
28	"time"
29
30	corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
31	"github.com/golang/protobuf/jsonpb"
32	wrapperspb "github.com/golang/protobuf/ptypes/wrappers"
33	"github.com/google/go-cmp/cmp"
34	"google.golang.org/grpc"
35	"google.golang.org/grpc/balancer"
36	"google.golang.org/grpc/connectivity"
37	"google.golang.org/grpc/internal/grpctest"
38	"google.golang.org/grpc/internal/leakcheck"
39	scpb "google.golang.org/grpc/internal/proto/grpc_service_config"
40	"google.golang.org/grpc/resolver"
41	"google.golang.org/grpc/serviceconfig"
42	"google.golang.org/grpc/xds/internal/balancer/lrs"
43	xdsclient "google.golang.org/grpc/xds/internal/client"
44	"google.golang.org/grpc/xds/internal/client/bootstrap"
45	"google.golang.org/grpc/xds/internal/testutils"
46	"google.golang.org/grpc/xds/internal/testutils/fakexds"
47)
48
49var lbABuilder = &balancerABuilder{}
50
51func init() {
52	balancer.Register(&edsBalancerBuilder{})
53	balancer.Register(lbABuilder)
54	balancer.Register(&balancerBBuilder{})
55
56	bootstrapConfigNew = func() *bootstrap.Config {
57		return &bootstrap.Config{
58			BalancerName: "",
59			Creds:        grpc.WithInsecure(),
60			NodeProto:    &corepb.Node{},
61		}
62	}
63}
64
65type s struct{}
66
67func (s) Teardown(t *testing.T) {
68	leakcheck.Check(t)
69}
70
71func Test(t *testing.T) {
72	grpctest.RunSubTests(t, s{})
73}
74
75const (
76	fakeBalancerA = "fake_balancer_A"
77	fakeBalancerB = "fake_balancer_B"
78)
79
80var (
81	testBalancerNameFooBar = "foo.bar"
82	testLBConfigFooBar     = &XDSConfig{
83		BalancerName:   testBalancerNameFooBar,
84		ChildPolicy:    &loadBalancingConfig{Name: fakeBalancerB},
85		FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA},
86		EDSServiceName: testEDSClusterName,
87	}
88
89	specialAddrForBalancerA = resolver.Address{Addr: "this.is.balancer.A"}
90	specialAddrForBalancerB = resolver.Address{Addr: "this.is.balancer.B"}
91)
92
93type balancerABuilder struct {
94	mu           sync.Mutex
95	lastBalancer *balancerA
96}
97
98func (b *balancerABuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
99	b.mu.Lock()
100	b.lastBalancer = &balancerA{cc: cc, subconnStateChange: testutils.NewChannelWithSize(10)}
101	b.mu.Unlock()
102	return b.lastBalancer
103}
104
105func (b *balancerABuilder) Name() string {
106	return string(fakeBalancerA)
107}
108
109func (b *balancerABuilder) getLastBalancer() *balancerA {
110	b.mu.Lock()
111	defer b.mu.Unlock()
112	return b.lastBalancer
113}
114
115func (b *balancerABuilder) clearLastBalancer() {
116	b.mu.Lock()
117	defer b.mu.Unlock()
118	b.lastBalancer = nil
119}
120
121type balancerBBuilder struct{}
122
123func (b *balancerBBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
124	return &balancerB{cc: cc}
125}
126
127func (*balancerBBuilder) Name() string {
128	return string(fakeBalancerB)
129}
130
131// A fake balancer implementation which does two things:
132// * Appends a unique address to the list of resolved addresses received before
133//   attempting to create a SubConn.
134// * Makes the received subConn state changes available through a channel, for
135//   the test to inspect.
136type balancerA struct {
137	cc                 balancer.ClientConn
138	subconnStateChange *testutils.Channel
139}
140
141func (b *balancerA) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
142	b.subconnStateChange.Send(&scStateChange{sc: sc, state: state})
143}
144
145func (b *balancerA) HandleResolvedAddrs(addrs []resolver.Address, err error) {
146	_, _ = b.cc.NewSubConn(append(addrs, specialAddrForBalancerA), balancer.NewSubConnOptions{})
147}
148
149func (b *balancerA) Close() {}
150
151func (b *balancerA) waitForSubConnStateChange(wantState *scStateChange) error {
152	return waitForSubConnStateChange(b.subconnStateChange, wantState)
153}
154
155// A fake balancer implementation which appends a unique address to the list of
156// resolved addresses received before attempting to create a SubConn.
157type balancerB struct {
158	cc balancer.ClientConn
159}
160
161func (b *balancerB) HandleResolvedAddrs(addrs []resolver.Address, err error) {
162	_, _ = b.cc.NewSubConn(append(addrs, specialAddrForBalancerB), balancer.NewSubConnOptions{})
163}
164
165func (balancerB) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
166	panic("implement me")
167}
168func (balancerB) Close() {}
169
170func newTestClientConn() *testClientConn {
171	return &testClientConn{newSubConns: testutils.NewChannelWithSize(10)}
172}
173
174type testClientConn struct {
175	newSubConns *testutils.Channel
176}
177
178func (t *testClientConn) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
179	t.newSubConns.Send(addrs)
180	return nil, nil
181}
182
183func (t *testClientConn) waitForNewSubConns(wantAddrs []resolver.Address) error {
184	val, err := t.newSubConns.Receive()
185	if err != nil {
186		return fmt.Errorf("error waiting for subconns: %v", err)
187	}
188	gotAddrs := val.([]resolver.Address)
189	if !reflect.DeepEqual(gotAddrs, wantAddrs) {
190		return fmt.Errorf("got subconn address %v, want %v", gotAddrs, wantAddrs)
191	}
192	return nil
193}
194
195func (testClientConn) RemoveSubConn(balancer.SubConn)                          {}
196func (testClientConn) UpdateBalancerState(connectivity.State, balancer.Picker) {}
197func (testClientConn) UpdateState(balancer.State)                              {}
198func (testClientConn) ResolveNow(resolver.ResolveNowOptions)                   {}
199func (testClientConn) Target() string                                          { return testServiceName }
200
201type scStateChange struct {
202	sc    balancer.SubConn
203	state connectivity.State
204}
205
206type fakeEDSBalancer struct {
207	cc                 balancer.ClientConn
208	childPolicy        *testutils.Channel
209	subconnStateChange *testutils.Channel
210	loadStore          lrs.Store
211}
212
213func (f *fakeEDSBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
214	f.subconnStateChange.Send(&scStateChange{sc: sc, state: state})
215}
216
217func (f *fakeEDSBalancer) HandleChildPolicy(name string, config json.RawMessage) {
218	f.childPolicy.Send(&loadBalancingConfig{Name: name, Config: config})
219}
220
221func (f *fakeEDSBalancer) Close()                                         {}
222func (f *fakeEDSBalancer) HandleEDSResponse(edsResp *xdsclient.EDSUpdate) {}
223
224func (f *fakeEDSBalancer) waitForChildPolicy(wantPolicy *loadBalancingConfig) error {
225	val, err := f.childPolicy.Receive()
226	if err != nil {
227		return fmt.Errorf("error waiting for childPolicy: %v", err)
228	}
229	gotPolicy := val.(*loadBalancingConfig)
230	if !reflect.DeepEqual(gotPolicy, wantPolicy) {
231		return fmt.Errorf("got childPolicy %v, want %v", gotPolicy, wantPolicy)
232	}
233	return nil
234}
235
236func (f *fakeEDSBalancer) waitForSubConnStateChange(wantState *scStateChange) error {
237	return waitForSubConnStateChange(f.subconnStateChange, wantState)
238}
239
240func waitForSubConnStateChange(ch *testutils.Channel, wantState *scStateChange) error {
241	val, err := ch.Receive()
242	if err != nil {
243		return fmt.Errorf("error waiting for subconnStateChange: %v", err)
244	}
245	gotState := val.(*scStateChange)
246	if !reflect.DeepEqual(gotState, wantState) {
247		return fmt.Errorf("got subconnStateChange %v, want %v", gotState, wantState)
248	}
249	return nil
250}
251
252func newFakeEDSBalancer(cc balancer.ClientConn, loadStore lrs.Store) edsBalancerInterface {
253	return &fakeEDSBalancer{
254		cc:                 cc,
255		childPolicy:        testutils.NewChannelWithSize(10),
256		subconnStateChange: testutils.NewChannelWithSize(10),
257		loadStore:          loadStore,
258	}
259}
260
261type fakeSubConn struct{}
262
263func (*fakeSubConn) UpdateAddresses([]resolver.Address) { panic("implement me") }
264func (*fakeSubConn) Connect()                           { panic("implement me") }
265
266// TestXDSFallbackResolvedAddrs verifies that the fallback balancer specified
267// in the provided lbconfig is initialized, and that it receives the addresses
268// pushed by the resolver.
269//
270// The test does the following:
271// * Builds a new xds balancer.
272// * Since there is no xDS server to respond to requests from the xds client
273//   (created as part of the xds balancer), we expect the fallback policy to
274//   kick in.
275// * Repeatedly pushes new ClientConnState which specifies the same fallback
276//   policy, but a different set of resolved addresses.
277// * The fallback policy is implemented by a fake balancer, which appends a
278//   unique address to the list of addresses it uses to create the SubConn.
279// * We also have a fake ClientConn which verifies that it receives the
280//   expected address list.
281func (s) TestXDSFallbackResolvedAddrs(t *testing.T) {
282	startupTimeout = 500 * time.Millisecond
283	defer func() { startupTimeout = defaultTimeout }()
284
285	builder := balancer.Get(edsName)
286	cc := newTestClientConn()
287	edsB, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}).(*edsBalancer)
288	if !ok {
289		t.Fatalf("builder.Build(%s) returned type {%T}, want {*edsBalancer}", edsName, edsB)
290	}
291	defer edsB.Close()
292
293	tests := []struct {
294		resolvedAddrs []resolver.Address
295		wantAddrs     []resolver.Address
296	}{
297		{
298			resolvedAddrs: []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}},
299			wantAddrs:     []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, specialAddrForBalancerA},
300		},
301		{
302			resolvedAddrs: []resolver.Address{{Addr: "1.1.1.1:10001"}},
303			wantAddrs:     []resolver.Address{{Addr: "1.1.1.1:10001"}, specialAddrForBalancerA},
304		},
305	}
306	for _, test := range tests {
307		edsB.UpdateClientConnState(balancer.ClientConnState{
308			ResolverState:  resolver.State{Addresses: test.resolvedAddrs},
309			BalancerConfig: testLBConfigFooBar,
310		})
311		if err := cc.waitForNewSubConns(test.wantAddrs); err != nil {
312			t.Fatal(err)
313		}
314	}
315}
316
317// waitForNewXDSClientWithEDSWatch makes sure that a new xdsClient is created
318// with the provided name. It also make sure that the newly created client
319// registers an eds watcher.
320func waitForNewXDSClientWithEDSWatch(t *testing.T, ch *testutils.Channel, wantName string) *fakexds.Client {
321	t.Helper()
322
323	val, err := ch.Receive()
324	if err != nil {
325		t.Fatalf("error when waiting for a new xds client: %v", err)
326		return nil
327	}
328	xdsC := val.(*fakexds.Client)
329	if xdsC.Name() != wantName {
330		t.Fatalf("xdsClient created to balancer: %v, want %v", xdsC.Name(), wantName)
331		return nil
332	}
333	_, err = xdsC.WaitForWatchEDS()
334	if err != nil {
335		t.Fatalf("xdsClient.WatchEDS failed with error: %v", err)
336		return nil
337	}
338	return xdsC
339}
340
341// waitForNewEDSLB makes sure that a new edsLB is created by the top-level
342// edsBalancer.
343func waitForNewEDSLB(t *testing.T, ch *testutils.Channel) *fakeEDSBalancer {
344	t.Helper()
345
346	val, err := ch.Receive()
347	if err != nil {
348		t.Fatalf("error when waiting for a new edsLB: %v", err)
349		return nil
350	}
351	return val.(*fakeEDSBalancer)
352}
353
354// setup overrides the functions which are used to create the xdsClient and the
355// edsLB, creates fake version of them and makes them available on the provided
356// channels. The returned cancel function should be called by the test for
357// cleanup.
358func setup(edsLBCh *testutils.Channel, xdsClientCh *testutils.Channel) func() {
359	origNewEDSBalancer := newEDSBalancer
360	newEDSBalancer = func(cc balancer.ClientConn, loadStore lrs.Store) edsBalancerInterface {
361		edsLB := newFakeEDSBalancer(cc, loadStore)
362		defer func() { edsLBCh.Send(edsLB) }()
363		return edsLB
364	}
365
366	origXdsClientNew := xdsclientNew
367	xdsclientNew = func(opts xdsclient.Options) (xdsClientInterface, error) {
368		xdsC := fakexds.NewClientWithName(opts.Config.BalancerName)
369		defer func() { xdsClientCh.Send(xdsC) }()
370		return xdsC, nil
371	}
372	return func() {
373		newEDSBalancer = origNewEDSBalancer
374		xdsclientNew = origXdsClientNew
375	}
376}
377
378// setupForFallback performs everything that setup does and in addition
379// overrides the fallback startupTimeout to a small value to trigger fallback
380// in tests.
381func setupForFallback(edsLBCh *testutils.Channel, xdsClientCh *testutils.Channel) func() {
382	cancel := setup(edsLBCh, xdsClientCh)
383	startupTimeout = 500 * time.Millisecond
384	return func() {
385		cancel()
386		startupTimeout = defaultTimeout
387	}
388}
389
390// TestXDSConfigBalancerNameUpdate verifies different scenarios where the
391// balancer name in the lbConfig is updated.
392//
393// The test does the following:
394// * Builds a new xds balancer.
395// * Since there is no xDS server to respond to requests from the xds client
396//   (created as part of the xds balancer), we expect the fallback policy to
397//   kick in.
398// * Repeatedly pushes new ClientConnState which specifies different
399//   balancerName in the lbConfig. We expect xdsClient objects to created
400//   whenever the balancerName changes. We also expect a new edsLB to created
401//   the first time the client receives an edsUpdate.
402func (s) TestXDSConfigBalancerNameUpdate(t *testing.T) {
403	edsLBCh := testutils.NewChannel()
404	xdsClientCh := testutils.NewChannel()
405	cancel := setupForFallback(edsLBCh, xdsClientCh)
406	defer cancel()
407
408	builder := balancer.Get(edsName)
409	cc := newTestClientConn()
410	edsB, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSClusterName}}).(*edsBalancer)
411	if !ok {
412		t.Fatalf("builder.Build(%s) returned type {%T}, want {*edsBalancer}", edsName, edsB)
413	}
414	defer edsB.Close()
415
416	addrs := []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, {Addr: "3.3.3.3:10003"}}
417	edsB.UpdateClientConnState(balancer.ClientConnState{
418		ResolverState:  resolver.State{Addresses: addrs},
419		BalancerConfig: testLBConfigFooBar,
420	})
421
422	waitForNewXDSClientWithEDSWatch(t, xdsClientCh, testBalancerNameFooBar)
423	// Verify that fallbackLB (fakeBalancerA) takes over, since the xdsClient
424	// receives no edsUpdate.
425	if err := cc.waitForNewSubConns(append(addrs, specialAddrForBalancerA)); err != nil {
426		t.Fatal(err)
427	}
428
429	for i := 0; i < 2; i++ {
430		balancerName := fmt.Sprintf("balancer-%d", i)
431		edsB.UpdateClientConnState(balancer.ClientConnState{
432			ResolverState: resolver.State{Addresses: addrs},
433			BalancerConfig: &XDSConfig{
434				BalancerName:   balancerName,
435				ChildPolicy:    &loadBalancingConfig{Name: fakeBalancerA},
436				FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA},
437				EDSServiceName: testEDSClusterName,
438			},
439		})
440
441		xdsC := waitForNewXDSClientWithEDSWatch(t, xdsClientCh, balancerName)
442		xdsC.InvokeWatchEDSCallback(&xdsclient.EDSUpdate{}, nil)
443
444		// In the first iteration, an edsLB takes over from the fallbackLB. In the
445		// second iteration, a new xds client is created, but the same edsLB is used.
446		if i == 0 {
447			if _, err := edsLBCh.Receive(); err != nil {
448				t.Fatalf("edsBalancer did not create edsLB after receiveing EDS update: %v, %d", err, i)
449			}
450		} else {
451			if _, err := edsLBCh.Receive(); err == nil {
452				t.Fatal("edsBalancer created new edsLB when it was not expected to")
453			}
454		}
455	}
456}
457
458// TestXDSConnfigChildPolicyUpdate verifies scenarios where the childPolicy
459// section of the lbConfig is updated.
460//
461// The test does the following:
462// * Builds a new xds balancer.
463// * Pushes a new ClientConnState with a childPolicy set to fakeBalancerA.
464//   Verifies that a new xdsClient is created. It then pushes a new edsUpdate
465//   through the fakexds client. Verifies that a new edsLB is created and it
466//   receives the expected childPolicy.
467// * Pushes a new ClientConnState with a childPolicy set to fakeBalancerB.
468//   This time around, we expect no new xdsClient or edsLB to be created.
469//   Instead, we expect the existing edsLB to receive the new child policy.
470func (s) TestXDSConnfigChildPolicyUpdate(t *testing.T) {
471	edsLBCh := testutils.NewChannel()
472	xdsClientCh := testutils.NewChannel()
473	cancel := setup(edsLBCh, xdsClientCh)
474	defer cancel()
475
476	builder := balancer.Get(edsName)
477	cc := newTestClientConn()
478	edsB, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}).(*edsBalancer)
479	if !ok {
480		t.Fatalf("builder.Build(%s) returned type {%T}, want {*edsBalancer}", edsName, edsB)
481	}
482	defer edsB.Close()
483
484	edsB.UpdateClientConnState(balancer.ClientConnState{
485		BalancerConfig: &XDSConfig{
486			BalancerName: testBalancerNameFooBar,
487			ChildPolicy: &loadBalancingConfig{
488				Name:   fakeBalancerA,
489				Config: json.RawMessage("{}"),
490			},
491			EDSServiceName: testEDSClusterName,
492		},
493	})
494	xdsC := waitForNewXDSClientWithEDSWatch(t, xdsClientCh, testBalancerNameFooBar)
495	xdsC.InvokeWatchEDSCallback(&xdsclient.EDSUpdate{}, nil)
496	edsLB := waitForNewEDSLB(t, edsLBCh)
497	edsLB.waitForChildPolicy(&loadBalancingConfig{
498		Name:   string(fakeBalancerA),
499		Config: json.RawMessage(`{}`),
500	})
501
502	edsB.UpdateClientConnState(balancer.ClientConnState{
503		BalancerConfig: &XDSConfig{
504			BalancerName: testBalancerNameFooBar,
505			ChildPolicy: &loadBalancingConfig{
506				Name:   fakeBalancerB,
507				Config: json.RawMessage("{}"),
508			},
509			EDSServiceName: testEDSClusterName,
510		},
511	})
512	edsLB.waitForChildPolicy(&loadBalancingConfig{
513		Name:   string(fakeBalancerA),
514		Config: json.RawMessage(`{}`),
515	})
516}
517
518// TestXDSConfigFallBackUpdate verifies different scenarios where the fallback
519// config part of the lbConfig is updated.
520//
521// The test does the following:
522// * Builds a top-level edsBalancer
523// * Fakes the xdsClient and the underlying edsLB implementations.
524// * Sends a ClientConn update to the edsBalancer with a bogus balancerName.
525//   This will get the balancer into fallback monitoring, but since the
526//   startupTimeout package variable is not overridden to a small value, fallback
527//   will not kick-in as yet.
528// * Sends another ClientConn update with fallback addresses. Still fallback
529//   would not have kicked in because the startupTimeout hasn't expired.
530// * Sends an EDSUpdate through the fakexds.Client object. This will trigger
531//   the creation of an edsLB object. This is verified.
532// * Trigger fallback by directly calling the loseContact method on the
533//   top-level edsBalancer. This should instantiate the fallbackLB and should
534//   send the appropriate subConns.
535// * Update the fallback policy to specify and different fallback LB and make
536//   sure the new LB receives appropriate subConns.
537func (s) TestXDSConfigFallBackUpdate(t *testing.T) {
538	edsLBCh := testutils.NewChannel()
539	xdsClientCh := testutils.NewChannel()
540	cancel := setup(edsLBCh, xdsClientCh)
541	defer cancel()
542
543	builder := balancer.Get(edsName)
544	cc := newTestClientConn()
545	edsB, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSClusterName}}).(*edsBalancer)
546	if !ok {
547		t.Fatalf("builder.Build(%s) returned type {%T}, want {*edsBalancer}", edsName, edsB)
548	}
549	defer edsB.Close()
550
551	bogusBalancerName := "wrong-balancer-name"
552	edsB.UpdateClientConnState(balancer.ClientConnState{
553		BalancerConfig: &XDSConfig{
554			BalancerName:   bogusBalancerName,
555			FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA},
556		},
557	})
558	xdsC := waitForNewXDSClientWithEDSWatch(t, xdsClientCh, bogusBalancerName)
559
560	addrs := []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, {Addr: "3.3.3.3:10003"}}
561	edsB.UpdateClientConnState(balancer.ClientConnState{
562		ResolverState: resolver.State{Addresses: addrs},
563		BalancerConfig: &XDSConfig{
564			BalancerName:   bogusBalancerName,
565			FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerB},
566		},
567	})
568
569	xdsC.InvokeWatchEDSCallback(&xdsclient.EDSUpdate{}, nil)
570	if _, err := edsLBCh.Receive(); err != nil {
571		t.Fatalf("edsBalancer did not create edsLB after receiveing EDS update: %v", err)
572	}
573
574	// Call loseContact explicitly, error in EDS callback is not handled.
575	// Eventually, this should call EDS ballback with an error that indicates
576	// "lost contact".
577	edsB.loseContact()
578
579	// Verify that fallback (fakeBalancerB) takes over.
580	if err := cc.waitForNewSubConns(append(addrs, specialAddrForBalancerB)); err != nil {
581		t.Fatal(err)
582	}
583
584	edsB.UpdateClientConnState(balancer.ClientConnState{
585		ResolverState: resolver.State{Addresses: addrs},
586		BalancerConfig: &XDSConfig{
587			BalancerName:   bogusBalancerName,
588			FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA},
589		},
590	})
591
592	// Verify that fallbackLB (fakeBalancerA) takes over.
593	if err := cc.waitForNewSubConns(append(addrs, specialAddrForBalancerA)); err != nil {
594		t.Fatal(err)
595	}
596}
597
598// TestXDSSubConnStateChange verifies if the top-level edsBalancer passes on
599// the subConnStateChange to appropriate child balancers (it tests for edsLB
600// and a fallbackLB).
601func (s) TestXDSSubConnStateChange(t *testing.T) {
602	edsLBCh := testutils.NewChannel()
603	xdsClientCh := testutils.NewChannel()
604	cancel := setup(edsLBCh, xdsClientCh)
605	defer cancel()
606
607	builder := balancer.Get(edsName)
608	cc := newTestClientConn()
609	edsB, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSClusterName}}).(*edsBalancer)
610	if !ok {
611		t.Fatalf("builder.Build(%s) returned type {%T}, want {*edsBalancer}", edsName, edsB)
612	}
613	defer edsB.Close()
614
615	addrs := []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, {Addr: "3.3.3.3:10003"}}
616	edsB.UpdateClientConnState(balancer.ClientConnState{
617		ResolverState: resolver.State{Addresses: addrs},
618		BalancerConfig: &XDSConfig{
619			BalancerName:   testBalancerNameFooBar,
620			ChildPolicy:    &loadBalancingConfig{Name: fakeBalancerA},
621			FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA},
622			EDSServiceName: testEDSClusterName,
623		},
624	})
625
626	xdsC := waitForNewXDSClientWithEDSWatch(t, xdsClientCh, testBalancerNameFooBar)
627	xdsC.InvokeWatchEDSCallback(&xdsclient.EDSUpdate{}, nil)
628	edsLB := waitForNewEDSLB(t, edsLBCh)
629
630	fsc := &fakeSubConn{}
631	state := connectivity.Ready
632	edsB.UpdateSubConnState(fsc, balancer.SubConnState{ConnectivityState: state})
633	edsLB.waitForSubConnStateChange(&scStateChange{sc: fsc, state: state})
634
635	// lbABuilder maintains a pointer to the last balancerA that it created. We
636	// need to clear that to make sure a new one is created when we attempt to
637	// fallback in the next line.
638	lbABuilder.clearLastBalancer()
639	// Call loseContact explicitly, error in EDS callback is not handled.
640	// Eventually, this should call EDS ballback with an error that indicates
641	// "lost contact".
642	edsB.loseContact()
643	// Verify that fallback (fakeBalancerA) takes over.
644	if err := cc.waitForNewSubConns(append(addrs, specialAddrForBalancerA)); err != nil {
645		t.Fatal(err)
646	}
647	fblb := lbABuilder.getLastBalancer()
648	if fblb == nil {
649		t.Fatal("expected fallback balancerA to be built on fallback")
650	}
651	edsB.UpdateSubConnState(fsc, balancer.SubConnState{ConnectivityState: state})
652	fblb.waitForSubConnStateChange(&scStateChange{sc: fsc, state: state})
653}
654
655func TestXdsBalancerConfigParsing(t *testing.T) {
656	const testEDSName = "eds.service"
657	var testLRSName = "lrs.server"
658	b := bytes.NewBuffer(nil)
659	if err := (&jsonpb.Marshaler{}).Marshal(b, &scpb.XdsConfig{
660		ChildPolicy: []*scpb.LoadBalancingConfig{
661			{Policy: &scpb.LoadBalancingConfig_Xds{}},
662			{Policy: &scpb.LoadBalancingConfig_RoundRobin{
663				RoundRobin: &scpb.RoundRobinConfig{},
664			}},
665		},
666		FallbackPolicy: []*scpb.LoadBalancingConfig{
667			{Policy: &scpb.LoadBalancingConfig_Xds{}},
668			{Policy: &scpb.LoadBalancingConfig_PickFirst{
669				PickFirst: &scpb.PickFirstConfig{},
670			}},
671		},
672		EdsServiceName:             testEDSName,
673		LrsLoadReportingServerName: &wrapperspb.StringValue{Value: testLRSName},
674	}); err != nil {
675		t.Fatalf("%v", err)
676	}
677
678	tests := []struct {
679		name    string
680		js      json.RawMessage
681		want    serviceconfig.LoadBalancingConfig
682		wantErr bool
683	}{
684		{
685			name: "jsonpb-generated",
686			js:   b.Bytes(),
687			want: &XDSConfig{
688				ChildPolicy: &loadBalancingConfig{
689					Name:   "round_robin",
690					Config: json.RawMessage("{}"),
691				},
692				FallBackPolicy: &loadBalancingConfig{
693					Name:   "pick_first",
694					Config: json.RawMessage("{}"),
695				},
696				EDSServiceName:             testEDSName,
697				LrsLoadReportingServerName: &testLRSName,
698			},
699			wantErr: false,
700		},
701		{
702			// json with random balancers, and the first is not registered.
703			name: "manually-generated",
704			js: json.RawMessage(`
705{
706  "balancerName": "fake.foo.bar",
707  "childPolicy": [
708    {"fake_balancer_C": {}},
709    {"fake_balancer_A": {}},
710    {"fake_balancer_B": {}}
711  ],
712  "fallbackPolicy": [
713    {"fake_balancer_C": {}},
714    {"fake_balancer_B": {}},
715    {"fake_balancer_A": {}}
716  ],
717  "edsServiceName": "eds.service",
718  "lrsLoadReportingServerName": "lrs.server"
719}`),
720			want: &XDSConfig{
721				BalancerName: "fake.foo.bar",
722				ChildPolicy: &loadBalancingConfig{
723					Name:   "fake_balancer_A",
724					Config: json.RawMessage("{}"),
725				},
726				FallBackPolicy: &loadBalancingConfig{
727					Name:   "fake_balancer_B",
728					Config: json.RawMessage("{}"),
729				},
730				EDSServiceName:             testEDSName,
731				LrsLoadReportingServerName: &testLRSName,
732			},
733			wantErr: false,
734		},
735		{
736			// json with no lrs server name, LrsLoadReportingServerName should
737			// be nil (not an empty string).
738			name: "no-lrs-server-name",
739			js: json.RawMessage(`
740{
741  "balancerName": "fake.foo.bar",
742  "edsServiceName": "eds.service"
743}`),
744			want: &XDSConfig{
745				BalancerName:               "fake.foo.bar",
746				EDSServiceName:             testEDSName,
747				LrsLoadReportingServerName: nil,
748			},
749			wantErr: false,
750		},
751	}
752	for _, tt := range tests {
753		t.Run(tt.name, func(t *testing.T) {
754			b := &edsBalancerBuilder{}
755			got, err := b.ParseConfig(tt.js)
756			if (err != nil) != tt.wantErr {
757				t.Errorf("edsBalancerBuilder.ParseConfig() error = %v, wantErr %v", err, tt.wantErr)
758				return
759			}
760			if !cmp.Equal(got, tt.want) {
761				t.Errorf(cmp.Diff(got, tt.want))
762			}
763		})
764	}
765}
766func TestLoadbalancingConfigParsing(t *testing.T) {
767	tests := []struct {
768		name string
769		s    string
770		want *XDSConfig
771	}{
772		{
773			name: "empty",
774			s:    "{}",
775			want: &XDSConfig{},
776		},
777		{
778			name: "success1",
779			s:    `{"childPolicy":[{"pick_first":{}}]}`,
780			want: &XDSConfig{
781				ChildPolicy: &loadBalancingConfig{
782					Name:   "pick_first",
783					Config: json.RawMessage(`{}`),
784				},
785			},
786		},
787		{
788			name: "success2",
789			s:    `{"childPolicy":[{"round_robin":{}},{"pick_first":{}}]}`,
790			want: &XDSConfig{
791				ChildPolicy: &loadBalancingConfig{
792					Name:   "round_robin",
793					Config: json.RawMessage(`{}`),
794				},
795			},
796		},
797	}
798	for _, tt := range tests {
799		t.Run(tt.name, func(t *testing.T) {
800			var cfg XDSConfig
801			if err := json.Unmarshal([]byte(tt.s), &cfg); err != nil || !reflect.DeepEqual(&cfg, tt.want) {
802				t.Errorf("test name: %s, parseFullServiceConfig() = %+v, err: %v, want %+v, <nil>", tt.name, cfg, err, tt.want)
803			}
804		})
805	}
806}
807
808func TestEqualStringPointers(t *testing.T) {
809	var (
810		ta1 = "test-a"
811		ta2 = "test-a"
812		tb  = "test-b"
813	)
814	tests := []struct {
815		name string
816		a    *string
817		b    *string
818		want bool
819	}{
820		{"both-nil", nil, nil, true},
821		{"a-non-nil", &ta1, nil, false},
822		{"b-non-nil", nil, &tb, false},
823		{"equal", &ta1, &ta2, true},
824		{"different", &ta1, &tb, false},
825	}
826	for _, tt := range tests {
827		t.Run(tt.name, func(t *testing.T) {
828			if got := equalStringPointers(tt.a, tt.b); got != tt.want {
829				t.Errorf("equalStringPointers() = %v, want %v", got, tt.want)
830			}
831		})
832	}
833}
834