1// +build go1.12
2
3/*
4 *
5 * Copyright 2019 gRPC authors.
6 *
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 *     http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 *
19 */
20
21package clusterresolver
22
23import (
24	"context"
25	"fmt"
26	"testing"
27	"time"
28
29	"github.com/google/go-cmp/cmp"
30	"google.golang.org/grpc/balancer"
31	"google.golang.org/grpc/connectivity"
32	"google.golang.org/grpc/internal/grpctest"
33	"google.golang.org/grpc/internal/testutils"
34	"google.golang.org/grpc/resolver"
35	"google.golang.org/grpc/xds/internal"
36	"google.golang.org/grpc/xds/internal/testutils/fakeclient"
37	"google.golang.org/grpc/xds/internal/xdsclient"
38
39	_ "google.golang.org/grpc/xds/internal/xdsclient/v2" // V2 client registration.
40)
41
// Timeouts used by the tests in this file.
const (
	// defaultTestTimeout bounds waits for events that are expected to occur.
	defaultTestTimeout = time.Second

	// defaultTestShortTimeout bounds waits for events that are expected NOT to
	// occur; such waits are expected to end with a timeout.
	defaultTestShortTimeout = 10 * time.Millisecond
)

// Resource names used by the tests in this file. testEDSServcie keeps its
// historical (misspelled) identifier so that any existing references to it
// keep compiling.
const (
	testEDSServcie  = "test-eds-service-name"
	testClusterName = "test-cluster-name"
)
48
49var (
50	// A non-empty endpoints update which is expected to be accepted by the EDS
51	// LB policy.
52	defaultEndpointsUpdate = xdsclient.EndpointsUpdate{
53		Localities: []xdsclient.Locality{
54			{
55				Endpoints: []xdsclient.Endpoint{{Address: "endpoint1"}},
56				ID:        internal.LocalityID{Zone: "zone"},
57				Priority:  1,
58				Weight:    100,
59			},
60		},
61	}
62)
63
64func init() {
65	balancer.Register(bb{})
66}
67
68type s struct {
69	grpctest.Tester
70
71	cleanup func()
72}
73
74func (ss s) Teardown(t *testing.T) {
75	xdsclient.ClearAllCountersForTesting()
76	ss.Tester.Teardown(t)
77	if ss.cleanup != nil {
78		ss.cleanup()
79	}
80}
81
82func Test(t *testing.T) {
83	grpctest.RunSubTests(t, s{})
84}
85
const (
	// testBalancerNameFooBar is the name passed to
	// fakeclient.NewClientWithName when setup creates the fake xDS client.
	testBalancerNameFooBar = "foo.bar"
)
87
88func newNoopTestClientConn() *noopTestClientConn {
89	return &noopTestClientConn{}
90}
91
92// noopTestClientConn is used in EDS balancer config update tests that only
93// cover the config update handling, but not SubConn/load-balancing.
94type noopTestClientConn struct {
95	balancer.ClientConn
96}
97
98func (t *noopTestClientConn) NewSubConn([]resolver.Address, balancer.NewSubConnOptions) (balancer.SubConn, error) {
99	return nil, nil
100}
101
102func (noopTestClientConn) Target() string { return testEDSServcie }
103
104type scStateChange struct {
105	sc    balancer.SubConn
106	state balancer.SubConnState
107}
108
109type fakeChildBalancer struct {
110	cc              balancer.ClientConn
111	subConnState    *testutils.Channel
112	clientConnState *testutils.Channel
113	resolverError   *testutils.Channel
114}
115
116func (f *fakeChildBalancer) UpdateClientConnState(state balancer.ClientConnState) error {
117	f.clientConnState.Send(state)
118	return nil
119}
120
121func (f *fakeChildBalancer) ResolverError(err error) {
122	f.resolverError.Send(err)
123}
124
125func (f *fakeChildBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
126	f.subConnState.Send(&scStateChange{sc: sc, state: state})
127}
128
129func (f *fakeChildBalancer) Close() {}
130
131func (f *fakeChildBalancer) waitForClientConnStateChange(ctx context.Context) error {
132	_, err := f.clientConnState.Receive(ctx)
133	if err != nil {
134		return err
135	}
136	return nil
137}
138
139func (f *fakeChildBalancer) waitForResolverError(ctx context.Context) error {
140	_, err := f.resolverError.Receive(ctx)
141	if err != nil {
142		return err
143	}
144	return nil
145}
146
147func (f *fakeChildBalancer) waitForSubConnStateChange(ctx context.Context, wantState *scStateChange) error {
148	val, err := f.subConnState.Receive(ctx)
149	if err != nil {
150		return err
151	}
152	gotState := val.(*scStateChange)
153	if !cmp.Equal(gotState, wantState, cmp.AllowUnexported(scStateChange{})) {
154		return fmt.Errorf("got subconnStateChange %v, want %v", gotState, wantState)
155	}
156	return nil
157}
158
159func newFakeChildBalancer(cc balancer.ClientConn) balancer.Balancer {
160	return &fakeChildBalancer{
161		cc:              cc,
162		subConnState:    testutils.NewChannelWithSize(10),
163		clientConnState: testutils.NewChannelWithSize(10),
164		resolverError:   testutils.NewChannelWithSize(10),
165	}
166}
167
168type fakeSubConn struct{}
169
170func (*fakeSubConn) UpdateAddresses([]resolver.Address) { panic("implement me") }
171func (*fakeSubConn) Connect()                           { panic("implement me") }
172
173// waitForNewChildLB makes sure that a new child LB is created by the top-level
174// clusterResolverBalancer.
175func waitForNewChildLB(ctx context.Context, ch *testutils.Channel) (*fakeChildBalancer, error) {
176	val, err := ch.Receive(ctx)
177	if err != nil {
178		return nil, fmt.Errorf("error when waiting for a new edsLB: %v", err)
179	}
180	return val.(*fakeChildBalancer), nil
181}
182
183// setup overrides the functions which are used to create the xdsClient and the
184// edsLB, creates fake version of them and makes them available on the provided
185// channels. The returned cancel function should be called by the test for
186// cleanup.
187func setup(childLBCh *testutils.Channel) (*fakeclient.Client, func()) {
188	xdsC := fakeclient.NewClientWithName(testBalancerNameFooBar)
189
190	origNewChildBalancer := newChildBalancer
191	newChildBalancer = func(_ balancer.Builder, cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer {
192		childLB := newFakeChildBalancer(cc)
193		defer func() { childLBCh.Send(childLB) }()
194		return childLB
195	}
196	return xdsC, func() {
197		newChildBalancer = origNewChildBalancer
198		xdsC.Close()
199	}
200}
201
202// TestSubConnStateChange verifies if the top-level clusterResolverBalancer passes on
203// the subConnState to appropriate child balancer.
204func (s) TestSubConnStateChange(t *testing.T) {
205	edsLBCh := testutils.NewChannel()
206	xdsC, cleanup := setup(edsLBCh)
207	defer cleanup()
208
209	builder := balancer.Get(Name)
210	edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSServcie}})
211	if edsB == nil {
212		t.Fatalf("builder.Build(%s) failed and returned nil", Name)
213	}
214	defer edsB.Close()
215
216	if err := edsB.UpdateClientConnState(balancer.ClientConnState{
217		ResolverState:  xdsclient.SetClient(resolver.State{}, xdsC),
218		BalancerConfig: newLBConfigWithOneEDS(testEDSServcie),
219	}); err != nil {
220		t.Fatalf("edsB.UpdateClientConnState() failed: %v", err)
221	}
222
223	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
224	defer cancel()
225	if _, err := xdsC.WaitForWatchEDS(ctx); err != nil {
226		t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err)
227	}
228	xdsC.InvokeWatchEDSCallback("", defaultEndpointsUpdate, nil)
229	edsLB, err := waitForNewChildLB(ctx, edsLBCh)
230	if err != nil {
231		t.Fatal(err)
232	}
233
234	fsc := &fakeSubConn{}
235	state := balancer.SubConnState{ConnectivityState: connectivity.Ready}
236	edsB.UpdateSubConnState(fsc, state)
237	if err := edsLB.waitForSubConnStateChange(ctx, &scStateChange{sc: fsc, state: state}); err != nil {
238		t.Fatal(err)
239	}
240}
241
242// TestErrorFromXDSClientUpdate verifies that an error from xdsClient update is
243// handled correctly.
244//
245// If it's resource-not-found, watch will NOT be canceled, the EDS impl will
246// receive an empty EDS update, and new RPCs will fail.
247//
248// If it's connection error, nothing will happen. This will need to change to
249// handle fallback.
250func (s) TestErrorFromXDSClientUpdate(t *testing.T) {
251	edsLBCh := testutils.NewChannel()
252	xdsC, cleanup := setup(edsLBCh)
253	defer cleanup()
254
255	builder := balancer.Get(Name)
256	edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSServcie}})
257	if edsB == nil {
258		t.Fatalf("builder.Build(%s) failed and returned nil", Name)
259	}
260	defer edsB.Close()
261
262	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
263	defer cancel()
264	if err := edsB.UpdateClientConnState(balancer.ClientConnState{
265		ResolverState:  xdsclient.SetClient(resolver.State{}, xdsC),
266		BalancerConfig: newLBConfigWithOneEDS(testEDSServcie),
267	}); err != nil {
268		t.Fatal(err)
269	}
270	if _, err := xdsC.WaitForWatchEDS(ctx); err != nil {
271		t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err)
272	}
273	xdsC.InvokeWatchEDSCallback("", xdsclient.EndpointsUpdate{}, nil)
274	edsLB, err := waitForNewChildLB(ctx, edsLBCh)
275	if err != nil {
276		t.Fatal(err)
277	}
278	if err := edsLB.waitForClientConnStateChange(ctx); err != nil {
279		t.Fatalf("EDS impl got unexpected update: %v", err)
280	}
281
282	connectionErr := xdsclient.NewErrorf(xdsclient.ErrorTypeConnection, "connection error")
283	xdsC.InvokeWatchEDSCallback("", xdsclient.EndpointsUpdate{}, connectionErr)
284
285	sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
286	defer sCancel()
287	if _, err := xdsC.WaitForCancelEDSWatch(sCtx); err != context.DeadlineExceeded {
288		t.Fatal("watch was canceled, want not canceled (timeout error)")
289	}
290
291	sCtx, sCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
292	defer sCancel()
293	if err := edsLB.waitForClientConnStateChange(sCtx); err != context.DeadlineExceeded {
294		t.Fatal(err)
295	}
296	if err := edsLB.waitForResolverError(ctx); err != nil {
297		t.Fatalf("want resolver error, got %v", err)
298	}
299
300	resourceErr := xdsclient.NewErrorf(xdsclient.ErrorTypeResourceNotFound, "clusterResolverBalancer resource not found error")
301	xdsC.InvokeWatchEDSCallback("", xdsclient.EndpointsUpdate{}, resourceErr)
302	// Even if error is resource not found, watch shouldn't be canceled, because
303	// this is an EDS resource removed (and xds client actually never sends this
304	// error, but we still handles it).
305	sCtx, sCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
306	defer sCancel()
307	if _, err := xdsC.WaitForCancelEDSWatch(sCtx); err != context.DeadlineExceeded {
308		t.Fatal("watch was canceled, want not canceled (timeout error)")
309	}
310	if err := edsLB.waitForClientConnStateChange(sCtx); err != context.DeadlineExceeded {
311		t.Fatal(err)
312	}
313	if err := edsLB.waitForResolverError(ctx); err != nil {
314		t.Fatalf("want resolver error, got %v", err)
315	}
316
317	// An update with the same service name should not trigger a new watch.
318	if err := edsB.UpdateClientConnState(balancer.ClientConnState{
319		ResolverState:  xdsclient.SetClient(resolver.State{}, xdsC),
320		BalancerConfig: newLBConfigWithOneEDS(testEDSServcie),
321	}); err != nil {
322		t.Fatal(err)
323	}
324	sCtx, sCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
325	defer sCancel()
326	if _, err := xdsC.WaitForWatchEDS(sCtx); err != context.DeadlineExceeded {
327		t.Fatal("got unexpected new EDS watch")
328	}
329}
330
331// TestErrorFromResolver verifies that resolver errors are handled correctly.
332//
333// If it's resource-not-found, watch will be canceled, the EDS impl will receive
334// an empty EDS update, and new RPCs will fail.
335//
336// If it's connection error, nothing will happen. This will need to change to
337// handle fallback.
338func (s) TestErrorFromResolver(t *testing.T) {
339	edsLBCh := testutils.NewChannel()
340	xdsC, cleanup := setup(edsLBCh)
341	defer cleanup()
342
343	builder := balancer.Get(Name)
344	edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSServcie}})
345	if edsB == nil {
346		t.Fatalf("builder.Build(%s) failed and returned nil", Name)
347	}
348	defer edsB.Close()
349
350	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
351	defer cancel()
352	if err := edsB.UpdateClientConnState(balancer.ClientConnState{
353		ResolverState:  xdsclient.SetClient(resolver.State{}, xdsC),
354		BalancerConfig: newLBConfigWithOneEDS(testEDSServcie),
355	}); err != nil {
356		t.Fatal(err)
357	}
358
359	if _, err := xdsC.WaitForWatchEDS(ctx); err != nil {
360		t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err)
361	}
362	xdsC.InvokeWatchEDSCallback("", xdsclient.EndpointsUpdate{}, nil)
363	edsLB, err := waitForNewChildLB(ctx, edsLBCh)
364	if err != nil {
365		t.Fatal(err)
366	}
367	if err := edsLB.waitForClientConnStateChange(ctx); err != nil {
368		t.Fatalf("EDS impl got unexpected update: %v", err)
369	}
370
371	connectionErr := xdsclient.NewErrorf(xdsclient.ErrorTypeConnection, "connection error")
372	edsB.ResolverError(connectionErr)
373
374	sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
375	defer sCancel()
376	if _, err := xdsC.WaitForCancelEDSWatch(sCtx); err != context.DeadlineExceeded {
377		t.Fatal("watch was canceled, want not canceled (timeout error)")
378	}
379
380	sCtx, sCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
381	defer sCancel()
382	if err := edsLB.waitForClientConnStateChange(sCtx); err != context.DeadlineExceeded {
383		t.Fatal("eds impl got EDS resp, want timeout error")
384	}
385	if err := edsLB.waitForResolverError(ctx); err != nil {
386		t.Fatalf("want resolver error, got %v", err)
387	}
388
389	resourceErr := xdsclient.NewErrorf(xdsclient.ErrorTypeResourceNotFound, "clusterResolverBalancer resource not found error")
390	edsB.ResolverError(resourceErr)
391	if _, err := xdsC.WaitForCancelEDSWatch(ctx); err != nil {
392		t.Fatalf("want watch to be canceled, waitForCancel failed: %v", err)
393	}
394	if err := edsLB.waitForClientConnStateChange(sCtx); err != context.DeadlineExceeded {
395		t.Fatal(err)
396	}
397	if err := edsLB.waitForResolverError(ctx); err != nil {
398		t.Fatalf("want resolver error, got %v", err)
399	}
400
401	// An update with the same service name should trigger a new watch, because
402	// the previous watch was canceled.
403	if err := edsB.UpdateClientConnState(balancer.ClientConnState{
404		ResolverState:  xdsclient.SetClient(resolver.State{}, xdsC),
405		BalancerConfig: newLBConfigWithOneEDS(testEDSServcie),
406	}); err != nil {
407		t.Fatal(err)
408	}
409	if _, err := xdsC.WaitForWatchEDS(ctx); err != nil {
410		t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err)
411	}
412}
413
414// Given a list of resource names, verifies that EDS requests for the same are
415// sent by the EDS balancer, through the fake xDS client.
416func verifyExpectedRequests(ctx context.Context, fc *fakeclient.Client, resourceNames ...string) error {
417	for _, name := range resourceNames {
418		if name == "" {
419			// ResourceName empty string indicates a cancel.
420			if _, err := fc.WaitForCancelEDSWatch(ctx); err != nil {
421				return fmt.Errorf("timed out when expecting resource %q", name)
422			}
423			continue
424		}
425
426		resName, err := fc.WaitForWatchEDS(ctx)
427		if err != nil {
428			return fmt.Errorf("timed out when expecting resource %q, %p", name, fc)
429		}
430		if resName != name {
431			return fmt.Errorf("got EDS request for resource %q, expected: %q", resName, name)
432		}
433	}
434	return nil
435}
436
437// TestClientWatchEDS verifies that the xdsClient inside the top-level EDS LB
438// policy registers an EDS watch for expected resource upon receiving an update
439// from gRPC.
440func (s) TestClientWatchEDS(t *testing.T) {
441	edsLBCh := testutils.NewChannel()
442	xdsC, cleanup := setup(edsLBCh)
443	defer cleanup()
444
445	builder := balancer.Get(Name)
446	edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSServcie}})
447	if edsB == nil {
448		t.Fatalf("builder.Build(%s) failed and returned nil", Name)
449	}
450	defer edsB.Close()
451
452	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
453	defer cancel()
454	// If eds service name is not set, should watch for cluster name.
455	if err := edsB.UpdateClientConnState(balancer.ClientConnState{
456		ResolverState:  xdsclient.SetClient(resolver.State{}, xdsC),
457		BalancerConfig: newLBConfigWithOneEDS("cluster-1"),
458	}); err != nil {
459		t.Fatal(err)
460	}
461	if err := verifyExpectedRequests(ctx, xdsC, "cluster-1"); err != nil {
462		t.Fatal(err)
463	}
464
465	// Update with an non-empty edsServiceName should trigger an EDS watch for
466	// the same.
467	if err := edsB.UpdateClientConnState(balancer.ClientConnState{
468		ResolverState:  xdsclient.SetClient(resolver.State{}, xdsC),
469		BalancerConfig: newLBConfigWithOneEDS("foobar-1"),
470	}); err != nil {
471		t.Fatal(err)
472	}
473	if err := verifyExpectedRequests(ctx, xdsC, "", "foobar-1"); err != nil {
474		t.Fatal(err)
475	}
476
477	// Also test the case where the edsServerName changes from one non-empty
478	// name to another, and make sure a new watch is registered. The previously
479	// registered watch will be cancelled, which will result in an EDS request
480	// with no resource names being sent to the server.
481	if err := edsB.UpdateClientConnState(balancer.ClientConnState{
482		ResolverState:  xdsclient.SetClient(resolver.State{}, xdsC),
483		BalancerConfig: newLBConfigWithOneEDS("foobar-2"),
484	}); err != nil {
485		t.Fatal(err)
486	}
487	if err := verifyExpectedRequests(ctx, xdsC, "", "foobar-2"); err != nil {
488		t.Fatal(err)
489	}
490}
491
492func newLBConfigWithOneEDS(edsServiceName string) *LBConfig {
493	return &LBConfig{
494		DiscoveryMechanisms: []DiscoveryMechanism{{
495			Cluster:        testClusterName,
496			Type:           DiscoveryMechanismTypeEDS,
497			EDSServiceName: edsServiceName,
498		}},
499	}
500}
501