/*
 *
 * Copyright 2019 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package edsbalancer

import (
	"errors"
	"testing"

	xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
	corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
	"github.com/golang/protobuf/proto"
	"github.com/google/go-cmp/cmp"
	"google.golang.org/grpc"
	"google.golang.org/grpc/attributes"
	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/resolver"
	xdsinternal "google.golang.org/grpc/xds/internal"
	xdsclient "google.golang.org/grpc/xds/internal/client"
	"google.golang.org/grpc/xds/internal/client/bootstrap"
	"google.golang.org/grpc/xds/internal/testutils"
	"google.golang.org/grpc/xds/internal/testutils/fakeclient"
	"google.golang.org/grpc/xds/internal/testutils/fakeserver"
)

const (
	// edsType is the type URL the tests in this file expect to see in the
	// TypeUrl field of EDS discovery requests.
	edsType = "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment"
)

var (
	// testServiceName is used as the Endpoint of the dial target passed to
	// the clientWrapper through balancer.BuildOptions.
	testServiceName = "test/foo"
	// testEDSClusterName is used as the EDSServiceName in the EDSConfig
	// updates pushed to the clientWrapper.
	testEDSClusterName = "test/service/eds"
)

// TestClientWrapperWatchEDS verifies that the clientWrapper registers an
// EDS watch for expected resource upon receiving an update from the top-level
// edsBalancer.
//
// The test does the following:
// * Starts a fake xDS server.
// * Creates a clientWrapper.
// * Sends updates with different edsServiceNames and expects new watches to be
//   registered.
59func (s) TestClientWrapperWatchEDS(t *testing.T) { 60 fakeServer, cleanup, err := fakeserver.StartServer() 61 if err != nil { 62 t.Fatalf("Failed to start fake xDS server: %v", err) 63 } 64 defer cleanup() 65 66 cw := newXDSClientWrapper(nil, nil, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil) 67 defer cw.close() 68 69 for _, test := range []struct { 70 name string 71 edsServiceName string 72 wantResourceName string 73 }{ 74 { 75 // Update with an empty edsServiceName should trigger an EDS watch 76 // for the user's dial target. 77 name: "empty-edsServiceName", 78 edsServiceName: "", 79 wantResourceName: testServiceName, 80 }, 81 { 82 // Update with an non-empty edsServiceName should trigger an EDS 83 // watch for the same. 84 name: "first-non-empty-edsServiceName", 85 edsServiceName: "foobar-1", 86 wantResourceName: "foobar-1", 87 }, 88 { 89 // Also test the case where the edsServerName changes from one 90 // non-empty name to another, and make sure a new watch is 91 // registered. 
92 name: "second-non-empty-edsServiceName", 93 edsServiceName: "foobar-2", 94 wantResourceName: "foobar-2", 95 }, 96 } { 97 t.Run(test.name, func(t *testing.T) { 98 oldBootstrapConfigNew := bootstrapConfigNew 99 bootstrapConfigNew = func() (*bootstrap.Config, error) { 100 return &bootstrap.Config{ 101 BalancerName: fakeServer.Address, 102 Creds: grpc.WithInsecure(), 103 NodeProto: &corepb.Node{}, 104 }, nil 105 } 106 defer func() { bootstrapConfigNew = oldBootstrapConfigNew }() 107 cw.handleUpdate(&EDSConfig{ 108 BalancerName: fakeServer.Address, 109 EDSServiceName: test.edsServiceName, 110 }, nil) 111 112 req, err := fakeServer.XDSRequestChan.Receive() 113 if err != nil { 114 t.Fatalf("EDS RPC failed with err: %v", err) 115 } 116 edsReq := req.(*fakeserver.Request) 117 if edsReq.Err != nil { 118 t.Fatalf("EDS RPC failed with err: %v", edsReq.Err) 119 } 120 121 wantReq := &xdspb.DiscoveryRequest{ 122 TypeUrl: edsType, 123 ResourceNames: []string{test.wantResourceName}, 124 Node: &corepb.Node{}, 125 } 126 if !proto.Equal(edsReq.Req, wantReq) { 127 t.Fatalf("got EDS request %v, expected: %v, diff: %s", edsReq.Req, wantReq, cmp.Diff(edsReq.Req, wantReq, cmp.Comparer(proto.Equal))) 128 } 129 }) 130 } 131} 132 133// TestClientWrapperHandleUpdateError verifies that the clientWrapper handles 134// errors from the edsWatch callback appropriately. 135// 136// The test does the following: 137// * Creates a clientWrapper. 138// * Creates a fakeclient.Client and passes it to the clientWrapper in attributes. 139// * Verifies the clientWrapper registers an EDS watch. 140// * Forces the fakeclient.Client to invoke the registered EDS watch callback with 141// an error. Verifies that the wrapper does not invoke the top-level 142// edsBalancer with the received error. 
143func (s) TestClientWrapperHandleUpdateError(t *testing.T) { 144 edsRespChan := testutils.NewChannel() 145 newEDS := func(update *xdsclient.EDSUpdate) error { 146 edsRespChan.Send(update) 147 return nil 148 } 149 150 cw := newXDSClientWrapper(newEDS, nil, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil) 151 defer cw.close() 152 153 xdsC := fakeclient.NewClient() 154 cw.handleUpdate(&EDSConfig{EDSServiceName: testEDSClusterName}, attributes.New(xdsinternal.XDSClientID, xdsC)) 155 gotCluster, err := xdsC.WaitForWatchEDS() 156 if err != nil { 157 t.Fatalf("xdsClient.WatchEDS failed with error: %v", err) 158 } 159 if gotCluster != testEDSClusterName { 160 t.Fatalf("xdsClient.WatchEDS() called with cluster: %v, want %v", gotCluster, testEDSClusterName) 161 } 162 xdsC.InvokeWatchEDSCallback(nil, errors.New("EDS watch callback error")) 163 164 // The callback is called with an error, expect no update from edsRespChan. 165 // 166 // TODO: check for loseContact() when errors indicating "lose contact" are 167 // handled correctly. 168 if gotUpdate, gotErr := edsRespChan.Receive(); gotErr != testutils.ErrRecvTimeout { 169 t.Fatalf("edsBalancer got edsUpdate {%+v, %v}, when none was expected", gotUpdate, gotErr) 170 } 171} 172 173// TestClientWrapperGetsXDSClientInAttributes verfies the case where the 174// clientWrapper receives the xdsClient to use in the attributes section of the 175// update. 
176func (s) TestClientWrapperGetsXDSClientInAttributes(t *testing.T) { 177 oldxdsclientNew := xdsclientNew 178 xdsclientNew = func(_ xdsclient.Options) (xdsClientInterface, error) { 179 t.Fatalf("unexpected call to xdsclientNew when xds_client is set in attributes") 180 return nil, nil 181 } 182 defer func() { xdsclientNew = oldxdsclientNew }() 183 184 cw := newXDSClientWrapper(nil, nil, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil) 185 defer cw.close() 186 187 // Verify that the eds watch is registered for the expected resource name. 188 xdsC1 := fakeclient.NewClient() 189 cw.handleUpdate(&EDSConfig{EDSServiceName: testEDSClusterName}, attributes.New(xdsinternal.XDSClientID, xdsC1)) 190 gotCluster, err := xdsC1.WaitForWatchEDS() 191 if err != nil { 192 t.Fatalf("xdsClient.WatchEDS failed with error: %v", err) 193 } 194 if gotCluster != testEDSClusterName { 195 t.Fatalf("xdsClient.WatchEDS() called with cluster: %v, want %v", gotCluster, testEDSClusterName) 196 } 197 198 // Pass a new client in the attributes. Verify that the watch is 199 // re-registered on the new client, and that the old client is not closed 200 // (because clientWrapper only closes clients that it creates, it does not 201 // close client that are passed through attributes). 202 xdsC2 := fakeclient.NewClient() 203 cw.handleUpdate(&EDSConfig{EDSServiceName: testEDSClusterName}, attributes.New(xdsinternal.XDSClientID, xdsC2)) 204 gotCluster, err = xdsC2.WaitForWatchEDS() 205 if err != nil { 206 t.Fatalf("xdsClient.WatchEDS failed with error: %v", err) 207 } 208 if gotCluster != testEDSClusterName { 209 t.Fatalf("xdsClient.WatchEDS() called with cluster: %v, want %v", gotCluster, testEDSClusterName) 210 } 211 212 if err := xdsC1.WaitForClose(); err != testutils.ErrRecvTimeout { 213 t.Fatalf("clientWrapper closed xdsClient received in attributes") 214 } 215} 216