1/*
2 *
3 * Copyright 2019 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package edsbalancer
20
21import (
22	"bytes"
23	"encoding/json"
24	"fmt"
25	"testing"
26
27	corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
28	"github.com/golang/protobuf/jsonpb"
29	wrapperspb "github.com/golang/protobuf/ptypes/wrappers"
30	"github.com/google/go-cmp/cmp"
31	"google.golang.org/grpc"
32	"google.golang.org/grpc/balancer"
33	"google.golang.org/grpc/connectivity"
34	"google.golang.org/grpc/internal/grpclog"
35	"google.golang.org/grpc/internal/grpctest"
36	scpb "google.golang.org/grpc/internal/proto/grpc_service_config"
37	"google.golang.org/grpc/resolver"
38	"google.golang.org/grpc/serviceconfig"
39	"google.golang.org/grpc/xds/internal/balancer/lrs"
40	xdsclient "google.golang.org/grpc/xds/internal/client"
41	"google.golang.org/grpc/xds/internal/client/bootstrap"
42	"google.golang.org/grpc/xds/internal/testutils"
43	"google.golang.org/grpc/xds/internal/testutils/fakeclient"
44)
45
46func init() {
47	balancer.Register(&edsBalancerBuilder{})
48
49	bootstrapConfigNew = func() (*bootstrap.Config, error) {
50		return &bootstrap.Config{
51			BalancerName: testBalancerNameFooBar,
52			Creds:        grpc.WithInsecure(),
53			NodeProto:    &corepb.Node{},
54		}, nil
55	}
56}
57
58type s struct {
59	grpctest.Tester
60}
61
62func Test(t *testing.T) {
63	grpctest.RunSubTests(t, s{})
64}
65
// testBalancerNameFooBar is the balancer name returned by the fake bootstrap
// config installed in init, and the BalancerName several tests put in their
// EDSConfig.
const testBalancerNameFooBar = "foo.bar"
67
68func newNoopTestClientConn() *noopTestClientConn {
69	return &noopTestClientConn{}
70}
71
72// noopTestClientConn is used in EDS balancer config update tests that only
73// cover the config update handling, but not SubConn/load-balancing.
74type noopTestClientConn struct {
75	balancer.ClientConn
76}
77
78func (t *noopTestClientConn) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
79	return nil, nil
80}
81
82func (noopTestClientConn) Target() string { return testServiceName }
83
// scStateChange records one SubConn state change: the SubConn involved and the
// connectivity state reported for it. fakeEDSBalancer sends a value of this
// type on its subconnStateChange channel for every HandleSubConnStateChange
// call.
type scStateChange struct {
	sc    balancer.SubConn
	state connectivity.State
}
88
89type fakeEDSBalancer struct {
90	cc                 balancer.ClientConn
91	childPolicy        *testutils.Channel
92	subconnStateChange *testutils.Channel
93	loadStore          lrs.Store
94}
95
96func (f *fakeEDSBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
97	f.subconnStateChange.Send(&scStateChange{sc: sc, state: state})
98}
99
100func (f *fakeEDSBalancer) HandleChildPolicy(name string, config json.RawMessage) {
101	f.childPolicy.Send(&loadBalancingConfig{Name: name, Config: config})
102}
103
104func (f *fakeEDSBalancer) Close()                                              {}
105func (f *fakeEDSBalancer) HandleEDSResponse(edsResp *xdsclient.EDSUpdate)      {}
106func (f *fakeEDSBalancer) updateState(priority priorityType, s balancer.State) {}
107
108func (f *fakeEDSBalancer) waitForChildPolicy(wantPolicy *loadBalancingConfig) error {
109	val, err := f.childPolicy.Receive()
110	if err != nil {
111		return fmt.Errorf("error waiting for childPolicy: %v", err)
112	}
113	gotPolicy := val.(*loadBalancingConfig)
114	if !cmp.Equal(gotPolicy, wantPolicy) {
115		return fmt.Errorf("got childPolicy %v, want %v", gotPolicy, wantPolicy)
116	}
117	return nil
118}
119
120func (f *fakeEDSBalancer) waitForSubConnStateChange(wantState *scStateChange) error {
121	val, err := f.subconnStateChange.Receive()
122	if err != nil {
123		return fmt.Errorf("error waiting for subconnStateChange: %v", err)
124	}
125	gotState := val.(*scStateChange)
126	if !cmp.Equal(gotState, wantState, cmp.AllowUnexported(scStateChange{})) {
127		return fmt.Errorf("got subconnStateChange %v, want %v", gotState, wantState)
128	}
129	return nil
130}
131
132func newFakeEDSBalancer(cc balancer.ClientConn, loadStore lrs.Store) edsBalancerImplInterface {
133	return &fakeEDSBalancer{
134		cc:                 cc,
135		childPolicy:        testutils.NewChannelWithSize(10),
136		subconnStateChange: testutils.NewChannelWithSize(10),
137		loadStore:          loadStore,
138	}
139}
140
141type fakeSubConn struct{}
142
143func (*fakeSubConn) UpdateAddresses([]resolver.Address) { panic("implement me") }
144func (*fakeSubConn) Connect()                           { panic("implement me") }
145
146// waitForNewXDSClientWithEDSWatch makes sure that a new xdsClient is created
147// with the provided name. It also make sure that the newly created client
148// registers an eds watcher.
149func waitForNewXDSClientWithEDSWatch(t *testing.T, ch *testutils.Channel, wantName string) *fakeclient.Client {
150	t.Helper()
151
152	val, err := ch.Receive()
153	if err != nil {
154		t.Fatalf("error when waiting for a new xds client: %v", err)
155		return nil
156	}
157	xdsC := val.(*fakeclient.Client)
158	if xdsC.Name() != wantName {
159		t.Fatalf("xdsClient created to balancer: %v, want %v", xdsC.Name(), wantName)
160		return nil
161	}
162	_, err = xdsC.WaitForWatchEDS()
163	if err != nil {
164		t.Fatalf("xdsClient.WatchEDS failed with error: %v", err)
165		return nil
166	}
167	return xdsC
168}
169
170// waitForNewEDSLB makes sure that a new edsLB is created by the top-level
171// edsBalancer.
172func waitForNewEDSLB(t *testing.T, ch *testutils.Channel) *fakeEDSBalancer {
173	t.Helper()
174
175	val, err := ch.Receive()
176	if err != nil {
177		t.Fatalf("error when waiting for a new edsLB: %v", err)
178		return nil
179	}
180	return val.(*fakeEDSBalancer)
181}
182
183// setup overrides the functions which are used to create the xdsClient and the
184// edsLB, creates fake version of them and makes them available on the provided
185// channels. The returned cancel function should be called by the test for
186// cleanup.
187func setup(edsLBCh *testutils.Channel, xdsClientCh *testutils.Channel) func() {
188	origNewEDSBalancer := newEDSBalancer
189	newEDSBalancer = func(cc balancer.ClientConn, enqueue func(priorityType, balancer.State), loadStore lrs.Store, logger *grpclog.PrefixLogger) edsBalancerImplInterface {
190		edsLB := newFakeEDSBalancer(cc, loadStore)
191		defer func() { edsLBCh.Send(edsLB) }()
192		return edsLB
193	}
194
195	origXdsClientNew := xdsclientNew
196	xdsclientNew = func(opts xdsclient.Options) (xdsClientInterface, error) {
197		xdsC := fakeclient.NewClientWithName(opts.Config.BalancerName)
198		defer func() { xdsClientCh.Send(xdsC) }()
199		return xdsC, nil
200	}
201	return func() {
202		newEDSBalancer = origNewEDSBalancer
203		xdsclientNew = origXdsClientNew
204	}
205}
206
207// TestXDSConfigBalancerNameUpdate verifies different scenarios where the
208// balancer name in the lbConfig is updated.
209//
210// The test does the following:
211// * Builds a new xds balancer.
212// * Repeatedly pushes new ClientConnState which specifies different
213//   balancerName in the lbConfig. We expect xdsClient objects to created
214//   whenever the balancerName changes.
215func (s) TestXDSConfigBalancerNameUpdate(t *testing.T) {
216	oldBootstrapConfigNew := bootstrapConfigNew
217	bootstrapConfigNew = func() (*bootstrap.Config, error) {
218		// Return an error from bootstrap, so the eds balancer will use
219		// BalancerName from the config.
220		//
221		// TODO: remove this when deleting BalancerName from config.
222		return nil, fmt.Errorf("no bootstrap available")
223	}
224	defer func() { bootstrapConfigNew = oldBootstrapConfigNew }()
225	edsLBCh := testutils.NewChannel()
226	xdsClientCh := testutils.NewChannel()
227	cancel := setup(edsLBCh, xdsClientCh)
228	defer cancel()
229
230	builder := balancer.Get(edsName)
231	cc := newNoopTestClientConn()
232	edsB, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSClusterName}}).(*edsBalancer)
233	if !ok {
234		t.Fatalf("builder.Build(%s) returned type {%T}, want {*edsBalancer}", edsName, edsB)
235	}
236	defer edsB.Close()
237
238	addrs := []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, {Addr: "3.3.3.3:10003"}}
239	for i := 0; i < 2; i++ {
240		balancerName := fmt.Sprintf("balancer-%d", i)
241		edsB.UpdateClientConnState(balancer.ClientConnState{
242			ResolverState: resolver.State{Addresses: addrs},
243			BalancerConfig: &EDSConfig{
244				BalancerName:   balancerName,
245				EDSServiceName: testEDSClusterName,
246			},
247		})
248
249		xdsC := waitForNewXDSClientWithEDSWatch(t, xdsClientCh, balancerName)
250		xdsC.InvokeWatchEDSCallback(&xdsclient.EDSUpdate{}, nil)
251	}
252}
253
// Names under which the two fake balancer builders are registered in init.
// They are also used as child policy names in the config tests.
const (
	fakeBalancerA = "fake_balancer_A"
	fakeBalancerB = "fake_balancer_B"
)
258
259// Install two fake balancers for service config update tests.
260//
261// ParseConfig only accepts the json if the balancer specified is registered.
262
263func init() {
264	balancer.Register(&fakeBalancerBuilder{name: fakeBalancerA})
265	balancer.Register(&fakeBalancerBuilder{name: fakeBalancerB})
266}
267
268type fakeBalancerBuilder struct {
269	name string
270}
271
272func (b *fakeBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
273	return &fakeBalancer{cc: cc}
274}
275
276func (b *fakeBalancerBuilder) Name() string {
277	return b.name
278}
279
280type fakeBalancer struct {
281	cc balancer.ClientConn
282}
283
284func (b *fakeBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
285	panic("implement me")
286}
287
288func (b *fakeBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
289	panic("implement me")
290}
291
292func (b *fakeBalancer) Close() {}
293
294// TestXDSConnfigChildPolicyUpdate verifies scenarios where the childPolicy
295// section of the lbConfig is updated.
296//
297// The test does the following:
298// * Builds a new xds balancer.
299// * Pushes a new ClientConnState with a childPolicy set to fakeBalancerA.
300//   Verifies that a new xdsClient is created. It then pushes a new edsUpdate
301//   through the fakexds client. Verifies that a new edsLB is created and it
302//   receives the expected childPolicy.
303// * Pushes a new ClientConnState with a childPolicy set to fakeBalancerB.
304//   This time around, we expect no new xdsClient or edsLB to be created.
305//   Instead, we expect the existing edsLB to receive the new child policy.
306func (s) TestXDSConnfigChildPolicyUpdate(t *testing.T) {
307	edsLBCh := testutils.NewChannel()
308	xdsClientCh := testutils.NewChannel()
309	cancel := setup(edsLBCh, xdsClientCh)
310	defer cancel()
311
312	builder := balancer.Get(edsName)
313	cc := newNoopTestClientConn()
314	edsB, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}).(*edsBalancer)
315	if !ok {
316		t.Fatalf("builder.Build(%s) returned type {%T}, want {*edsBalancer}", edsName, edsB)
317	}
318	defer edsB.Close()
319
320	edsB.UpdateClientConnState(balancer.ClientConnState{
321		BalancerConfig: &EDSConfig{
322			BalancerName: testBalancerNameFooBar,
323			ChildPolicy: &loadBalancingConfig{
324				Name:   fakeBalancerA,
325				Config: json.RawMessage("{}"),
326			},
327			EDSServiceName: testEDSClusterName,
328		},
329	})
330	xdsC := waitForNewXDSClientWithEDSWatch(t, xdsClientCh, testBalancerNameFooBar)
331	xdsC.InvokeWatchEDSCallback(&xdsclient.EDSUpdate{}, nil)
332	edsLB := waitForNewEDSLB(t, edsLBCh)
333	edsLB.waitForChildPolicy(&loadBalancingConfig{
334		Name:   string(fakeBalancerA),
335		Config: json.RawMessage(`{}`),
336	})
337
338	edsB.UpdateClientConnState(balancer.ClientConnState{
339		BalancerConfig: &EDSConfig{
340			BalancerName: testBalancerNameFooBar,
341			ChildPolicy: &loadBalancingConfig{
342				Name:   fakeBalancerB,
343				Config: json.RawMessage("{}"),
344			},
345			EDSServiceName: testEDSClusterName,
346		},
347	})
348	edsLB.waitForChildPolicy(&loadBalancingConfig{
349		Name:   string(fakeBalancerA),
350		Config: json.RawMessage(`{}`),
351	})
352}
353
354// TestXDSSubConnStateChange verifies if the top-level edsBalancer passes on
355// the subConnStateChange to appropriate child balancers.
356func (s) TestXDSSubConnStateChange(t *testing.T) {
357	edsLBCh := testutils.NewChannel()
358	xdsClientCh := testutils.NewChannel()
359	cancel := setup(edsLBCh, xdsClientCh)
360	defer cancel()
361
362	builder := balancer.Get(edsName)
363	cc := newNoopTestClientConn()
364	edsB, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSClusterName}}).(*edsBalancer)
365	if !ok {
366		t.Fatalf("builder.Build(%s) returned type {%T}, want {*edsBalancer}", edsName, edsB)
367	}
368	defer edsB.Close()
369
370	addrs := []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, {Addr: "3.3.3.3:10003"}}
371	edsB.UpdateClientConnState(balancer.ClientConnState{
372		ResolverState: resolver.State{Addresses: addrs},
373		BalancerConfig: &EDSConfig{
374			BalancerName:   testBalancerNameFooBar,
375			EDSServiceName: testEDSClusterName,
376		},
377	})
378
379	xdsC := waitForNewXDSClientWithEDSWatch(t, xdsClientCh, testBalancerNameFooBar)
380	xdsC.InvokeWatchEDSCallback(&xdsclient.EDSUpdate{}, nil)
381	edsLB := waitForNewEDSLB(t, edsLBCh)
382
383	fsc := &fakeSubConn{}
384	state := connectivity.Ready
385	edsB.UpdateSubConnState(fsc, balancer.SubConnState{ConnectivityState: state})
386	edsLB.waitForSubConnStateChange(&scStateChange{sc: fsc, state: state})
387}
388
389func (s) TestXDSBalancerConfigParsing(t *testing.T) {
390	const testEDSName = "eds.service"
391	var testLRSName = "lrs.server"
392	b := bytes.NewBuffer(nil)
393	if err := (&jsonpb.Marshaler{}).Marshal(b, &scpb.XdsConfig{
394		ChildPolicy: []*scpb.LoadBalancingConfig{
395			{Policy: &scpb.LoadBalancingConfig_Xds{}},
396			{Policy: &scpb.LoadBalancingConfig_RoundRobin{
397				RoundRobin: &scpb.RoundRobinConfig{},
398			}},
399		},
400		FallbackPolicy: []*scpb.LoadBalancingConfig{
401			{Policy: &scpb.LoadBalancingConfig_Xds{}},
402			{Policy: &scpb.LoadBalancingConfig_PickFirst{
403				PickFirst: &scpb.PickFirstConfig{},
404			}},
405		},
406		EdsServiceName:             testEDSName,
407		LrsLoadReportingServerName: &wrapperspb.StringValue{Value: testLRSName},
408	}); err != nil {
409		t.Fatalf("%v", err)
410	}
411
412	tests := []struct {
413		name    string
414		js      json.RawMessage
415		want    serviceconfig.LoadBalancingConfig
416		wantErr bool
417	}{
418		{
419			name: "jsonpb-generated",
420			js:   b.Bytes(),
421			want: &EDSConfig{
422				ChildPolicy: &loadBalancingConfig{
423					Name:   "round_robin",
424					Config: json.RawMessage("{}"),
425				},
426				FallBackPolicy: &loadBalancingConfig{
427					Name:   "pick_first",
428					Config: json.RawMessage("{}"),
429				},
430				EDSServiceName:             testEDSName,
431				LrsLoadReportingServerName: &testLRSName,
432			},
433			wantErr: false,
434		},
435		{
436			// json with random balancers, and the first is not registered.
437			name: "manually-generated",
438			js: json.RawMessage(`
439{
440  "balancerName": "fake.foo.bar",
441  "childPolicy": [
442    {"fake_balancer_C": {}},
443    {"fake_balancer_A": {}},
444    {"fake_balancer_B": {}}
445  ],
446  "fallbackPolicy": [
447    {"fake_balancer_C": {}},
448    {"fake_balancer_B": {}},
449    {"fake_balancer_A": {}}
450  ],
451  "edsServiceName": "eds.service",
452  "lrsLoadReportingServerName": "lrs.server"
453}`),
454			want: &EDSConfig{
455				BalancerName: "fake.foo.bar",
456				ChildPolicy: &loadBalancingConfig{
457					Name:   "fake_balancer_A",
458					Config: json.RawMessage("{}"),
459				},
460				FallBackPolicy: &loadBalancingConfig{
461					Name:   "fake_balancer_B",
462					Config: json.RawMessage("{}"),
463				},
464				EDSServiceName:             testEDSName,
465				LrsLoadReportingServerName: &testLRSName,
466			},
467			wantErr: false,
468		},
469		{
470			// json with no lrs server name, LrsLoadReportingServerName should
471			// be nil (not an empty string).
472			name: "no-lrs-server-name",
473			js: json.RawMessage(`
474{
475  "balancerName": "fake.foo.bar",
476  "edsServiceName": "eds.service"
477}`),
478			want: &EDSConfig{
479				BalancerName:               "fake.foo.bar",
480				EDSServiceName:             testEDSName,
481				LrsLoadReportingServerName: nil,
482			},
483			wantErr: false,
484		},
485	}
486	for _, tt := range tests {
487		t.Run(tt.name, func(t *testing.T) {
488			b := &edsBalancerBuilder{}
489			got, err := b.ParseConfig(tt.js)
490			if (err != nil) != tt.wantErr {
491				t.Errorf("edsBalancerBuilder.ParseConfig() error = %v, wantErr %v", err, tt.wantErr)
492				return
493			}
494			if !cmp.Equal(got, tt.want) {
495				t.Errorf(cmp.Diff(got, tt.want))
496			}
497		})
498	}
499}
500func (s) TestLoadbalancingConfigParsing(t *testing.T) {
501	tests := []struct {
502		name string
503		s    string
504		want *EDSConfig
505	}{
506		{
507			name: "empty",
508			s:    "{}",
509			want: &EDSConfig{},
510		},
511		{
512			name: "success1",
513			s:    `{"childPolicy":[{"pick_first":{}}]}`,
514			want: &EDSConfig{
515				ChildPolicy: &loadBalancingConfig{
516					Name:   "pick_first",
517					Config: json.RawMessage(`{}`),
518				},
519			},
520		},
521		{
522			name: "success2",
523			s:    `{"childPolicy":[{"round_robin":{}},{"pick_first":{}}]}`,
524			want: &EDSConfig{
525				ChildPolicy: &loadBalancingConfig{
526					Name:   "round_robin",
527					Config: json.RawMessage(`{}`),
528				},
529			},
530		},
531	}
532	for _, tt := range tests {
533		t.Run(tt.name, func(t *testing.T) {
534			var cfg EDSConfig
535			if err := json.Unmarshal([]byte(tt.s), &cfg); err != nil || !cmp.Equal(&cfg, tt.want) {
536				t.Errorf("test name: %s, parseFullServiceConfig() = %+v, err: %v, want %+v, <nil>", tt.name, cfg, err, tt.want)
537			}
538		})
539	}
540}
541
542func (s) TestEqualStringPointers(t *testing.T) {
543	var (
544		ta1 = "test-a"
545		ta2 = "test-a"
546		tb  = "test-b"
547	)
548	tests := []struct {
549		name string
550		a    *string
551		b    *string
552		want bool
553	}{
554		{"both-nil", nil, nil, true},
555		{"a-non-nil", &ta1, nil, false},
556		{"b-non-nil", nil, &tb, false},
557		{"equal", &ta1, &ta2, true},
558		{"different", &ta1, &tb, false},
559	}
560	for _, tt := range tests {
561		t.Run(tt.name, func(t *testing.T) {
562			if got := equalStringPointers(tt.a, tt.b); got != tt.want {
563				t.Errorf("equalStringPointers() = %v, want %v", got, tt.want)
564			}
565		})
566	}
567}
568