1// +build go1.12 2 3/* 4 * 5 * Copyright 2019 gRPC authors. 6 * 7 * Licensed under the Apache License, Version 2.0 (the "License"); 8 * you may not use this file except in compliance with the License. 9 * You may obtain a copy of the License at 10 * 11 * http://www.apache.org/licenses/LICENSE-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, software 14 * distributed under the License is distributed on an "AS IS" BASIS, 15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 16 * See the License for the specific language governing permissions and 17 * limitations under the License. 18 * 19 */ 20 21package clusterresolver 22 23import ( 24 "context" 25 "fmt" 26 "testing" 27 "time" 28 29 "github.com/google/go-cmp/cmp" 30 "google.golang.org/grpc/balancer" 31 "google.golang.org/grpc/connectivity" 32 "google.golang.org/grpc/internal/grpctest" 33 "google.golang.org/grpc/internal/testutils" 34 "google.golang.org/grpc/resolver" 35 "google.golang.org/grpc/xds/internal" 36 "google.golang.org/grpc/xds/internal/testutils/fakeclient" 37 "google.golang.org/grpc/xds/internal/xdsclient" 38 39 _ "google.golang.org/grpc/xds/internal/xdsclient/v2" // V2 client registration. 40) 41 42const ( 43 defaultTestTimeout = 1 * time.Second 44 defaultTestShortTimeout = 10 * time.Millisecond 45 testEDSServcie = "test-eds-service-name" 46 testClusterName = "test-cluster-name" 47) 48 49var ( 50 // A non-empty endpoints update which is expected to be accepted by the EDS 51 // LB policy. 52 defaultEndpointsUpdate = xdsclient.EndpointsUpdate{ 53 Localities: []xdsclient.Locality{ 54 { 55 Endpoints: []xdsclient.Endpoint{{Address: "endpoint1"}}, 56 ID: internal.LocalityID{Zone: "zone"}, 57 Priority: 1, 58 Weight: 100, 59 }, 60 }, 61 } 62) 63 64func init() { 65 balancer.Register(bb{}) 66} 67 68type s struct { 69 grpctest.Tester 70 71 cleanup func() 72} 73 74func (ss s) Teardown(t *testing.T) { 75 xdsclient.ClearAllCountersForTesting() 76 ss.Tester.Teardown(t) 77 if ss.cleanup != nil { 78 ss.cleanup() 79 } 80} 81 82func Test(t *testing.T) { 83 grpctest.RunSubTests(t, s{}) 84} 85 86const testBalancerNameFooBar = "foo.bar" 87 88func newNoopTestClientConn() *noopTestClientConn { 89 return &noopTestClientConn{} 90} 91 92// noopTestClientConn is used in EDS balancer config update tests that only 93// cover the config update handling, but not SubConn/load-balancing. 94type noopTestClientConn struct { 95 balancer.ClientConn 96} 97 98func (t *noopTestClientConn) NewSubConn([]resolver.Address, balancer.NewSubConnOptions) (balancer.SubConn, error) { 99 return nil, nil 100} 101 102func (noopTestClientConn) Target() string { return testEDSServcie } 103 104type scStateChange struct { 105 sc balancer.SubConn 106 state balancer.SubConnState 107} 108 109type fakeChildBalancer struct { 110 cc balancer.ClientConn 111 subConnState *testutils.Channel 112 clientConnState *testutils.Channel 113 resolverError *testutils.Channel 114} 115 116func (f *fakeChildBalancer) UpdateClientConnState(state balancer.ClientConnState) error { 117 f.clientConnState.Send(state) 118 return nil 119} 120 121func (f *fakeChildBalancer) ResolverError(err error) { 122 f.resolverError.Send(err) 123} 124 125func (f *fakeChildBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { 126 f.subConnState.Send(&scStateChange{sc: sc, state: state}) 127} 128 129func (f *fakeChildBalancer) Close() {} 130 131func (f *fakeChildBalancer) waitForClientConnStateChange(ctx context.Context) error { 132 _, err := f.clientConnState.Receive(ctx) 133 if err != nil { 134 return err 135 } 136 return nil 137} 138 139func (f *fakeChildBalancer) waitForResolverError(ctx context.Context) error { 140 _, err := f.resolverError.Receive(ctx) 141 if err != nil { 142 return err 143 } 144 return nil 145} 146 147func (f *fakeChildBalancer) waitForSubConnStateChange(ctx context.Context, wantState *scStateChange) error { 148 val, err := f.subConnState.Receive(ctx) 149 if err != nil { 150 return err 151 } 152 gotState := val.(*scStateChange) 153 if !cmp.Equal(gotState, wantState, cmp.AllowUnexported(scStateChange{})) { 154 return fmt.Errorf("got subconnStateChange %v, want %v", gotState, wantState) 155 } 156 return nil 157} 158 159func newFakeChildBalancer(cc balancer.ClientConn) balancer.Balancer { 160 return &fakeChildBalancer{ 161 cc: cc, 162 subConnState: testutils.NewChannelWithSize(10), 163 clientConnState: testutils.NewChannelWithSize(10), 164 resolverError: testutils.NewChannelWithSize(10), 165 } 166} 167 168type fakeSubConn struct{} 169 170func (*fakeSubConn) UpdateAddresses([]resolver.Address) { panic("implement me") } 171func (*fakeSubConn) Connect() { panic("implement me") } 172 173// waitForNewChildLB makes sure that a new child LB is created by the top-level 174// clusterResolverBalancer. 175func waitForNewChildLB(ctx context.Context, ch *testutils.Channel) (*fakeChildBalancer, error) { 176 val, err := ch.Receive(ctx) 177 if err != nil { 178 return nil, fmt.Errorf("error when waiting for a new edsLB: %v", err) 179 } 180 return val.(*fakeChildBalancer), nil 181} 182 183// setup overrides the functions which are used to create the xdsClient and the 184// edsLB, creates fake version of them and makes them available on the provided 185// channels. The returned cancel function should be called by the test for 186// cleanup. 187func setup(childLBCh *testutils.Channel) (*fakeclient.Client, func()) { 188 xdsC := fakeclient.NewClientWithName(testBalancerNameFooBar) 189 190 origNewChildBalancer := newChildBalancer 191 newChildBalancer = func(_ balancer.Builder, cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer { 192 childLB := newFakeChildBalancer(cc) 193 defer func() { childLBCh.Send(childLB) }() 194 return childLB 195 } 196 return xdsC, func() { 197 newChildBalancer = origNewChildBalancer 198 xdsC.Close() 199 } 200} 201 202// TestSubConnStateChange verifies if the top-level clusterResolverBalancer passes on 203// the subConnState to appropriate child balancer. 204func (s) TestSubConnStateChange(t *testing.T) { 205 edsLBCh := testutils.NewChannel() 206 xdsC, cleanup := setup(edsLBCh) 207 defer cleanup() 208 209 builder := balancer.Get(Name) 210 edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSServcie}}) 211 if edsB == nil { 212 t.Fatalf("builder.Build(%s) failed and returned nil", Name) 213 } 214 defer edsB.Close() 215 216 if err := edsB.UpdateClientConnState(balancer.ClientConnState{ 217 ResolverState: xdsclient.SetClient(resolver.State{}, xdsC), 218 BalancerConfig: newLBConfigWithOneEDS(testEDSServcie), 219 }); err != nil { 220 t.Fatalf("edsB.UpdateClientConnState() failed: %v", err) 221 } 222 223 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) 224 defer cancel() 225 if _, err := xdsC.WaitForWatchEDS(ctx); err != nil { 226 t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err) 227 } 228 xdsC.InvokeWatchEDSCallback("", defaultEndpointsUpdate, nil) 229 edsLB, err := waitForNewChildLB(ctx, edsLBCh) 230 if err != nil { 231 t.Fatal(err) 232 } 233 234 fsc := &fakeSubConn{} 235 state := balancer.SubConnState{ConnectivityState: connectivity.Ready} 236 edsB.UpdateSubConnState(fsc, state) 237 if err := edsLB.waitForSubConnStateChange(ctx, &scStateChange{sc: fsc, state: state}); err != nil { 238 t.Fatal(err) 239 } 240} 241 242// TestErrorFromXDSClientUpdate verifies that an error from xdsClient update is 243// handled correctly. 244// 245// If it's resource-not-found, watch will NOT be canceled, the EDS impl will 246// receive an empty EDS update, and new RPCs will fail. 247// 248// If it's connection error, nothing will happen. This will need to change to 249// handle fallback. 250func (s) TestErrorFromXDSClientUpdate(t *testing.T) { 251 edsLBCh := testutils.NewChannel() 252 xdsC, cleanup := setup(edsLBCh) 253 defer cleanup() 254 255 builder := balancer.Get(Name) 256 edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSServcie}}) 257 if edsB == nil { 258 t.Fatalf("builder.Build(%s) failed and returned nil", Name) 259 } 260 defer edsB.Close() 261 262 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) 263 defer cancel() 264 if err := edsB.UpdateClientConnState(balancer.ClientConnState{ 265 ResolverState: xdsclient.SetClient(resolver.State{}, xdsC), 266 BalancerConfig: newLBConfigWithOneEDS(testEDSServcie), 267 }); err != nil { 268 t.Fatal(err) 269 } 270 if _, err := xdsC.WaitForWatchEDS(ctx); err != nil { 271 t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err) 272 } 273 xdsC.InvokeWatchEDSCallback("", xdsclient.EndpointsUpdate{}, nil) 274 edsLB, err := waitForNewChildLB(ctx, edsLBCh) 275 if err != nil { 276 t.Fatal(err) 277 } 278 if err := edsLB.waitForClientConnStateChange(ctx); err != nil { 279 t.Fatalf("EDS impl got unexpected update: %v", err) 280 } 281 282 connectionErr := xdsclient.NewErrorf(xdsclient.ErrorTypeConnection, "connection error") 283 xdsC.InvokeWatchEDSCallback("", xdsclient.EndpointsUpdate{}, connectionErr) 284 285 sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) 286 defer sCancel() 287 if _, err := xdsC.WaitForCancelEDSWatch(sCtx); err != context.DeadlineExceeded { 288 t.Fatal("watch was canceled, want not canceled (timeout error)") 289 } 290 291 sCtx, sCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout) 292 defer sCancel() 293 if err := edsLB.waitForClientConnStateChange(sCtx); err != context.DeadlineExceeded { 294 t.Fatal(err) 295 } 296 if err := edsLB.waitForResolverError(ctx); err != nil { 297 t.Fatalf("want resolver error, got %v", err) 298 } 299 300 resourceErr := xdsclient.NewErrorf(xdsclient.ErrorTypeResourceNotFound, "clusterResolverBalancer resource not found error") 301 xdsC.InvokeWatchEDSCallback("", xdsclient.EndpointsUpdate{}, resourceErr) 302 // Even if error is resource not found, watch shouldn't be canceled, because 303 // this is an EDS resource removed (and xds client actually never sends this 304 // error, but we still handles it). 305 sCtx, sCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout) 306 defer sCancel() 307 if _, err := xdsC.WaitForCancelEDSWatch(sCtx); err != context.DeadlineExceeded { 308 t.Fatal("watch was canceled, want not canceled (timeout error)") 309 } 310 if err := edsLB.waitForClientConnStateChange(sCtx); err != context.DeadlineExceeded { 311 t.Fatal(err) 312 } 313 if err := edsLB.waitForResolverError(ctx); err != nil { 314 t.Fatalf("want resolver error, got %v", err) 315 } 316 317 // An update with the same service name should not trigger a new watch. 318 if err := edsB.UpdateClientConnState(balancer.ClientConnState{ 319 ResolverState: xdsclient.SetClient(resolver.State{}, xdsC), 320 BalancerConfig: newLBConfigWithOneEDS(testEDSServcie), 321 }); err != nil { 322 t.Fatal(err) 323 } 324 sCtx, sCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout) 325 defer sCancel() 326 if _, err := xdsC.WaitForWatchEDS(sCtx); err != context.DeadlineExceeded { 327 t.Fatal("got unexpected new EDS watch") 328 } 329} 330 331// TestErrorFromResolver verifies that resolver errors are handled correctly. 332// 333// If it's resource-not-found, watch will be canceled, the EDS impl will receive 334// an empty EDS update, and new RPCs will fail. 335// 336// If it's connection error, nothing will happen. This will need to change to 337// handle fallback. 338func (s) TestErrorFromResolver(t *testing.T) { 339 edsLBCh := testutils.NewChannel() 340 xdsC, cleanup := setup(edsLBCh) 341 defer cleanup() 342 343 builder := balancer.Get(Name) 344 edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSServcie}}) 345 if edsB == nil { 346 t.Fatalf("builder.Build(%s) failed and returned nil", Name) 347 } 348 defer edsB.Close() 349 350 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) 351 defer cancel() 352 if err := edsB.UpdateClientConnState(balancer.ClientConnState{ 353 ResolverState: xdsclient.SetClient(resolver.State{}, xdsC), 354 BalancerConfig: newLBConfigWithOneEDS(testEDSServcie), 355 }); err != nil { 356 t.Fatal(err) 357 } 358 359 if _, err := xdsC.WaitForWatchEDS(ctx); err != nil { 360 t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err) 361 } 362 xdsC.InvokeWatchEDSCallback("", xdsclient.EndpointsUpdate{}, nil) 363 edsLB, err := waitForNewChildLB(ctx, edsLBCh) 364 if err != nil { 365 t.Fatal(err) 366 } 367 if err := edsLB.waitForClientConnStateChange(ctx); err != nil { 368 t.Fatalf("EDS impl got unexpected update: %v", err) 369 } 370 371 connectionErr := xdsclient.NewErrorf(xdsclient.ErrorTypeConnection, "connection error") 372 edsB.ResolverError(connectionErr) 373 374 sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) 375 defer sCancel() 376 if _, err := xdsC.WaitForCancelEDSWatch(sCtx); err != context.DeadlineExceeded { 377 t.Fatal("watch was canceled, want not canceled (timeout error)") 378 } 379 380 sCtx, sCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout) 381 defer sCancel() 382 if err := edsLB.waitForClientConnStateChange(sCtx); err != context.DeadlineExceeded { 383 t.Fatal("eds impl got EDS resp, want timeout error") 384 } 385 if err := edsLB.waitForResolverError(ctx); err != nil { 386 t.Fatalf("want resolver error, got %v", err) 387 } 388 389 resourceErr := xdsclient.NewErrorf(xdsclient.ErrorTypeResourceNotFound, "clusterResolverBalancer resource not found error") 390 edsB.ResolverError(resourceErr) 391 if _, err := xdsC.WaitForCancelEDSWatch(ctx); err != nil { 392 t.Fatalf("want watch to be canceled, waitForCancel failed: %v", err) 393 } 394 if err := edsLB.waitForClientConnStateChange(sCtx); err != context.DeadlineExceeded { 395 t.Fatal(err) 396 } 397 if err := edsLB.waitForResolverError(ctx); err != nil { 398 t.Fatalf("want resolver error, got %v", err) 399 } 400 401 // An update with the same service name should trigger a new watch, because 402 // the previous watch was canceled. 403 if err := edsB.UpdateClientConnState(balancer.ClientConnState{ 404 ResolverState: xdsclient.SetClient(resolver.State{}, xdsC), 405 BalancerConfig: newLBConfigWithOneEDS(testEDSServcie), 406 }); err != nil { 407 t.Fatal(err) 408 } 409 if _, err := xdsC.WaitForWatchEDS(ctx); err != nil { 410 t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err) 411 } 412} 413 414// Given a list of resource names, verifies that EDS requests for the same are 415// sent by the EDS balancer, through the fake xDS client. 416func verifyExpectedRequests(ctx context.Context, fc *fakeclient.Client, resourceNames ...string) error { 417 for _, name := range resourceNames { 418 if name == "" { 419 // ResourceName empty string indicates a cancel. 420 if _, err := fc.WaitForCancelEDSWatch(ctx); err != nil { 421 return fmt.Errorf("timed out when expecting resource %q", name) 422 } 423 continue 424 } 425 426 resName, err := fc.WaitForWatchEDS(ctx) 427 if err != nil { 428 return fmt.Errorf("timed out when expecting resource %q, %p", name, fc) 429 } 430 if resName != name { 431 return fmt.Errorf("got EDS request for resource %q, expected: %q", resName, name) 432 } 433 } 434 return nil 435} 436 437// TestClientWatchEDS verifies that the xdsClient inside the top-level EDS LB 438// policy registers an EDS watch for expected resource upon receiving an update 439// from gRPC. 440func (s) TestClientWatchEDS(t *testing.T) { 441 edsLBCh := testutils.NewChannel() 442 xdsC, cleanup := setup(edsLBCh) 443 defer cleanup() 444 445 builder := balancer.Get(Name) 446 edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSServcie}}) 447 if edsB == nil { 448 t.Fatalf("builder.Build(%s) failed and returned nil", Name) 449 } 450 defer edsB.Close() 451 452 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) 453 defer cancel() 454 // If eds service name is not set, should watch for cluster name. 455 if err := edsB.UpdateClientConnState(balancer.ClientConnState{ 456 ResolverState: xdsclient.SetClient(resolver.State{}, xdsC), 457 BalancerConfig: newLBConfigWithOneEDS("cluster-1"), 458 }); err != nil { 459 t.Fatal(err) 460 } 461 if err := verifyExpectedRequests(ctx, xdsC, "cluster-1"); err != nil { 462 t.Fatal(err) 463 } 464 465 // Update with an non-empty edsServiceName should trigger an EDS watch for 466 // the same. 467 if err := edsB.UpdateClientConnState(balancer.ClientConnState{ 468 ResolverState: xdsclient.SetClient(resolver.State{}, xdsC), 469 BalancerConfig: newLBConfigWithOneEDS("foobar-1"), 470 }); err != nil { 471 t.Fatal(err) 472 } 473 if err := verifyExpectedRequests(ctx, xdsC, "", "foobar-1"); err != nil { 474 t.Fatal(err) 475 } 476 477 // Also test the case where the edsServerName changes from one non-empty 478 // name to another, and make sure a new watch is registered. The previously 479 // registered watch will be cancelled, which will result in an EDS request 480 // with no resource names being sent to the server. 481 if err := edsB.UpdateClientConnState(balancer.ClientConnState{ 482 ResolverState: xdsclient.SetClient(resolver.State{}, xdsC), 483 BalancerConfig: newLBConfigWithOneEDS("foobar-2"), 484 }); err != nil { 485 t.Fatal(err) 486 } 487 if err := verifyExpectedRequests(ctx, xdsC, "", "foobar-2"); err != nil { 488 t.Fatal(err) 489 } 490} 491 492func newLBConfigWithOneEDS(edsServiceName string) *LBConfig { 493 return &LBConfig{ 494 DiscoveryMechanisms: []DiscoveryMechanism{{ 495 Cluster: testClusterName, 496 Type: DiscoveryMechanismTypeEDS, 497 EDSServiceName: edsServiceName, 498 }}, 499 } 500} 501