1// +build go1.12 2 3/* 4 * 5 * Copyright 2020 gRPC authors. 6 * 7 * Licensed under the Apache License, Version 2.0 (the "License"); 8 * you may not use this file except in compliance with the License. 9 * You may obtain a copy of the License at 10 * 11 * http://www.apache.org/licenses/LICENSE-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, software 14 * distributed under the License is distributed on an "AS IS" BASIS, 15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 16 * See the License for the specific language governing permissions and 17 * limitations under the License. 18 * 19 */ 20 21package client 22 23import ( 24 "context" 25 "testing" 26 27 "github.com/google/go-cmp/cmp" 28 29 "google.golang.org/grpc/internal/testutils" 30) 31 32type clusterUpdateErr struct { 33 u ClusterUpdate 34 err error 35} 36 37// TestClusterWatch covers the cases: 38// - an update is received after a watch() 39// - an update for another resource name 40// - an update is received after cancel() 41func (s) TestClusterWatch(t *testing.T) { 42 apiClientCh, cleanup := overrideNewAPIClient() 43 defer cleanup() 44 45 client, err := newWithConfig(clientOpts(testXDSServer, false)) 46 if err != nil { 47 t.Fatalf("failed to create client: %v", err) 48 } 49 defer client.Close() 50 51 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) 52 defer cancel() 53 c, err := apiClientCh.Receive(ctx) 54 if err != nil { 55 t.Fatalf("timeout when waiting for API client to be created: %v", err) 56 } 57 apiClient := c.(*testAPIClient) 58 59 clusterUpdateCh := testutils.NewChannel() 60 cancelWatch := client.WatchCluster(testCDSName, func(update ClusterUpdate, err error) { 61 clusterUpdateCh.Send(clusterUpdateErr{u: update, err: err}) 62 }) 63 if _, err := apiClient.addWatches[ClusterResource].Receive(ctx); err != nil { 64 t.Fatalf("want new watch to start, got error %v", err) 65 } 66 67 wantUpdate := ClusterUpdate{ServiceName: testEDSName} 68 client.NewClusters(map[string]ClusterUpdate{testCDSName: wantUpdate}, UpdateMetadata{}) 69 if err := verifyClusterUpdate(ctx, clusterUpdateCh, wantUpdate); err != nil { 70 t.Fatal(err) 71 } 72 73 // Another update, with an extra resource for a different resource name. 74 client.NewClusters(map[string]ClusterUpdate{ 75 testCDSName: wantUpdate, 76 "randomName": {}, 77 }, UpdateMetadata{}) 78 if err := verifyClusterUpdate(ctx, clusterUpdateCh, wantUpdate); err != nil { 79 t.Fatal(err) 80 } 81 82 // Cancel watch, and send update again. 83 cancelWatch() 84 client.NewClusters(map[string]ClusterUpdate{testCDSName: wantUpdate}, UpdateMetadata{}) 85 sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) 86 defer sCancel() 87 if u, err := clusterUpdateCh.Receive(sCtx); err != context.DeadlineExceeded { 88 t.Errorf("unexpected clusterUpdate: %v, %v, want channel recv timeout", u, err) 89 } 90} 91 92// TestClusterTwoWatchSameResourceName covers the case where an update is received 93// after two watch() for the same resource name. 94func (s) TestClusterTwoWatchSameResourceName(t *testing.T) { 95 apiClientCh, cleanup := overrideNewAPIClient() 96 defer cleanup() 97 98 client, err := newWithConfig(clientOpts(testXDSServer, false)) 99 if err != nil { 100 t.Fatalf("failed to create client: %v", err) 101 } 102 defer client.Close() 103 104 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) 105 defer cancel() 106 c, err := apiClientCh.Receive(ctx) 107 if err != nil { 108 t.Fatalf("timeout when waiting for API client to be created: %v", err) 109 } 110 apiClient := c.(*testAPIClient) 111 112 var clusterUpdateChs []*testutils.Channel 113 var cancelLastWatch func() 114 const count = 2 115 for i := 0; i < count; i++ { 116 clusterUpdateCh := testutils.NewChannel() 117 clusterUpdateChs = append(clusterUpdateChs, clusterUpdateCh) 118 cancelLastWatch = client.WatchCluster(testCDSName, func(update ClusterUpdate, err error) { 119 clusterUpdateCh.Send(clusterUpdateErr{u: update, err: err}) 120 }) 121 122 if i == 0 { 123 // A new watch is registered on the underlying API client only for 124 // the first iteration because we are using the same resource name. 125 if _, err := apiClient.addWatches[ClusterResource].Receive(ctx); err != nil { 126 t.Fatalf("want new watch to start, got error %v", err) 127 } 128 } 129 } 130 131 wantUpdate := ClusterUpdate{ServiceName: testEDSName} 132 client.NewClusters(map[string]ClusterUpdate{testCDSName: wantUpdate}, UpdateMetadata{}) 133 for i := 0; i < count; i++ { 134 if err := verifyClusterUpdate(ctx, clusterUpdateChs[i], wantUpdate); err != nil { 135 t.Fatal(err) 136 } 137 } 138 139 // Cancel the last watch, and send update again. 140 cancelLastWatch() 141 client.NewClusters(map[string]ClusterUpdate{testCDSName: wantUpdate}, UpdateMetadata{}) 142 for i := 0; i < count-1; i++ { 143 if err := verifyClusterUpdate(ctx, clusterUpdateChs[i], wantUpdate); err != nil { 144 t.Fatal(err) 145 } 146 } 147 148 sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) 149 defer sCancel() 150 if u, err := clusterUpdateChs[count-1].Receive(sCtx); err != context.DeadlineExceeded { 151 t.Errorf("unexpected clusterUpdate: %v, %v, want channel recv timeout", u, err) 152 } 153} 154 155// TestClusterThreeWatchDifferentResourceName covers the case where an update is 156// received after three watch() for different resource names. 157func (s) TestClusterThreeWatchDifferentResourceName(t *testing.T) { 158 apiClientCh, cleanup := overrideNewAPIClient() 159 defer cleanup() 160 161 client, err := newWithConfig(clientOpts(testXDSServer, false)) 162 if err != nil { 163 t.Fatalf("failed to create client: %v", err) 164 } 165 defer client.Close() 166 167 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) 168 defer cancel() 169 c, err := apiClientCh.Receive(ctx) 170 if err != nil { 171 t.Fatalf("timeout when waiting for API client to be created: %v", err) 172 } 173 apiClient := c.(*testAPIClient) 174 175 // Two watches for the same name. 176 var clusterUpdateChs []*testutils.Channel 177 const count = 2 178 for i := 0; i < count; i++ { 179 clusterUpdateCh := testutils.NewChannel() 180 clusterUpdateChs = append(clusterUpdateChs, clusterUpdateCh) 181 client.WatchCluster(testCDSName+"1", func(update ClusterUpdate, err error) { 182 clusterUpdateCh.Send(clusterUpdateErr{u: update, err: err}) 183 }) 184 185 if i == 0 { 186 // A new watch is registered on the underlying API client only for 187 // the first iteration because we are using the same resource name. 188 if _, err := apiClient.addWatches[ClusterResource].Receive(ctx); err != nil { 189 t.Fatalf("want new watch to start, got error %v", err) 190 } 191 } 192 } 193 194 // Third watch for a different name. 195 clusterUpdateCh2 := testutils.NewChannel() 196 client.WatchCluster(testCDSName+"2", func(update ClusterUpdate, err error) { 197 clusterUpdateCh2.Send(clusterUpdateErr{u: update, err: err}) 198 }) 199 if _, err := apiClient.addWatches[ClusterResource].Receive(ctx); err != nil { 200 t.Fatalf("want new watch to start, got error %v", err) 201 } 202 203 wantUpdate1 := ClusterUpdate{ServiceName: testEDSName + "1"} 204 wantUpdate2 := ClusterUpdate{ServiceName: testEDSName + "2"} 205 client.NewClusters(map[string]ClusterUpdate{ 206 testCDSName + "1": wantUpdate1, 207 testCDSName + "2": wantUpdate2, 208 }, UpdateMetadata{}) 209 210 for i := 0; i < count; i++ { 211 if err := verifyClusterUpdate(ctx, clusterUpdateChs[i], wantUpdate1); err != nil { 212 t.Fatal(err) 213 } 214 } 215 if err := verifyClusterUpdate(ctx, clusterUpdateCh2, wantUpdate2); err != nil { 216 t.Fatal(err) 217 } 218} 219 220// TestClusterWatchAfterCache covers the case where watch is called after the update 221// is in cache. 222func (s) TestClusterWatchAfterCache(t *testing.T) { 223 apiClientCh, cleanup := overrideNewAPIClient() 224 defer cleanup() 225 226 client, err := newWithConfig(clientOpts(testXDSServer, false)) 227 if err != nil { 228 t.Fatalf("failed to create client: %v", err) 229 } 230 defer client.Close() 231 232 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) 233 defer cancel() 234 c, err := apiClientCh.Receive(ctx) 235 if err != nil { 236 t.Fatalf("timeout when waiting for API client to be created: %v", err) 237 } 238 apiClient := c.(*testAPIClient) 239 240 clusterUpdateCh := testutils.NewChannel() 241 client.WatchCluster(testCDSName, func(update ClusterUpdate, err error) { 242 clusterUpdateCh.Send(clusterUpdateErr{u: update, err: err}) 243 }) 244 if _, err := apiClient.addWatches[ClusterResource].Receive(ctx); err != nil { 245 t.Fatalf("want new watch to start, got error %v", err) 246 } 247 248 wantUpdate := ClusterUpdate{ServiceName: testEDSName} 249 client.NewClusters(map[string]ClusterUpdate{ 250 testCDSName: wantUpdate, 251 }, UpdateMetadata{}) 252 if err := verifyClusterUpdate(ctx, clusterUpdateCh, wantUpdate); err != nil { 253 t.Fatal(err) 254 } 255 256 // Another watch for the resource in cache. 257 clusterUpdateCh2 := testutils.NewChannel() 258 client.WatchCluster(testCDSName, func(update ClusterUpdate, err error) { 259 clusterUpdateCh2.Send(clusterUpdateErr{u: update, err: err}) 260 }) 261 sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) 262 defer sCancel() 263 if n, err := apiClient.addWatches[ClusterResource].Receive(sCtx); err != context.DeadlineExceeded { 264 t.Fatalf("want no new watch to start (recv timeout), got resource name: %v error %v", n, err) 265 } 266 267 // New watch should receives the update. 268 if err := verifyClusterUpdate(ctx, clusterUpdateCh2, wantUpdate); err != nil { 269 t.Fatal(err) 270 } 271 272 // Old watch should see nothing. 273 sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout) 274 defer sCancel() 275 if u, err := clusterUpdateCh.Receive(sCtx); err != context.DeadlineExceeded { 276 t.Errorf("unexpected clusterUpdate: %v, %v, want channel recv timeout", u, err) 277 } 278} 279 280// TestClusterWatchExpiryTimer tests the case where the client does not receive 281// an CDS response for the request that it sends out. We want the watch callback 282// to be invoked with an error once the watchExpiryTimer fires. 283func (s) TestClusterWatchExpiryTimer(t *testing.T) { 284 apiClientCh, cleanup := overrideNewAPIClient() 285 defer cleanup() 286 287 client, err := newWithConfig(clientOpts(testXDSServer, true)) 288 if err != nil { 289 t.Fatalf("failed to create client: %v", err) 290 } 291 defer client.Close() 292 293 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) 294 defer cancel() 295 c, err := apiClientCh.Receive(ctx) 296 if err != nil { 297 t.Fatalf("timeout when waiting for API client to be created: %v", err) 298 } 299 apiClient := c.(*testAPIClient) 300 301 clusterUpdateCh := testutils.NewChannel() 302 client.WatchCluster(testCDSName, func(u ClusterUpdate, err error) { 303 clusterUpdateCh.Send(clusterUpdateErr{u: u, err: err}) 304 }) 305 if _, err := apiClient.addWatches[ClusterResource].Receive(ctx); err != nil { 306 t.Fatalf("want new watch to start, got error %v", err) 307 } 308 309 u, err := clusterUpdateCh.Receive(ctx) 310 if err != nil { 311 t.Fatalf("timeout when waiting for cluster update: %v", err) 312 } 313 gotUpdate := u.(clusterUpdateErr) 314 if gotUpdate.err == nil || !cmp.Equal(gotUpdate.u, ClusterUpdate{}) { 315 t.Fatalf("unexpected clusterUpdate: (%v, %v), want: (ClusterUpdate{}, nil)", gotUpdate.u, gotUpdate.err) 316 } 317} 318 319// TestClusterWatchExpiryTimerStop tests the case where the client does receive 320// an CDS response for the request that it sends out. We want no error even 321// after expiry timeout. 322func (s) TestClusterWatchExpiryTimerStop(t *testing.T) { 323 apiClientCh, cleanup := overrideNewAPIClient() 324 defer cleanup() 325 326 client, err := newWithConfig(clientOpts(testXDSServer, true)) 327 if err != nil { 328 t.Fatalf("failed to create client: %v", err) 329 } 330 defer client.Close() 331 332 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) 333 defer cancel() 334 c, err := apiClientCh.Receive(ctx) 335 if err != nil { 336 t.Fatalf("timeout when waiting for API client to be created: %v", err) 337 } 338 apiClient := c.(*testAPIClient) 339 340 clusterUpdateCh := testutils.NewChannel() 341 client.WatchCluster(testCDSName, func(u ClusterUpdate, err error) { 342 clusterUpdateCh.Send(clusterUpdateErr{u: u, err: err}) 343 }) 344 if _, err := apiClient.addWatches[ClusterResource].Receive(ctx); err != nil { 345 t.Fatalf("want new watch to start, got error %v", err) 346 } 347 348 wantUpdate := ClusterUpdate{ServiceName: testEDSName} 349 client.NewClusters(map[string]ClusterUpdate{ 350 testCDSName: wantUpdate, 351 }, UpdateMetadata{}) 352 if err := verifyClusterUpdate(ctx, clusterUpdateCh, wantUpdate); err != nil { 353 t.Fatal(err) 354 } 355 356 // Wait for an error, the error should never happen. 357 sCtx, sCancel := context.WithTimeout(ctx, defaultTestWatchExpiryTimeout) 358 defer sCancel() 359 if u, err := clusterUpdateCh.Receive(sCtx); err != context.DeadlineExceeded { 360 t.Errorf("unexpected clusterUpdate: %v, %v, want channel recv timeout", u, err) 361 } 362} 363 364// TestClusterResourceRemoved covers the cases: 365// - an update is received after a watch() 366// - another update is received, with one resource removed 367// - this should trigger callback with resource removed error 368// - one more update without the removed resource 369// - the callback (above) shouldn't receive any update 370func (s) TestClusterResourceRemoved(t *testing.T) { 371 apiClientCh, cleanup := overrideNewAPIClient() 372 defer cleanup() 373 374 client, err := newWithConfig(clientOpts(testXDSServer, false)) 375 if err != nil { 376 t.Fatalf("failed to create client: %v", err) 377 } 378 defer client.Close() 379 380 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) 381 defer cancel() 382 c, err := apiClientCh.Receive(ctx) 383 if err != nil { 384 t.Fatalf("timeout when waiting for API client to be created: %v", err) 385 } 386 apiClient := c.(*testAPIClient) 387 388 clusterUpdateCh1 := testutils.NewChannel() 389 client.WatchCluster(testCDSName+"1", func(update ClusterUpdate, err error) { 390 clusterUpdateCh1.Send(clusterUpdateErr{u: update, err: err}) 391 }) 392 if _, err := apiClient.addWatches[ClusterResource].Receive(ctx); err != nil { 393 t.Fatalf("want new watch to start, got error %v", err) 394 } 395 396 // Another watch for a different name. 397 clusterUpdateCh2 := testutils.NewChannel() 398 client.WatchCluster(testCDSName+"2", func(update ClusterUpdate, err error) { 399 clusterUpdateCh2.Send(clusterUpdateErr{u: update, err: err}) 400 }) 401 if _, err := apiClient.addWatches[ClusterResource].Receive(ctx); err != nil { 402 t.Fatalf("want new watch to start, got error %v", err) 403 } 404 405 wantUpdate1 := ClusterUpdate{ServiceName: testEDSName + "1"} 406 wantUpdate2 := ClusterUpdate{ServiceName: testEDSName + "2"} 407 client.NewClusters(map[string]ClusterUpdate{ 408 testCDSName + "1": wantUpdate1, 409 testCDSName + "2": wantUpdate2, 410 }, UpdateMetadata{}) 411 if err := verifyClusterUpdate(ctx, clusterUpdateCh1, wantUpdate1); err != nil { 412 t.Fatal(err) 413 } 414 if err := verifyClusterUpdate(ctx, clusterUpdateCh2, wantUpdate2); err != nil { 415 t.Fatal(err) 416 } 417 418 // Send another update to remove resource 1. 419 client.NewClusters(map[string]ClusterUpdate{testCDSName + "2": wantUpdate2}, UpdateMetadata{}) 420 421 // Watcher 1 should get an error. 422 if u, err := clusterUpdateCh1.Receive(ctx); err != nil || ErrType(u.(clusterUpdateErr).err) != ErrorTypeResourceNotFound { 423 t.Errorf("unexpected clusterUpdate: %v, error receiving from channel: %v, want update with error resource not found", u, err) 424 } 425 426 // Watcher 2 should get the same update again. 427 if err := verifyClusterUpdate(ctx, clusterUpdateCh2, wantUpdate2); err != nil { 428 t.Fatal(err) 429 } 430 431 // Send one more update without resource 1. 432 client.NewClusters(map[string]ClusterUpdate{testCDSName + "2": wantUpdate2}, UpdateMetadata{}) 433 434 // Watcher 1 should not see an update. 435 sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) 436 defer sCancel() 437 if u, err := clusterUpdateCh1.Receive(sCtx); err != context.DeadlineExceeded { 438 t.Errorf("unexpected clusterUpdate: %v, %v, want channel recv timeout", u, err) 439 } 440 441 // Watcher 2 should get the same update again. 442 if err := verifyClusterUpdate(ctx, clusterUpdateCh2, wantUpdate2); err != nil { 443 t.Fatal(err) 444 } 445} 446