1/*
2 *
3 * Copyright 2019 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package edsbalancer
20
21import (
22	"bytes"
23	"encoding/json"
24	"fmt"
25	"reflect"
26	"testing"
27
28	corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
29	"github.com/golang/protobuf/jsonpb"
30	wrapperspb "github.com/golang/protobuf/ptypes/wrappers"
31	"github.com/google/go-cmp/cmp"
32	"google.golang.org/grpc"
33	"google.golang.org/grpc/balancer"
34	"google.golang.org/grpc/connectivity"
35	"google.golang.org/grpc/internal/grpctest"
36	"google.golang.org/grpc/internal/leakcheck"
37	scpb "google.golang.org/grpc/internal/proto/grpc_service_config"
38	"google.golang.org/grpc/resolver"
39	"google.golang.org/grpc/serviceconfig"
40	"google.golang.org/grpc/xds/internal/balancer/lrs"
41	xdsclient "google.golang.org/grpc/xds/internal/client"
42	"google.golang.org/grpc/xds/internal/client/bootstrap"
43	"google.golang.org/grpc/xds/internal/testutils"
44	"google.golang.org/grpc/xds/internal/testutils/fakeclient"
45)
46
47func init() {
48	balancer.Register(&edsBalancerBuilder{})
49
50	bootstrapConfigNew = func() (*bootstrap.Config, error) {
51		return &bootstrap.Config{
52			BalancerName: testBalancerNameFooBar,
53			Creds:        grpc.WithInsecure(),
54			NodeProto:    &corepb.Node{},
55		}, nil
56	}
57}
58
59type s struct{}
60
61func (s) Teardown(t *testing.T) {
62	leakcheck.Check(t)
63}
64
65func Test(t *testing.T) {
66	grpctest.RunSubTests(t, s{})
67}
68
69const testBalancerNameFooBar = "foo.bar"
70
71func newNoopTestClientConn() *noopTestClientConn {
72	return &noopTestClientConn{}
73}
74
75// noopTestClientConn is used in EDS balancer config update tests that only
76// cover the config update handling, but not SubConn/load-balancing.
77type noopTestClientConn struct {
78	balancer.ClientConn
79}
80
81func (t *noopTestClientConn) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
82	return nil, nil
83}
84
85func (noopTestClientConn) Target() string { return testServiceName }
86
87type scStateChange struct {
88	sc    balancer.SubConn
89	state connectivity.State
90}
91
92type fakeEDSBalancer struct {
93	cc                 balancer.ClientConn
94	childPolicy        *testutils.Channel
95	subconnStateChange *testutils.Channel
96	loadStore          lrs.Store
97}
98
99func (f *fakeEDSBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
100	f.subconnStateChange.Send(&scStateChange{sc: sc, state: state})
101}
102
103func (f *fakeEDSBalancer) HandleChildPolicy(name string, config json.RawMessage) {
104	f.childPolicy.Send(&loadBalancingConfig{Name: name, Config: config})
105}
106
107func (f *fakeEDSBalancer) Close()                                         {}
108func (f *fakeEDSBalancer) HandleEDSResponse(edsResp *xdsclient.EDSUpdate) {}
109
110func (f *fakeEDSBalancer) waitForChildPolicy(wantPolicy *loadBalancingConfig) error {
111	val, err := f.childPolicy.Receive()
112	if err != nil {
113		return fmt.Errorf("error waiting for childPolicy: %v", err)
114	}
115	gotPolicy := val.(*loadBalancingConfig)
116	if !reflect.DeepEqual(gotPolicy, wantPolicy) {
117		return fmt.Errorf("got childPolicy %v, want %v", gotPolicy, wantPolicy)
118	}
119	return nil
120}
121
122func (f *fakeEDSBalancer) waitForSubConnStateChange(wantState *scStateChange) error {
123	val, err := f.subconnStateChange.Receive()
124	if err != nil {
125		return fmt.Errorf("error waiting for subconnStateChange: %v", err)
126	}
127	gotState := val.(*scStateChange)
128	if !reflect.DeepEqual(gotState, wantState) {
129		return fmt.Errorf("got subconnStateChange %v, want %v", gotState, wantState)
130	}
131	return nil
132}
133
134func newFakeEDSBalancer(cc balancer.ClientConn, loadStore lrs.Store) edsBalancerImplInterface {
135	return &fakeEDSBalancer{
136		cc:                 cc,
137		childPolicy:        testutils.NewChannelWithSize(10),
138		subconnStateChange: testutils.NewChannelWithSize(10),
139		loadStore:          loadStore,
140	}
141}
142
143type fakeSubConn struct{}
144
145func (*fakeSubConn) UpdateAddresses([]resolver.Address) { panic("implement me") }
146func (*fakeSubConn) Connect()                           { panic("implement me") }
147
148// waitForNewXDSClientWithEDSWatch makes sure that a new xdsClient is created
149// with the provided name. It also make sure that the newly created client
150// registers an eds watcher.
151func waitForNewXDSClientWithEDSWatch(t *testing.T, ch *testutils.Channel, wantName string) *fakeclient.Client {
152	t.Helper()
153
154	val, err := ch.Receive()
155	if err != nil {
156		t.Fatalf("error when waiting for a new xds client: %v", err)
157		return nil
158	}
159	xdsC := val.(*fakeclient.Client)
160	if xdsC.Name() != wantName {
161		t.Fatalf("xdsClient created to balancer: %v, want %v", xdsC.Name(), wantName)
162		return nil
163	}
164	_, err = xdsC.WaitForWatchEDS()
165	if err != nil {
166		t.Fatalf("xdsClient.WatchEDS failed with error: %v", err)
167		return nil
168	}
169	return xdsC
170}
171
172// waitForNewEDSLB makes sure that a new edsLB is created by the top-level
173// edsBalancer.
174func waitForNewEDSLB(t *testing.T, ch *testutils.Channel) *fakeEDSBalancer {
175	t.Helper()
176
177	val, err := ch.Receive()
178	if err != nil {
179		t.Fatalf("error when waiting for a new edsLB: %v", err)
180		return nil
181	}
182	return val.(*fakeEDSBalancer)
183}
184
185// setup overrides the functions which are used to create the xdsClient and the
186// edsLB, creates fake version of them and makes them available on the provided
187// channels. The returned cancel function should be called by the test for
188// cleanup.
189func setup(edsLBCh *testutils.Channel, xdsClientCh *testutils.Channel) func() {
190	origNewEDSBalancer := newEDSBalancer
191	newEDSBalancer = func(cc balancer.ClientConn, loadStore lrs.Store) edsBalancerImplInterface {
192		edsLB := newFakeEDSBalancer(cc, loadStore)
193		defer func() { edsLBCh.Send(edsLB) }()
194		return edsLB
195	}
196
197	origXdsClientNew := xdsclientNew
198	xdsclientNew = func(opts xdsclient.Options) (xdsClientInterface, error) {
199		xdsC := fakeclient.NewClientWithName(opts.Config.BalancerName)
200		defer func() { xdsClientCh.Send(xdsC) }()
201		return xdsC, nil
202	}
203	return func() {
204		newEDSBalancer = origNewEDSBalancer
205		xdsclientNew = origXdsClientNew
206	}
207}
208
209// TestXDSConfigBalancerNameUpdate verifies different scenarios where the
210// balancer name in the lbConfig is updated.
211//
212// The test does the following:
213// * Builds a new xds balancer.
214// * Repeatedly pushes new ClientConnState which specifies different
215//   balancerName in the lbConfig. We expect xdsClient objects to created
216//   whenever the balancerName changes.
217func (s) TestXDSConfigBalancerNameUpdate(t *testing.T) {
218	oldBootstrapConfigNew := bootstrapConfigNew
219	bootstrapConfigNew = func() (*bootstrap.Config, error) {
220		// Return an error from bootstrap, so the eds balancer will use
221		// BalancerName from the config.
222		//
223		// TODO: remove this when deleting BalancerName from config.
224		return nil, fmt.Errorf("no bootstrap available")
225	}
226	defer func() { bootstrapConfigNew = oldBootstrapConfigNew }()
227	edsLBCh := testutils.NewChannel()
228	xdsClientCh := testutils.NewChannel()
229	cancel := setup(edsLBCh, xdsClientCh)
230	defer cancel()
231
232	builder := balancer.Get(edsName)
233	cc := newNoopTestClientConn()
234	edsB, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSClusterName}}).(*edsBalancer)
235	if !ok {
236		t.Fatalf("builder.Build(%s) returned type {%T}, want {*edsBalancer}", edsName, edsB)
237	}
238	defer edsB.Close()
239
240	addrs := []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, {Addr: "3.3.3.3:10003"}}
241	for i := 0; i < 2; i++ {
242		balancerName := fmt.Sprintf("balancer-%d", i)
243		edsB.UpdateClientConnState(balancer.ClientConnState{
244			ResolverState: resolver.State{Addresses: addrs},
245			BalancerConfig: &EDSConfig{
246				BalancerName:   balancerName,
247				EDSServiceName: testEDSClusterName,
248			},
249		})
250
251		xdsC := waitForNewXDSClientWithEDSWatch(t, xdsClientCh, balancerName)
252		xdsC.InvokeWatchEDSCallback(&xdsclient.EDSUpdate{}, nil)
253	}
254}
255
256const (
257	fakeBalancerA = "fake_balancer_A"
258	fakeBalancerB = "fake_balancer_B"
259)
260
261// Install two fake balancers for service config update tests.
262//
263// ParseConfig only accepts the json if the balancer specified is registered.
264
265func init() {
266	balancer.Register(&fakeBalancerBuilder{name: fakeBalancerA})
267	balancer.Register(&fakeBalancerBuilder{name: fakeBalancerB})
268}
269
270type fakeBalancerBuilder struct {
271	name string
272}
273
274func (b *fakeBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
275	return &fakeBalancer{cc: cc}
276}
277
278func (b *fakeBalancerBuilder) Name() string {
279	return b.name
280}
281
282type fakeBalancer struct {
283	cc balancer.ClientConn
284}
285
286func (b *fakeBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
287	panic("implement me")
288}
289
290func (b *fakeBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
291	panic("implement me")
292}
293
294func (b *fakeBalancer) Close() {}
295
296// TestXDSConnfigChildPolicyUpdate verifies scenarios where the childPolicy
297// section of the lbConfig is updated.
298//
299// The test does the following:
300// * Builds a new xds balancer.
301// * Pushes a new ClientConnState with a childPolicy set to fakeBalancerA.
302//   Verifies that a new xdsClient is created. It then pushes a new edsUpdate
303//   through the fakexds client. Verifies that a new edsLB is created and it
304//   receives the expected childPolicy.
305// * Pushes a new ClientConnState with a childPolicy set to fakeBalancerB.
306//   This time around, we expect no new xdsClient or edsLB to be created.
307//   Instead, we expect the existing edsLB to receive the new child policy.
308func (s) TestXDSConnfigChildPolicyUpdate(t *testing.T) {
309	edsLBCh := testutils.NewChannel()
310	xdsClientCh := testutils.NewChannel()
311	cancel := setup(edsLBCh, xdsClientCh)
312	defer cancel()
313
314	builder := balancer.Get(edsName)
315	cc := newNoopTestClientConn()
316	edsB, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}).(*edsBalancer)
317	if !ok {
318		t.Fatalf("builder.Build(%s) returned type {%T}, want {*edsBalancer}", edsName, edsB)
319	}
320	defer edsB.Close()
321
322	edsB.UpdateClientConnState(balancer.ClientConnState{
323		BalancerConfig: &EDSConfig{
324			BalancerName: testBalancerNameFooBar,
325			ChildPolicy: &loadBalancingConfig{
326				Name:   fakeBalancerA,
327				Config: json.RawMessage("{}"),
328			},
329			EDSServiceName: testEDSClusterName,
330		},
331	})
332	xdsC := waitForNewXDSClientWithEDSWatch(t, xdsClientCh, testBalancerNameFooBar)
333	xdsC.InvokeWatchEDSCallback(&xdsclient.EDSUpdate{}, nil)
334	edsLB := waitForNewEDSLB(t, edsLBCh)
335	edsLB.waitForChildPolicy(&loadBalancingConfig{
336		Name:   string(fakeBalancerA),
337		Config: json.RawMessage(`{}`),
338	})
339
340	edsB.UpdateClientConnState(balancer.ClientConnState{
341		BalancerConfig: &EDSConfig{
342			BalancerName: testBalancerNameFooBar,
343			ChildPolicy: &loadBalancingConfig{
344				Name:   fakeBalancerB,
345				Config: json.RawMessage("{}"),
346			},
347			EDSServiceName: testEDSClusterName,
348		},
349	})
350	edsLB.waitForChildPolicy(&loadBalancingConfig{
351		Name:   string(fakeBalancerA),
352		Config: json.RawMessage(`{}`),
353	})
354}
355
356// TestXDSSubConnStateChange verifies if the top-level edsBalancer passes on
357// the subConnStateChange to appropriate child balancers.
358func (s) TestXDSSubConnStateChange(t *testing.T) {
359	edsLBCh := testutils.NewChannel()
360	xdsClientCh := testutils.NewChannel()
361	cancel := setup(edsLBCh, xdsClientCh)
362	defer cancel()
363
364	builder := balancer.Get(edsName)
365	cc := newNoopTestClientConn()
366	edsB, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSClusterName}}).(*edsBalancer)
367	if !ok {
368		t.Fatalf("builder.Build(%s) returned type {%T}, want {*edsBalancer}", edsName, edsB)
369	}
370	defer edsB.Close()
371
372	addrs := []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, {Addr: "3.3.3.3:10003"}}
373	edsB.UpdateClientConnState(balancer.ClientConnState{
374		ResolverState: resolver.State{Addresses: addrs},
375		BalancerConfig: &EDSConfig{
376			BalancerName:   testBalancerNameFooBar,
377			EDSServiceName: testEDSClusterName,
378		},
379	})
380
381	xdsC := waitForNewXDSClientWithEDSWatch(t, xdsClientCh, testBalancerNameFooBar)
382	xdsC.InvokeWatchEDSCallback(&xdsclient.EDSUpdate{}, nil)
383	edsLB := waitForNewEDSLB(t, edsLBCh)
384
385	fsc := &fakeSubConn{}
386	state := connectivity.Ready
387	edsB.UpdateSubConnState(fsc, balancer.SubConnState{ConnectivityState: state})
388	edsLB.waitForSubConnStateChange(&scStateChange{sc: fsc, state: state})
389}
390
391func TestXDSBalancerConfigParsing(t *testing.T) {
392	const testEDSName = "eds.service"
393	var testLRSName = "lrs.server"
394	b := bytes.NewBuffer(nil)
395	if err := (&jsonpb.Marshaler{}).Marshal(b, &scpb.XdsConfig{
396		ChildPolicy: []*scpb.LoadBalancingConfig{
397			{Policy: &scpb.LoadBalancingConfig_Xds{}},
398			{Policy: &scpb.LoadBalancingConfig_RoundRobin{
399				RoundRobin: &scpb.RoundRobinConfig{},
400			}},
401		},
402		FallbackPolicy: []*scpb.LoadBalancingConfig{
403			{Policy: &scpb.LoadBalancingConfig_Xds{}},
404			{Policy: &scpb.LoadBalancingConfig_PickFirst{
405				PickFirst: &scpb.PickFirstConfig{},
406			}},
407		},
408		EdsServiceName:             testEDSName,
409		LrsLoadReportingServerName: &wrapperspb.StringValue{Value: testLRSName},
410	}); err != nil {
411		t.Fatalf("%v", err)
412	}
413
414	tests := []struct {
415		name    string
416		js      json.RawMessage
417		want    serviceconfig.LoadBalancingConfig
418		wantErr bool
419	}{
420		{
421			name: "jsonpb-generated",
422			js:   b.Bytes(),
423			want: &EDSConfig{
424				ChildPolicy: &loadBalancingConfig{
425					Name:   "round_robin",
426					Config: json.RawMessage("{}"),
427				},
428				FallBackPolicy: &loadBalancingConfig{
429					Name:   "pick_first",
430					Config: json.RawMessage("{}"),
431				},
432				EDSServiceName:             testEDSName,
433				LrsLoadReportingServerName: &testLRSName,
434			},
435			wantErr: false,
436		},
437		{
438			// json with random balancers, and the first is not registered.
439			name: "manually-generated",
440			js: json.RawMessage(`
441{
442  "balancerName": "fake.foo.bar",
443  "childPolicy": [
444    {"fake_balancer_C": {}},
445    {"fake_balancer_A": {}},
446    {"fake_balancer_B": {}}
447  ],
448  "fallbackPolicy": [
449    {"fake_balancer_C": {}},
450    {"fake_balancer_B": {}},
451    {"fake_balancer_A": {}}
452  ],
453  "edsServiceName": "eds.service",
454  "lrsLoadReportingServerName": "lrs.server"
455}`),
456			want: &EDSConfig{
457				BalancerName: "fake.foo.bar",
458				ChildPolicy: &loadBalancingConfig{
459					Name:   "fake_balancer_A",
460					Config: json.RawMessage("{}"),
461				},
462				FallBackPolicy: &loadBalancingConfig{
463					Name:   "fake_balancer_B",
464					Config: json.RawMessage("{}"),
465				},
466				EDSServiceName:             testEDSName,
467				LrsLoadReportingServerName: &testLRSName,
468			},
469			wantErr: false,
470		},
471		{
472			// json with no lrs server name, LrsLoadReportingServerName should
473			// be nil (not an empty string).
474			name: "no-lrs-server-name",
475			js: json.RawMessage(`
476{
477  "balancerName": "fake.foo.bar",
478  "edsServiceName": "eds.service"
479}`),
480			want: &EDSConfig{
481				BalancerName:               "fake.foo.bar",
482				EDSServiceName:             testEDSName,
483				LrsLoadReportingServerName: nil,
484			},
485			wantErr: false,
486		},
487	}
488	for _, tt := range tests {
489		t.Run(tt.name, func(t *testing.T) {
490			b := &edsBalancerBuilder{}
491			got, err := b.ParseConfig(tt.js)
492			if (err != nil) != tt.wantErr {
493				t.Errorf("edsBalancerBuilder.ParseConfig() error = %v, wantErr %v", err, tt.wantErr)
494				return
495			}
496			if !cmp.Equal(got, tt.want) {
497				t.Errorf(cmp.Diff(got, tt.want))
498			}
499		})
500	}
501}
502func TestLoadbalancingConfigParsing(t *testing.T) {
503	tests := []struct {
504		name string
505		s    string
506		want *EDSConfig
507	}{
508		{
509			name: "empty",
510			s:    "{}",
511			want: &EDSConfig{},
512		},
513		{
514			name: "success1",
515			s:    `{"childPolicy":[{"pick_first":{}}]}`,
516			want: &EDSConfig{
517				ChildPolicy: &loadBalancingConfig{
518					Name:   "pick_first",
519					Config: json.RawMessage(`{}`),
520				},
521			},
522		},
523		{
524			name: "success2",
525			s:    `{"childPolicy":[{"round_robin":{}},{"pick_first":{}}]}`,
526			want: &EDSConfig{
527				ChildPolicy: &loadBalancingConfig{
528					Name:   "round_robin",
529					Config: json.RawMessage(`{}`),
530				},
531			},
532		},
533	}
534	for _, tt := range tests {
535		t.Run(tt.name, func(t *testing.T) {
536			var cfg EDSConfig
537			if err := json.Unmarshal([]byte(tt.s), &cfg); err != nil || !reflect.DeepEqual(&cfg, tt.want) {
538				t.Errorf("test name: %s, parseFullServiceConfig() = %+v, err: %v, want %+v, <nil>", tt.name, cfg, err, tt.want)
539			}
540		})
541	}
542}
543
544func TestEqualStringPointers(t *testing.T) {
545	var (
546		ta1 = "test-a"
547		ta2 = "test-a"
548		tb  = "test-b"
549	)
550	tests := []struct {
551		name string
552		a    *string
553		b    *string
554		want bool
555	}{
556		{"both-nil", nil, nil, true},
557		{"a-non-nil", &ta1, nil, false},
558		{"b-non-nil", nil, &tb, false},
559		{"equal", &ta1, &ta2, true},
560		{"different", &ta1, &tb, false},
561	}
562	for _, tt := range tests {
563		t.Run(tt.name, func(t *testing.T) {
564			if got := equalStringPointers(tt.a, tt.b); got != tt.want {
565				t.Errorf("equalStringPointers() = %v, want %v", got, tt.want)
566			}
567		})
568	}
569}
570