1/*
2 *
3 * Copyright 2019 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package edsbalancer
20
21import (
22	"errors"
23	"testing"
24
25	xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
26	corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
27	"github.com/golang/protobuf/proto"
28	"github.com/google/go-cmp/cmp"
29	"google.golang.org/grpc"
30	"google.golang.org/grpc/attributes"
31	"google.golang.org/grpc/balancer"
32	"google.golang.org/grpc/resolver"
33	xdsinternal "google.golang.org/grpc/xds/internal"
34	xdsclient "google.golang.org/grpc/xds/internal/client"
35	"google.golang.org/grpc/xds/internal/client/bootstrap"
36	"google.golang.org/grpc/xds/internal/testutils"
37	"google.golang.org/grpc/xds/internal/testutils/fakeclient"
38	"google.golang.org/grpc/xds/internal/testutils/fakeserver"
39)
40
// edsType is the type URL that the tests in this file expect to see in the
// DiscoveryRequest sent for an EDS (ClusterLoadAssignment) resource.
const edsType = "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment"
44
var (
	// testServiceName is used as the endpoint of the dial target handed to
	// the clientWrapper under test.
	testServiceName = "test/foo"

	// testEDSClusterName is used as the EDSServiceName in the EDSConfig
	// updates pushed to the clientWrapper under test.
	testEDSClusterName = "test/service/eds"
)
49
50// TestClientWrapperWatchEDS verifies that the clientWrapper registers an
51// EDS watch for expected resource upon receiving an update from the top-level
52// edsBalancer.
53//
54// The test does the following:
55// * Starts a fake xDS server.
56// * Creates a clientWrapper.
57// * Sends updates with different edsServiceNames and expects new watches to be
58//   registered.
59func (s) TestClientWrapperWatchEDS(t *testing.T) {
60	fakeServer, cleanup, err := fakeserver.StartServer()
61	if err != nil {
62		t.Fatalf("Failed to start fake xDS server: %v", err)
63	}
64	defer cleanup()
65
66	cw := newXDSClientWrapper(nil, nil, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil)
67	defer cw.close()
68
69	for _, test := range []struct {
70		name             string
71		edsServiceName   string
72		wantResourceName string
73	}{
74		{
75			// Update with an empty edsServiceName should trigger an EDS watch
76			// for the user's dial target.
77			name:             "empty-edsServiceName",
78			edsServiceName:   "",
79			wantResourceName: testServiceName,
80		},
81		{
82			// Update with an non-empty edsServiceName should trigger an EDS
83			// watch for the same.
84			name:             "first-non-empty-edsServiceName",
85			edsServiceName:   "foobar-1",
86			wantResourceName: "foobar-1",
87		},
88		{
89			// Also test the case where the edsServerName changes from one
90			// non-empty name to another, and make sure a new watch is
91			// registered.
92			name:             "second-non-empty-edsServiceName",
93			edsServiceName:   "foobar-2",
94			wantResourceName: "foobar-2",
95		},
96	} {
97		t.Run(test.name, func(t *testing.T) {
98			oldBootstrapConfigNew := bootstrapConfigNew
99			bootstrapConfigNew = func() (*bootstrap.Config, error) {
100				return &bootstrap.Config{
101					BalancerName: fakeServer.Address,
102					Creds:        grpc.WithInsecure(),
103					NodeProto:    &corepb.Node{},
104				}, nil
105			}
106			defer func() { bootstrapConfigNew = oldBootstrapConfigNew }()
107			cw.handleUpdate(&EDSConfig{
108				BalancerName:   fakeServer.Address,
109				EDSServiceName: test.edsServiceName,
110			}, nil)
111
112			req, err := fakeServer.XDSRequestChan.Receive()
113			if err != nil {
114				t.Fatalf("EDS RPC failed with err: %v", err)
115			}
116			edsReq := req.(*fakeserver.Request)
117			if edsReq.Err != nil {
118				t.Fatalf("EDS RPC failed with err: %v", edsReq.Err)
119			}
120
121			wantReq := &xdspb.DiscoveryRequest{
122				TypeUrl:       edsType,
123				ResourceNames: []string{test.wantResourceName},
124				Node:          &corepb.Node{},
125			}
126			if !proto.Equal(edsReq.Req, wantReq) {
127				t.Fatalf("got EDS request %v, expected: %v, diff: %s", edsReq.Req, wantReq, cmp.Diff(edsReq.Req, wantReq, cmp.Comparer(proto.Equal)))
128			}
129		})
130	}
131}
132
133// TestClientWrapperHandleUpdateError verifies that the clientWrapper handles
134// errors from the edsWatch callback appropriately.
135//
136// The test does the following:
137// * Creates a clientWrapper.
138// * Creates a fakeclient.Client and passes it to the clientWrapper in attributes.
139// * Verifies the clientWrapper registers an EDS watch.
140// * Forces the fakeclient.Client to invoke the registered EDS watch callback with
141//   an error. Verifies that the wrapper does not invoke the top-level
142//   edsBalancer with the received error.
143func (s) TestClientWrapperHandleUpdateError(t *testing.T) {
144	edsRespChan := testutils.NewChannel()
145	newEDS := func(update *xdsclient.EDSUpdate) error {
146		edsRespChan.Send(update)
147		return nil
148	}
149
150	cw := newXDSClientWrapper(newEDS, nil, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil)
151	defer cw.close()
152
153	xdsC := fakeclient.NewClient()
154	cw.handleUpdate(&EDSConfig{EDSServiceName: testEDSClusterName}, attributes.New(xdsinternal.XDSClientID, xdsC))
155	gotCluster, err := xdsC.WaitForWatchEDS()
156	if err != nil {
157		t.Fatalf("xdsClient.WatchEDS failed with error: %v", err)
158	}
159	if gotCluster != testEDSClusterName {
160		t.Fatalf("xdsClient.WatchEDS() called with cluster: %v, want %v", gotCluster, testEDSClusterName)
161	}
162	xdsC.InvokeWatchEDSCallback(nil, errors.New("EDS watch callback error"))
163
164	// The callback is called with an error, expect no update from edsRespChan.
165	//
166	// TODO: check for loseContact() when errors indicating "lose contact" are
167	// handled correctly.
168	if gotUpdate, gotErr := edsRespChan.Receive(); gotErr != testutils.ErrRecvTimeout {
169		t.Fatalf("edsBalancer got edsUpdate {%+v, %v}, when none was expected", gotUpdate, gotErr)
170	}
171}
172
173// TestClientWrapperGetsXDSClientInAttributes verfies the case where the
174// clientWrapper receives the xdsClient to use in the attributes section of the
175// update.
176func (s) TestClientWrapperGetsXDSClientInAttributes(t *testing.T) {
177	oldxdsclientNew := xdsclientNew
178	xdsclientNew = func(_ xdsclient.Options) (xdsClientInterface, error) {
179		t.Fatalf("unexpected call to xdsclientNew when xds_client is set in attributes")
180		return nil, nil
181	}
182	defer func() { xdsclientNew = oldxdsclientNew }()
183
184	cw := newXDSClientWrapper(nil, nil, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil)
185	defer cw.close()
186
187	// Verify that the eds watch is registered for the expected resource name.
188	xdsC1 := fakeclient.NewClient()
189	cw.handleUpdate(&EDSConfig{EDSServiceName: testEDSClusterName}, attributes.New(xdsinternal.XDSClientID, xdsC1))
190	gotCluster, err := xdsC1.WaitForWatchEDS()
191	if err != nil {
192		t.Fatalf("xdsClient.WatchEDS failed with error: %v", err)
193	}
194	if gotCluster != testEDSClusterName {
195		t.Fatalf("xdsClient.WatchEDS() called with cluster: %v, want %v", gotCluster, testEDSClusterName)
196	}
197
198	// Pass a new client in the attributes. Verify that the watch is
199	// re-registered on the new client, and that the old client is not closed
200	// (because clientWrapper only closes clients that it creates, it does not
201	// close client that are passed through attributes).
202	xdsC2 := fakeclient.NewClient()
203	cw.handleUpdate(&EDSConfig{EDSServiceName: testEDSClusterName}, attributes.New(xdsinternal.XDSClientID, xdsC2))
204	gotCluster, err = xdsC2.WaitForWatchEDS()
205	if err != nil {
206		t.Fatalf("xdsClient.WatchEDS failed with error: %v", err)
207	}
208	if gotCluster != testEDSClusterName {
209		t.Fatalf("xdsClient.WatchEDS() called with cluster: %v, want %v", gotCluster, testEDSClusterName)
210	}
211
212	if err := xdsC1.WaitForClose(); err != testutils.ErrRecvTimeout {
213		t.Fatalf("clientWrapper closed xdsClient received in attributes")
214	}
215}
216