1/*
2 *
3 * Copyright 2019 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package edsbalancer
20
21import (
22	"bytes"
23	"encoding/json"
24	"fmt"
25	"testing"
26
27	corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
28	"github.com/golang/protobuf/jsonpb"
29	wrapperspb "github.com/golang/protobuf/ptypes/wrappers"
30	"github.com/google/go-cmp/cmp"
31	"google.golang.org/grpc"
32	"google.golang.org/grpc/balancer"
33	"google.golang.org/grpc/connectivity"
34	"google.golang.org/grpc/internal/grpclog"
35	"google.golang.org/grpc/internal/grpctest"
36	scpb "google.golang.org/grpc/internal/proto/grpc_service_config"
37	"google.golang.org/grpc/resolver"
38	"google.golang.org/grpc/serviceconfig"
39	"google.golang.org/grpc/xds/internal/balancer/lrs"
40	xdsclient "google.golang.org/grpc/xds/internal/client"
41	"google.golang.org/grpc/xds/internal/client/bootstrap"
42	"google.golang.org/grpc/xds/internal/testutils"
43	"google.golang.org/grpc/xds/internal/testutils/fakeclient"
44)
45
46func init() {
47	balancer.Register(&edsBalancerBuilder{})
48
49	bootstrapConfigNew = func() (*bootstrap.Config, error) {
50		return &bootstrap.Config{
51			BalancerName: testBalancerNameFooBar,
52			Creds:        grpc.WithInsecure(),
53			NodeProto:    &corepb.Node{},
54		}, nil
55	}
56}
57
58func subConnFromPicker(p balancer.V2Picker) func() balancer.SubConn {
59	return func() balancer.SubConn {
60		scst, _ := p.Pick(balancer.PickInfo{})
61		return scst.SubConn
62	}
63}
64
65type s struct {
66	grpctest.Tester
67}
68
69func Test(t *testing.T) {
70	grpctest.RunSubTests(t, s{})
71}
72
// testBalancerNameFooBar is the balancer name used by the stub bootstrap
// config and by the tests that wait for an xdsClient created with that name.
const testBalancerNameFooBar = "foo.bar"
74
75func newNoopTestClientConn() *noopTestClientConn {
76	return &noopTestClientConn{}
77}
78
79// noopTestClientConn is used in EDS balancer config update tests that only
80// cover the config update handling, but not SubConn/load-balancing.
81type noopTestClientConn struct {
82	balancer.ClientConn
83}
84
85func (t *noopTestClientConn) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
86	return nil, nil
87}
88
89func (noopTestClientConn) Target() string { return testServiceName }
90
// scStateChange records a single SubConn state notification: the SubConn it
// was reported for and the connectivity state that was reported.
type scStateChange struct {
	sc    balancer.SubConn
	state connectivity.State
}
95
// fakeEDSBalancer is the test double that newFakeEDSBalancer returns as an
// edsBalancerImplInterface. It publishes the child policy and SubConn state
// updates it receives on channels so that tests can wait for them.
type fakeEDSBalancer struct {
	cc                 balancer.ClientConn
	// childPolicy receives a *loadBalancingConfig per HandleChildPolicy call.
	childPolicy        *testutils.Channel
	// subconnStateChange receives a *scStateChange per
	// HandleSubConnStateChange call.
	subconnStateChange *testutils.Channel
	loadStore          lrs.Store
}
102
103func (f *fakeEDSBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
104	f.subconnStateChange.Send(&scStateChange{sc: sc, state: state})
105}
106
107func (f *fakeEDSBalancer) HandleChildPolicy(name string, config json.RawMessage) {
108	f.childPolicy.Send(&loadBalancingConfig{Name: name, Config: config})
109}
110
111func (f *fakeEDSBalancer) Close()                                              {}
112func (f *fakeEDSBalancer) HandleEDSResponse(edsResp *xdsclient.EDSUpdate)      {}
113func (f *fakeEDSBalancer) updateState(priority priorityType, s balancer.State) {}
114
115func (f *fakeEDSBalancer) waitForChildPolicy(wantPolicy *loadBalancingConfig) error {
116	val, err := f.childPolicy.Receive()
117	if err != nil {
118		return fmt.Errorf("error waiting for childPolicy: %v", err)
119	}
120	gotPolicy := val.(*loadBalancingConfig)
121	if !cmp.Equal(gotPolicy, wantPolicy) {
122		return fmt.Errorf("got childPolicy %v, want %v", gotPolicy, wantPolicy)
123	}
124	return nil
125}
126
127func (f *fakeEDSBalancer) waitForSubConnStateChange(wantState *scStateChange) error {
128	val, err := f.subconnStateChange.Receive()
129	if err != nil {
130		return fmt.Errorf("error waiting for subconnStateChange: %v", err)
131	}
132	gotState := val.(*scStateChange)
133	if !cmp.Equal(gotState, wantState, cmp.AllowUnexported(scStateChange{})) {
134		return fmt.Errorf("got subconnStateChange %v, want %v", gotState, wantState)
135	}
136	return nil
137}
138
139func newFakeEDSBalancer(cc balancer.ClientConn, loadStore lrs.Store) edsBalancerImplInterface {
140	return &fakeEDSBalancer{
141		cc:                 cc,
142		childPolicy:        testutils.NewChannelWithSize(10),
143		subconnStateChange: testutils.NewChannelWithSize(10),
144		loadStore:          loadStore,
145	}
146}
147
148type fakeSubConn struct{}
149
150func (*fakeSubConn) UpdateAddresses([]resolver.Address) { panic("implement me") }
151func (*fakeSubConn) Connect()                           { panic("implement me") }
152
153// waitForNewXDSClientWithEDSWatch makes sure that a new xdsClient is created
154// with the provided name. It also make sure that the newly created client
155// registers an eds watcher.
156func waitForNewXDSClientWithEDSWatch(t *testing.T, ch *testutils.Channel, wantName string) *fakeclient.Client {
157	t.Helper()
158
159	val, err := ch.Receive()
160	if err != nil {
161		t.Fatalf("error when waiting for a new xds client: %v", err)
162		return nil
163	}
164	xdsC := val.(*fakeclient.Client)
165	if xdsC.Name() != wantName {
166		t.Fatalf("xdsClient created to balancer: %v, want %v", xdsC.Name(), wantName)
167		return nil
168	}
169	_, err = xdsC.WaitForWatchEDS()
170	if err != nil {
171		t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err)
172		return nil
173	}
174	return xdsC
175}
176
177// waitForNewEDSLB makes sure that a new edsLB is created by the top-level
178// edsBalancer.
179func waitForNewEDSLB(t *testing.T, ch *testutils.Channel) *fakeEDSBalancer {
180	t.Helper()
181
182	val, err := ch.Receive()
183	if err != nil {
184		t.Fatalf("error when waiting for a new edsLB: %v", err)
185		return nil
186	}
187	return val.(*fakeEDSBalancer)
188}
189
190// setup overrides the functions which are used to create the xdsClient and the
191// edsLB, creates fake version of them and makes them available on the provided
192// channels. The returned cancel function should be called by the test for
193// cleanup.
194func setup(edsLBCh *testutils.Channel, xdsClientCh *testutils.Channel) func() {
195	origNewEDSBalancer := newEDSBalancer
196	newEDSBalancer = func(cc balancer.ClientConn, enqueue func(priorityType, balancer.State), loadStore lrs.Store, logger *grpclog.PrefixLogger) edsBalancerImplInterface {
197		edsLB := newFakeEDSBalancer(cc, loadStore)
198		defer func() { edsLBCh.Send(edsLB) }()
199		return edsLB
200	}
201
202	origXdsClientNew := xdsclientNew
203	xdsclientNew = func(opts xdsclient.Options) (xdsClientInterface, error) {
204		xdsC := fakeclient.NewClientWithName(opts.Config.BalancerName)
205		defer func() { xdsClientCh.Send(xdsC) }()
206		return xdsC, nil
207	}
208	return func() {
209		newEDSBalancer = origNewEDSBalancer
210		xdsclientNew = origXdsClientNew
211	}
212}
213
214// TestXDSConfigBalancerNameUpdate verifies different scenarios where the
215// balancer name in the lbConfig is updated.
216//
217// The test does the following:
218// * Builds a new xds balancer.
219// * Repeatedly pushes new ClientConnState which specifies different
220//   balancerName in the lbConfig. We expect xdsClient objects to created
221//   whenever the balancerName changes.
222func (s) TestXDSConfigBalancerNameUpdate(t *testing.T) {
223	oldBootstrapConfigNew := bootstrapConfigNew
224	bootstrapConfigNew = func() (*bootstrap.Config, error) {
225		// Return an error from bootstrap, so the eds balancer will use
226		// BalancerName from the config.
227		//
228		// TODO: remove this when deleting BalancerName from config.
229		return nil, fmt.Errorf("no bootstrap available")
230	}
231	defer func() { bootstrapConfigNew = oldBootstrapConfigNew }()
232	edsLBCh := testutils.NewChannel()
233	xdsClientCh := testutils.NewChannel()
234	cancel := setup(edsLBCh, xdsClientCh)
235	defer cancel()
236
237	builder := balancer.Get(edsName)
238	cc := newNoopTestClientConn()
239	edsB, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSClusterName}}).(*edsBalancer)
240	if !ok {
241		t.Fatalf("builder.Build(%s) returned type {%T}, want {*edsBalancer}", edsName, edsB)
242	}
243	defer edsB.Close()
244
245	addrs := []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, {Addr: "3.3.3.3:10003"}}
246	for i := 0; i < 2; i++ {
247		balancerName := fmt.Sprintf("balancer-%d", i)
248		edsB.UpdateClientConnState(balancer.ClientConnState{
249			ResolverState: resolver.State{Addresses: addrs},
250			BalancerConfig: &EDSConfig{
251				BalancerName:   balancerName,
252				EDSServiceName: testEDSClusterName,
253			},
254		})
255
256		xdsC := waitForNewXDSClientWithEDSWatch(t, xdsClientCh, balancerName)
257		xdsC.InvokeWatchEDSCallback(&xdsclient.EDSUpdate{}, nil)
258	}
259}
260
// Names under which the two fake balancers used by the service config update
// tests are registered.
const (
	fakeBalancerA = "fake_balancer_A"
	fakeBalancerB = "fake_balancer_B"
)
265
266// Install two fake balancers for service config update tests.
267//
268// ParseConfig only accepts the json if the balancer specified is registered.
269
270func init() {
271	balancer.Register(&fakeBalancerBuilder{name: fakeBalancerA})
272	balancer.Register(&fakeBalancerBuilder{name: fakeBalancerB})
273}
274
275type fakeBalancerBuilder struct {
276	name string
277}
278
279func (b *fakeBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
280	return &fakeBalancer{cc: cc}
281}
282
283func (b *fakeBalancerBuilder) Name() string {
284	return b.name
285}
286
287type fakeBalancer struct {
288	cc balancer.ClientConn
289}
290
291func (b *fakeBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
292	panic("implement me")
293}
294
295func (b *fakeBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
296	panic("implement me")
297}
298
299func (b *fakeBalancer) Close() {}
300
301// TestXDSConnfigChildPolicyUpdate verifies scenarios where the childPolicy
302// section of the lbConfig is updated.
303//
304// The test does the following:
305// * Builds a new xds balancer.
306// * Pushes a new ClientConnState with a childPolicy set to fakeBalancerA.
307//   Verifies that a new xdsClient is created. It then pushes a new edsUpdate
308//   through the fakexds client. Verifies that a new edsLB is created and it
309//   receives the expected childPolicy.
310// * Pushes a new ClientConnState with a childPolicy set to fakeBalancerB.
311//   This time around, we expect no new xdsClient or edsLB to be created.
312//   Instead, we expect the existing edsLB to receive the new child policy.
313func (s) TestXDSConnfigChildPolicyUpdate(t *testing.T) {
314	edsLBCh := testutils.NewChannel()
315	xdsClientCh := testutils.NewChannel()
316	cancel := setup(edsLBCh, xdsClientCh)
317	defer cancel()
318
319	builder := balancer.Get(edsName)
320	cc := newNoopTestClientConn()
321	edsB, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}).(*edsBalancer)
322	if !ok {
323		t.Fatalf("builder.Build(%s) returned type {%T}, want {*edsBalancer}", edsName, edsB)
324	}
325	defer edsB.Close()
326
327	edsB.UpdateClientConnState(balancer.ClientConnState{
328		BalancerConfig: &EDSConfig{
329			BalancerName: testBalancerNameFooBar,
330			ChildPolicy: &loadBalancingConfig{
331				Name:   fakeBalancerA,
332				Config: json.RawMessage("{}"),
333			},
334			EDSServiceName: testEDSClusterName,
335		},
336	})
337	xdsC := waitForNewXDSClientWithEDSWatch(t, xdsClientCh, testBalancerNameFooBar)
338	xdsC.InvokeWatchEDSCallback(&xdsclient.EDSUpdate{}, nil)
339	edsLB := waitForNewEDSLB(t, edsLBCh)
340	edsLB.waitForChildPolicy(&loadBalancingConfig{
341		Name:   string(fakeBalancerA),
342		Config: json.RawMessage(`{}`),
343	})
344
345	edsB.UpdateClientConnState(balancer.ClientConnState{
346		BalancerConfig: &EDSConfig{
347			BalancerName: testBalancerNameFooBar,
348			ChildPolicy: &loadBalancingConfig{
349				Name:   fakeBalancerB,
350				Config: json.RawMessage("{}"),
351			},
352			EDSServiceName: testEDSClusterName,
353		},
354	})
355	edsLB.waitForChildPolicy(&loadBalancingConfig{
356		Name:   string(fakeBalancerA),
357		Config: json.RawMessage(`{}`),
358	})
359}
360
361// TestXDSSubConnStateChange verifies if the top-level edsBalancer passes on
362// the subConnStateChange to appropriate child balancers.
363func (s) TestXDSSubConnStateChange(t *testing.T) {
364	edsLBCh := testutils.NewChannel()
365	xdsClientCh := testutils.NewChannel()
366	cancel := setup(edsLBCh, xdsClientCh)
367	defer cancel()
368
369	builder := balancer.Get(edsName)
370	cc := newNoopTestClientConn()
371	edsB, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSClusterName}}).(*edsBalancer)
372	if !ok {
373		t.Fatalf("builder.Build(%s) returned type {%T}, want {*edsBalancer}", edsName, edsB)
374	}
375	defer edsB.Close()
376
377	addrs := []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, {Addr: "3.3.3.3:10003"}}
378	edsB.UpdateClientConnState(balancer.ClientConnState{
379		ResolverState: resolver.State{Addresses: addrs},
380		BalancerConfig: &EDSConfig{
381			BalancerName:   testBalancerNameFooBar,
382			EDSServiceName: testEDSClusterName,
383		},
384	})
385
386	xdsC := waitForNewXDSClientWithEDSWatch(t, xdsClientCh, testBalancerNameFooBar)
387	xdsC.InvokeWatchEDSCallback(&xdsclient.EDSUpdate{}, nil)
388	edsLB := waitForNewEDSLB(t, edsLBCh)
389
390	fsc := &fakeSubConn{}
391	state := connectivity.Ready
392	edsB.UpdateSubConnState(fsc, balancer.SubConnState{ConnectivityState: state})
393	edsLB.waitForSubConnStateChange(&scStateChange{sc: fsc, state: state})
394}
395
396func (s) TestXDSBalancerConfigParsing(t *testing.T) {
397	const testEDSName = "eds.service"
398	var testLRSName = "lrs.server"
399	b := bytes.NewBuffer(nil)
400	if err := (&jsonpb.Marshaler{}).Marshal(b, &scpb.XdsConfig{
401		ChildPolicy: []*scpb.LoadBalancingConfig{
402			{Policy: &scpb.LoadBalancingConfig_Xds{}},
403			{Policy: &scpb.LoadBalancingConfig_RoundRobin{
404				RoundRobin: &scpb.RoundRobinConfig{},
405			}},
406		},
407		FallbackPolicy: []*scpb.LoadBalancingConfig{
408			{Policy: &scpb.LoadBalancingConfig_Xds{}},
409			{Policy: &scpb.LoadBalancingConfig_PickFirst{
410				PickFirst: &scpb.PickFirstConfig{},
411			}},
412		},
413		EdsServiceName:             testEDSName,
414		LrsLoadReportingServerName: &wrapperspb.StringValue{Value: testLRSName},
415	}); err != nil {
416		t.Fatalf("%v", err)
417	}
418
419	tests := []struct {
420		name    string
421		js      json.RawMessage
422		want    serviceconfig.LoadBalancingConfig
423		wantErr bool
424	}{
425		{
426			name: "jsonpb-generated",
427			js:   b.Bytes(),
428			want: &EDSConfig{
429				ChildPolicy: &loadBalancingConfig{
430					Name:   "round_robin",
431					Config: json.RawMessage("{}"),
432				},
433				FallBackPolicy: &loadBalancingConfig{
434					Name:   "pick_first",
435					Config: json.RawMessage("{}"),
436				},
437				EDSServiceName:             testEDSName,
438				LrsLoadReportingServerName: &testLRSName,
439			},
440			wantErr: false,
441		},
442		{
443			// json with random balancers, and the first is not registered.
444			name: "manually-generated",
445			js: json.RawMessage(`
446{
447  "balancerName": "fake.foo.bar",
448  "childPolicy": [
449    {"fake_balancer_C": {}},
450    {"fake_balancer_A": {}},
451    {"fake_balancer_B": {}}
452  ],
453  "fallbackPolicy": [
454    {"fake_balancer_C": {}},
455    {"fake_balancer_B": {}},
456    {"fake_balancer_A": {}}
457  ],
458  "edsServiceName": "eds.service",
459  "lrsLoadReportingServerName": "lrs.server"
460}`),
461			want: &EDSConfig{
462				BalancerName: "fake.foo.bar",
463				ChildPolicy: &loadBalancingConfig{
464					Name:   "fake_balancer_A",
465					Config: json.RawMessage("{}"),
466				},
467				FallBackPolicy: &loadBalancingConfig{
468					Name:   "fake_balancer_B",
469					Config: json.RawMessage("{}"),
470				},
471				EDSServiceName:             testEDSName,
472				LrsLoadReportingServerName: &testLRSName,
473			},
474			wantErr: false,
475		},
476		{
477			// json with no lrs server name, LrsLoadReportingServerName should
478			// be nil (not an empty string).
479			name: "no-lrs-server-name",
480			js: json.RawMessage(`
481{
482  "balancerName": "fake.foo.bar",
483  "edsServiceName": "eds.service"
484}`),
485			want: &EDSConfig{
486				BalancerName:               "fake.foo.bar",
487				EDSServiceName:             testEDSName,
488				LrsLoadReportingServerName: nil,
489			},
490			wantErr: false,
491		},
492	}
493	for _, tt := range tests {
494		t.Run(tt.name, func(t *testing.T) {
495			b := &edsBalancerBuilder{}
496			got, err := b.ParseConfig(tt.js)
497			if (err != nil) != tt.wantErr {
498				t.Errorf("edsBalancerBuilder.ParseConfig() error = %v, wantErr %v", err, tt.wantErr)
499				return
500			}
501			if !cmp.Equal(got, tt.want) {
502				t.Errorf(cmp.Diff(got, tt.want))
503			}
504		})
505	}
506}
507func (s) TestLoadbalancingConfigParsing(t *testing.T) {
508	tests := []struct {
509		name string
510		s    string
511		want *EDSConfig
512	}{
513		{
514			name: "empty",
515			s:    "{}",
516			want: &EDSConfig{},
517		},
518		{
519			name: "success1",
520			s:    `{"childPolicy":[{"pick_first":{}}]}`,
521			want: &EDSConfig{
522				ChildPolicy: &loadBalancingConfig{
523					Name:   "pick_first",
524					Config: json.RawMessage(`{}`),
525				},
526			},
527		},
528		{
529			name: "success2",
530			s:    `{"childPolicy":[{"round_robin":{}},{"pick_first":{}}]}`,
531			want: &EDSConfig{
532				ChildPolicy: &loadBalancingConfig{
533					Name:   "round_robin",
534					Config: json.RawMessage(`{}`),
535				},
536			},
537		},
538	}
539	for _, tt := range tests {
540		t.Run(tt.name, func(t *testing.T) {
541			var cfg EDSConfig
542			if err := json.Unmarshal([]byte(tt.s), &cfg); err != nil || !cmp.Equal(&cfg, tt.want) {
543				t.Errorf("test name: %s, parseFullServiceConfig() = %+v, err: %v, want %+v, <nil>", tt.name, cfg, err, tt.want)
544			}
545		})
546	}
547}
548
549func (s) TestEqualStringPointers(t *testing.T) {
550	var (
551		ta1 = "test-a"
552		ta2 = "test-a"
553		tb  = "test-b"
554	)
555	tests := []struct {
556		name string
557		a    *string
558		b    *string
559		want bool
560	}{
561		{"both-nil", nil, nil, true},
562		{"a-non-nil", &ta1, nil, false},
563		{"b-non-nil", nil, &tb, false},
564		{"equal", &ta1, &ta2, true},
565		{"different", &ta1, &tb, false},
566	}
567	for _, tt := range tests {
568		t.Run(tt.name, func(t *testing.T) {
569			if got := equalStringPointers(tt.a, tt.b); got != tt.want {
570				t.Errorf("equalStringPointers() = %v, want %v", got, tt.want)
571			}
572		})
573	}
574}
575