1/* 2 * 3 * Copyright 2020 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package client 20 21import ( 22 "testing" 23 24 "google.golang.org/grpc/xds/internal/testutils" 25) 26 27type clusterUpdateErr struct { 28 u ClusterUpdate 29 err error 30} 31 32// TestClusterWatch covers the cases: 33// - an update is received after a watch() 34// - an update for another resource name 35// - an update is received after cancel() 36func (s) TestClusterWatch(t *testing.T) { 37 v2ClientCh, cleanup := overrideNewAPIClient() 38 defer cleanup() 39 40 c, err := New(clientOpts(testXDSServer, false)) 41 if err != nil { 42 t.Fatalf("failed to create client: %v", err) 43 } 44 defer c.Close() 45 46 // TODO: add a timeout to this recv. 47 // Note that this won't be necessary if we finish the TODO below to call 48 // Client directly instead of v2Client.r. 49 v2Client := <-v2ClientCh 50 51 clusterUpdateCh := testutils.NewChannel() 52 cancelWatch := c.WatchCluster(testCDSName, func(update ClusterUpdate, err error) { 53 clusterUpdateCh.Send(clusterUpdateErr{u: update, err: err}) 54 }) 55 if _, err := v2Client.addWatches[ClusterResource].Receive(); err != nil { 56 t.Fatalf("want new watch to start, got error %v", err) 57 } 58 59 wantUpdate := ClusterUpdate{ServiceName: testEDSName} 60 // This is calling v2Client.r to send the update, but r is set to Client, so 61 // this is same as calling Client to update. The one thing this covers is 62 // that `NewXDSV2Client` is called with the right parent. 63 // 64 // TODO: in a future cleanup, this (and the same thing in other tests) can 65 // be changed call Client directly. 66 v2Client.r.NewClusters(map[string]ClusterUpdate{ 67 testCDSName: wantUpdate, 68 }) 69 70 if u, err := clusterUpdateCh.Receive(); err != nil || u != (clusterUpdateErr{wantUpdate, nil}) { 71 t.Errorf("unexpected clusterUpdate: %v, error receiving from channel: %v", u, err) 72 } 73 74 // Another update, with an extra resource for a different resource name. 75 v2Client.r.NewClusters(map[string]ClusterUpdate{ 76 testCDSName: wantUpdate, 77 "randomName": {}, 78 }) 79 80 if u, err := clusterUpdateCh.Receive(); err != nil || u != (clusterUpdateErr{wantUpdate, nil}) { 81 t.Errorf("unexpected clusterUpdate: %+v, %v, want channel recv timeout", u, err) 82 } 83 84 // Cancel watch, and send update again. 85 cancelWatch() 86 v2Client.r.NewClusters(map[string]ClusterUpdate{ 87 testCDSName: wantUpdate, 88 }) 89 90 if u, err := clusterUpdateCh.TimedReceive(chanRecvTimeout); err != testutils.ErrRecvTimeout { 91 t.Errorf("unexpected clusterUpdate: %v, %v, want channel recv timeout", u, err) 92 } 93} 94 95// TestClusterTwoWatchSameResourceName covers the case where an update is received 96// after two watch() for the same resource name. 97func (s) TestClusterTwoWatchSameResourceName(t *testing.T) { 98 v2ClientCh, cleanup := overrideNewAPIClient() 99 defer cleanup() 100 101 c, err := New(clientOpts(testXDSServer, false)) 102 if err != nil { 103 t.Fatalf("failed to create client: %v", err) 104 } 105 defer c.Close() 106 107 v2Client := <-v2ClientCh 108 109 var clusterUpdateChs []*testutils.Channel 110 var cancelLastWatch func() 111 112 const count = 2 113 for i := 0; i < count; i++ { 114 clusterUpdateCh := testutils.NewChannel() 115 clusterUpdateChs = append(clusterUpdateChs, clusterUpdateCh) 116 cancelLastWatch = c.WatchCluster(testCDSName, func(update ClusterUpdate, err error) { 117 clusterUpdateCh.Send(clusterUpdateErr{u: update, err: err}) 118 }) 119 if _, err := v2Client.addWatches[ClusterResource].Receive(); i == 0 && err != nil { 120 t.Fatalf("want new watch to start, got error %v", err) 121 } 122 } 123 124 wantUpdate := ClusterUpdate{ServiceName: testEDSName} 125 v2Client.r.NewClusters(map[string]ClusterUpdate{ 126 testCDSName: wantUpdate, 127 }) 128 129 for i := 0; i < count; i++ { 130 if u, err := clusterUpdateChs[i].Receive(); err != nil || u != (clusterUpdateErr{wantUpdate, nil}) { 131 t.Errorf("i=%v, unexpected clusterUpdate: %v, error receiving from channel: %v", i, u, err) 132 } 133 } 134 135 // Cancel the last watch, and send update again. 136 cancelLastWatch() 137 v2Client.r.NewClusters(map[string]ClusterUpdate{ 138 testCDSName: wantUpdate, 139 }) 140 141 for i := 0; i < count-1; i++ { 142 if u, err := clusterUpdateChs[i].Receive(); err != nil || u != (clusterUpdateErr{wantUpdate, nil}) { 143 t.Errorf("i=%v, unexpected clusterUpdate: %v, error receiving from channel: %v", i, u, err) 144 } 145 } 146 147 if u, err := clusterUpdateChs[count-1].TimedReceive(chanRecvTimeout); err != testutils.ErrRecvTimeout { 148 t.Errorf("unexpected clusterUpdate: %v, %v, want channel recv timeout", u, err) 149 } 150} 151 152// TestClusterThreeWatchDifferentResourceName covers the case where an update is 153// received after three watch() for different resource names. 154func (s) TestClusterThreeWatchDifferentResourceName(t *testing.T) { 155 v2ClientCh, cleanup := overrideNewAPIClient() 156 defer cleanup() 157 158 c, err := New(clientOpts(testXDSServer, false)) 159 if err != nil { 160 t.Fatalf("failed to create client: %v", err) 161 } 162 defer c.Close() 163 164 v2Client := <-v2ClientCh 165 166 var clusterUpdateChs []*testutils.Channel 167 const count = 2 168 169 // Two watches for the same name. 170 for i := 0; i < count; i++ { 171 clusterUpdateCh := testutils.NewChannel() 172 clusterUpdateChs = append(clusterUpdateChs, clusterUpdateCh) 173 c.WatchCluster(testCDSName+"1", func(update ClusterUpdate, err error) { 174 clusterUpdateCh.Send(clusterUpdateErr{u: update, err: err}) 175 }) 176 if _, err := v2Client.addWatches[ClusterResource].Receive(); i == 0 && err != nil { 177 t.Fatalf("want new watch to start, got error %v", err) 178 } 179 } 180 181 // Third watch for a different name. 182 clusterUpdateCh2 := testutils.NewChannel() 183 c.WatchCluster(testCDSName+"2", func(update ClusterUpdate, err error) { 184 clusterUpdateCh2.Send(clusterUpdateErr{u: update, err: err}) 185 }) 186 if _, err := v2Client.addWatches[ClusterResource].Receive(); err != nil { 187 t.Fatalf("want new watch to start, got error %v", err) 188 } 189 190 wantUpdate1 := ClusterUpdate{ServiceName: testEDSName + "1"} 191 wantUpdate2 := ClusterUpdate{ServiceName: testEDSName + "2"} 192 v2Client.r.NewClusters(map[string]ClusterUpdate{ 193 testCDSName + "1": wantUpdate1, 194 testCDSName + "2": wantUpdate2, 195 }) 196 197 for i := 0; i < count; i++ { 198 if u, err := clusterUpdateChs[i].Receive(); err != nil || u != (clusterUpdateErr{wantUpdate1, nil}) { 199 t.Errorf("i=%v, unexpected clusterUpdate: %v, error receiving from channel: %v", i, u, err) 200 } 201 } 202 203 if u, err := clusterUpdateCh2.Receive(); err != nil || u != (clusterUpdateErr{wantUpdate2, nil}) { 204 t.Errorf("unexpected clusterUpdate: %v, error receiving from channel: %v", u, err) 205 } 206} 207 208// TestClusterWatchAfterCache covers the case where watch is called after the update 209// is in cache. 210func (s) TestClusterWatchAfterCache(t *testing.T) { 211 v2ClientCh, cleanup := overrideNewAPIClient() 212 defer cleanup() 213 214 c, err := New(clientOpts(testXDSServer, false)) 215 if err != nil { 216 t.Fatalf("failed to create client: %v", err) 217 } 218 defer c.Close() 219 220 v2Client := <-v2ClientCh 221 222 clusterUpdateCh := testutils.NewChannel() 223 c.WatchCluster(testCDSName, func(update ClusterUpdate, err error) { 224 clusterUpdateCh.Send(clusterUpdateErr{u: update, err: err}) 225 }) 226 if _, err := v2Client.addWatches[ClusterResource].Receive(); err != nil { 227 t.Fatalf("want new watch to start, got error %v", err) 228 } 229 230 wantUpdate := ClusterUpdate{ServiceName: testEDSName} 231 v2Client.r.NewClusters(map[string]ClusterUpdate{ 232 testCDSName: wantUpdate, 233 }) 234 235 if u, err := clusterUpdateCh.Receive(); err != nil || u != (clusterUpdateErr{wantUpdate, nil}) { 236 t.Errorf("unexpected clusterUpdate: %v, error receiving from channel: %v", u, err) 237 } 238 239 // Another watch for the resource in cache. 240 clusterUpdateCh2 := testutils.NewChannel() 241 c.WatchCluster(testCDSName, func(update ClusterUpdate, err error) { 242 clusterUpdateCh2.Send(clusterUpdateErr{u: update, err: err}) 243 }) 244 if n, err := v2Client.addWatches[ClusterResource].Receive(); err == nil { 245 t.Fatalf("want no new watch to start (recv timeout), got resource name: %v error %v", n, err) 246 } 247 248 // New watch should receives the update. 249 if u, err := clusterUpdateCh2.Receive(); err != nil || u != (clusterUpdateErr{wantUpdate, nil}) { 250 t.Errorf("unexpected clusterUpdate: %v, error receiving from channel: %v", u, err) 251 } 252 253 // Old watch should see nothing. 254 if u, err := clusterUpdateCh.TimedReceive(chanRecvTimeout); err != testutils.ErrRecvTimeout { 255 t.Errorf("unexpected clusterUpdate: %v, %v, want channel recv timeout", u, err) 256 } 257} 258 259// TestClusterWatchExpiryTimer tests the case where the client does not receive 260// an CDS response for the request that it sends out. We want the watch callback 261// to be invoked with an error once the watchExpiryTimer fires. 262func (s) TestClusterWatchExpiryTimer(t *testing.T) { 263 v2ClientCh, cleanup := overrideNewAPIClient() 264 defer cleanup() 265 266 c, err := New(clientOpts(testXDSServer, true)) 267 if err != nil { 268 t.Fatalf("failed to create client: %v", err) 269 } 270 defer c.Close() 271 272 v2Client := <-v2ClientCh 273 274 clusterUpdateCh := testutils.NewChannel() 275 c.WatchCluster(testCDSName, func(u ClusterUpdate, err error) { 276 clusterUpdateCh.Send(clusterUpdateErr{u: u, err: err}) 277 }) 278 if _, err := v2Client.addWatches[ClusterResource].Receive(); err != nil { 279 t.Fatalf("want new watch to start, got error %v", err) 280 } 281 282 u, err := clusterUpdateCh.TimedReceive(defaultTestWatchExpiryTimeout * 2) 283 if err != nil { 284 t.Fatalf("failed to get clusterUpdate: %v", err) 285 } 286 uu := u.(clusterUpdateErr) 287 if uu.u != (ClusterUpdate{}) { 288 t.Errorf("unexpected clusterUpdate: %v, want %v", uu.u, ClusterUpdate{}) 289 } 290 if uu.err == nil { 291 t.Errorf("unexpected clusterError: <nil>, want error watcher timeout") 292 } 293} 294 295// TestClusterWatchExpiryTimerStop tests the case where the client does receive 296// an CDS response for the request that it sends out. We want no error even 297// after expiry timeout. 298func (s) TestClusterWatchExpiryTimerStop(t *testing.T) { 299 v2ClientCh, cleanup := overrideNewAPIClient() 300 defer cleanup() 301 302 c, err := New(clientOpts(testXDSServer, true)) 303 if err != nil { 304 t.Fatalf("failed to create client: %v", err) 305 } 306 defer c.Close() 307 308 v2Client := <-v2ClientCh 309 310 clusterUpdateCh := testutils.NewChannel() 311 c.WatchCluster(testCDSName, func(u ClusterUpdate, err error) { 312 clusterUpdateCh.Send(clusterUpdateErr{u: u, err: err}) 313 }) 314 if _, err := v2Client.addWatches[ClusterResource].Receive(); err != nil { 315 t.Fatalf("want new watch to start, got error %v", err) 316 } 317 318 wantUpdate := ClusterUpdate{ServiceName: testEDSName} 319 v2Client.r.NewClusters(map[string]ClusterUpdate{ 320 testCDSName: wantUpdate, 321 }) 322 323 if u, err := clusterUpdateCh.Receive(); err != nil || u != (clusterUpdateErr{wantUpdate, nil}) { 324 t.Errorf("unexpected clusterUpdate: %v, error receiving from channel: %v", u, err) 325 } 326 327 // Wait for an error, the error should never happen. 328 u, err := clusterUpdateCh.TimedReceive(defaultTestWatchExpiryTimeout * 2) 329 if err != testutils.ErrRecvTimeout { 330 t.Fatalf("got unexpected: %v, %v, want recv timeout", u.(clusterUpdateErr).u, u.(clusterUpdateErr).err) 331 } 332} 333 334// TestClusterResourceRemoved covers the cases: 335// - an update is received after a watch() 336// - another update is received, with one resource removed 337// - this should trigger callback with resource removed error 338// - one more update without the removed resource 339// - the callback (above) shouldn't receive any update 340func (s) TestClusterResourceRemoved(t *testing.T) { 341 v2ClientCh, cleanup := overrideNewAPIClient() 342 defer cleanup() 343 344 c, err := New(clientOpts(testXDSServer, false)) 345 if err != nil { 346 t.Fatalf("failed to create client: %v", err) 347 } 348 defer c.Close() 349 350 v2Client := <-v2ClientCh 351 352 clusterUpdateCh1 := testutils.NewChannel() 353 c.WatchCluster(testCDSName+"1", func(update ClusterUpdate, err error) { 354 clusterUpdateCh1.Send(clusterUpdateErr{u: update, err: err}) 355 }) 356 if _, err := v2Client.addWatches[ClusterResource].Receive(); err != nil { 357 t.Fatalf("want new watch to start, got error %v", err) 358 } 359 // Another watch for a different name. 360 clusterUpdateCh2 := testutils.NewChannel() 361 c.WatchCluster(testCDSName+"2", func(update ClusterUpdate, err error) { 362 clusterUpdateCh2.Send(clusterUpdateErr{u: update, err: err}) 363 }) 364 if _, err := v2Client.addWatches[ClusterResource].Receive(); err != nil { 365 t.Fatalf("want new watch to start, got error %v", err) 366 } 367 368 wantUpdate1 := ClusterUpdate{ServiceName: testEDSName + "1"} 369 wantUpdate2 := ClusterUpdate{ServiceName: testEDSName + "2"} 370 v2Client.r.NewClusters(map[string]ClusterUpdate{ 371 testCDSName + "1": wantUpdate1, 372 testCDSName + "2": wantUpdate2, 373 }) 374 375 if u, err := clusterUpdateCh1.Receive(); err != nil || u != (clusterUpdateErr{wantUpdate1, nil}) { 376 t.Errorf("unexpected clusterUpdate: %v, error receiving from channel: %v", u, err) 377 } 378 379 if u, err := clusterUpdateCh2.Receive(); err != nil || u != (clusterUpdateErr{wantUpdate2, nil}) { 380 t.Errorf("unexpected clusterUpdate: %v, error receiving from channel: %v", u, err) 381 } 382 383 // Send another update to remove resource 1. 384 v2Client.r.NewClusters(map[string]ClusterUpdate{ 385 testCDSName + "2": wantUpdate2, 386 }) 387 388 // watcher 1 should get an error. 389 if u, err := clusterUpdateCh1.Receive(); err != nil || ErrType(u.(clusterUpdateErr).err) != ErrorTypeResourceNotFound { 390 t.Errorf("unexpected clusterUpdate: %v, error receiving from channel: %v, want update with error resource not found", u, err) 391 } 392 393 // watcher 2 should get the same update again. 394 if u, err := clusterUpdateCh2.Receive(); err != nil || u != (clusterUpdateErr{wantUpdate2, nil}) { 395 t.Errorf("unexpected clusterUpdate: %v, error receiving from channel: %v", u, err) 396 } 397 398 // Send one more update without resource 1. 399 v2Client.r.NewClusters(map[string]ClusterUpdate{ 400 testCDSName + "2": wantUpdate2, 401 }) 402 403 // watcher 1 should get an error. 404 if u, err := clusterUpdateCh1.Receive(); err != testutils.ErrRecvTimeout { 405 t.Errorf("unexpected clusterUpdate: %v, want receiving from channel timeout", u) 406 } 407 408 // watcher 2 should get the same update again. 409 if u, err := clusterUpdateCh2.Receive(); err != nil || u != (clusterUpdateErr{wantUpdate2, nil}) { 410 t.Errorf("unexpected clusterUpdate: %v, error receiving from channel: %v", u, err) 411 } 412} 413