1/* 2Copyright 2015 The Kubernetes Authors. 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package tests 18 19import ( 20 "context" 21 "fmt" 22 "reflect" 23 goruntime "runtime" 24 "strconv" 25 "sync" 26 "testing" 27 "time" 28 29 apitesting "k8s.io/apimachinery/pkg/api/apitesting" 30 apiequality "k8s.io/apimachinery/pkg/api/equality" 31 "k8s.io/apimachinery/pkg/api/errors" 32 "k8s.io/apimachinery/pkg/api/meta" 33 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" 34 "k8s.io/apimachinery/pkg/fields" 35 "k8s.io/apimachinery/pkg/labels" 36 "k8s.io/apimachinery/pkg/runtime" 37 "k8s.io/apimachinery/pkg/runtime/serializer" 38 "k8s.io/apimachinery/pkg/util/clock" 39 utilruntime "k8s.io/apimachinery/pkg/util/runtime" 40 "k8s.io/apimachinery/pkg/util/sets" 41 "k8s.io/apimachinery/pkg/util/wait" 42 "k8s.io/apimachinery/pkg/watch" 43 "k8s.io/apiserver/pkg/apis/example" 44 examplev1 "k8s.io/apiserver/pkg/apis/example/v1" 45 "k8s.io/apiserver/pkg/features" 46 "k8s.io/apiserver/pkg/storage" 47 cacherstorage "k8s.io/apiserver/pkg/storage/cacher" 48 "k8s.io/apiserver/pkg/storage/etcd3" 49 etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing" 50 storagetesting "k8s.io/apiserver/pkg/storage/testing" 51 "k8s.io/apiserver/pkg/storage/value" 52 utilfeature "k8s.io/apiserver/pkg/util/feature" 53 featuregatetesting "k8s.io/component-base/featuregate/testing" 54) 55 56var ( 57 scheme = runtime.NewScheme() 58 codecs = serializer.NewCodecFactory(scheme) 59) 60 61const ( 62 // watchCacheDefaultCapacity syncs watch cache defaultLowerBoundCapacity. 63 watchCacheDefaultCapacity = 100 64) 65 66func init() { 67 metav1.AddToGroupVersion(scheme, metav1.SchemeGroupVersion) 68 utilruntime.Must(example.AddToScheme(scheme)) 69 utilruntime.Must(examplev1.AddToScheme(scheme)) 70} 71 72// GetAttrs returns labels and fields of a given object for filtering purposes. 73func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) { 74 pod, ok := obj.(*example.Pod) 75 if !ok { 76 return nil, nil, fmt.Errorf("not a pod") 77 } 78 return labels.Set(pod.ObjectMeta.Labels), PodToSelectableFields(pod), nil 79} 80 81// PodToSelectableFields returns a field set that represents the object 82// TODO: fields are not labels, and the validation rules for them do not apply. 83func PodToSelectableFields(pod *example.Pod) fields.Set { 84 // The purpose of allocation with a given number of elements is to reduce 85 // amount of allocations needed to create the fields.Set. If you add any 86 // field here or the number of object-meta related fields changes, this should 87 // be adjusted. 88 podSpecificFieldsSet := make(fields.Set, 5) 89 podSpecificFieldsSet["spec.nodeName"] = pod.Spec.NodeName 90 podSpecificFieldsSet["spec.restartPolicy"] = string(pod.Spec.RestartPolicy) 91 podSpecificFieldsSet["status.phase"] = string(pod.Status.Phase) 92 return AddObjectMetaFieldsSet(podSpecificFieldsSet, &pod.ObjectMeta, true) 93} 94 95func AddObjectMetaFieldsSet(source fields.Set, objectMeta *metav1.ObjectMeta, hasNamespaceField bool) fields.Set { 96 source["metadata.name"] = objectMeta.Name 97 if hasNamespaceField { 98 source["metadata.namespace"] = objectMeta.Namespace 99 } 100 return source 101} 102 103func newPod() runtime.Object { return &example.Pod{} } 104func newPodList() runtime.Object { return &example.PodList{} } 105 106func newEtcdTestStorage(t *testing.T, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) { 107 server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t) 108 storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), newPod, prefix, value.IdentityTransformer, true) 109 return server, storage 110} 111 112func newTestCacher(s storage.Interface) (*cacherstorage.Cacher, storage.Versioner, error) { 113 return newTestCacherWithClock(s, clock.RealClock{}) 114} 115 116func newTestCacherWithClock(s storage.Interface, clock clock.Clock) (*cacherstorage.Cacher, storage.Versioner, error) { 117 prefix := "pods" 118 v := etcd3.APIObjectVersioner{} 119 config := cacherstorage.Config{ 120 Storage: s, 121 Versioner: v, 122 ResourcePrefix: prefix, 123 KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) }, 124 GetAttrsFunc: GetAttrs, 125 NewFunc: newPod, 126 NewListFunc: newPodList, 127 Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion), 128 Clock: clock, 129 } 130 cacher, err := cacherstorage.NewCacherFromConfig(config) 131 return cacher, v, err 132} 133 134func makeTestPod(name string) *example.Pod { 135 return &example.Pod{ 136 ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: name}, 137 Spec: storagetesting.DeepEqualSafePodSpec(), 138 } 139} 140 141func createPod(s storage.Interface, obj *example.Pod) error { 142 key := "pods/" + obj.Namespace + "/" + obj.Name 143 out := &example.Pod{} 144 return s.Create(context.TODO(), key, obj, out, 0) 145} 146 147func updatePod(t *testing.T, s storage.Interface, obj, old *example.Pod) *example.Pod { 148 updateFn := func(input runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) { 149 return obj.DeepCopyObject(), nil, nil 150 } 151 key := "pods/" + obj.Namespace + "/" + obj.Name 152 if err := s.GuaranteedUpdate(context.TODO(), key, &example.Pod{}, old == nil, nil, updateFn, nil); err != nil { 153 t.Errorf("unexpected error: %v", err) 154 } 155 obj.ResourceVersion = "" 156 result := &example.Pod{} 157 if err := s.Get(context.TODO(), key, storage.GetOptions{}, result); err != nil { 158 t.Errorf("unexpected error: %v", err) 159 } 160 return result 161} 162 163func TestGet(t *testing.T) { 164 server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix()) 165 defer server.Terminate(t) 166 cacher, _, err := newTestCacher(etcdStorage) 167 if err != nil { 168 t.Fatalf("Couldn't create cacher: %v", err) 169 } 170 defer cacher.Stop() 171 172 podFoo := makeTestPod("foo") 173 fooCreated := updatePod(t, etcdStorage, podFoo, nil) 174 175 // We pass the ResourceVersion from the above Create() operation. 176 result := &example.Pod{} 177 if err := cacher.Get(context.TODO(), "pods/ns/foo", storage.GetOptions{IgnoreNotFound: true, ResourceVersion: fooCreated.ResourceVersion}, result); err != nil { 178 t.Errorf("Unexpected error: %v", err) 179 } 180 if e, a := *fooCreated, *result; !reflect.DeepEqual(e, a) { 181 t.Errorf("Expected: %#v, got: %#v", e, a) 182 } 183 184 if err := cacher.Get(context.TODO(), "pods/ns/bar", storage.GetOptions{ResourceVersion: fooCreated.ResourceVersion, IgnoreNotFound: true}, result); err != nil { 185 t.Errorf("Unexpected error: %v", err) 186 } 187 emptyPod := example.Pod{} 188 if e, a := emptyPod, *result; !reflect.DeepEqual(e, a) { 189 t.Errorf("Expected: %#v, got: %#v", e, a) 190 } 191 192 if err := cacher.Get(context.TODO(), "pods/ns/bar", storage.GetOptions{ResourceVersion: fooCreated.ResourceVersion}, result); !storage.IsNotFound(err) { 193 t.Errorf("Unexpected error: %v", err) 194 } 195} 196 197func TestGetToList(t *testing.T) { 198 server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix()) 199 defer server.Terminate(t) 200 cacher, _, err := newTestCacher(etcdStorage) 201 if err != nil { 202 t.Fatalf("Couldn't create cacher: %v", err) 203 } 204 defer cacher.Stop() 205 206 storedObj := updatePod(t, etcdStorage, makeTestPod("foo"), nil) 207 key := "pods/" + storedObj.Namespace + "/" + storedObj.Name 208 209 tests := []struct { 210 key string 211 pred storage.SelectionPredicate 212 expectedOut []*example.Pod 213 }{{ // test GetToList on existing key 214 key: key, 215 pred: storage.Everything, 216 expectedOut: []*example.Pod{storedObj}, 217 }, { // test GetToList on non-existing key 218 key: "/non-existing", 219 pred: storage.Everything, 220 expectedOut: nil, 221 }, { // test GetToList with matching pod name 222 key: "/non-existing", 223 pred: storage.SelectionPredicate{ 224 Label: labels.Everything(), 225 Field: fields.ParseSelectorOrDie("metadata.name!=" + storedObj.Name), 226 GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) { 227 pod := obj.(*example.Pod) 228 return nil, fields.Set{"metadata.name": pod.Name}, nil 229 }, 230 }, 231 expectedOut: nil, 232 }} 233 234 for i, tt := range tests { 235 out := &example.PodList{} 236 err := cacher.GetToList(context.TODO(), tt.key, storage.ListOptions{Predicate: tt.pred}, out) 237 if err != nil { 238 t.Fatalf("GetToList failed: %v", err) 239 } 240 if len(out.ResourceVersion) == 0 { 241 t.Errorf("#%d: unset resourceVersion", i) 242 } 243 if len(out.Items) != len(tt.expectedOut) { 244 t.Errorf("#%d: length of list want=%d, get=%d", i, len(tt.expectedOut), len(out.Items)) 245 continue 246 } 247 for j, wantPod := range tt.expectedOut { 248 getPod := &out.Items[j] 249 if !reflect.DeepEqual(wantPod, getPod) { 250 t.Errorf("#%d: pod want=%#v, get=%#v", i, wantPod, getPod) 251 } 252 } 253 } 254} 255 256func TestList(t *testing.T) { 257 server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix()) 258 defer server.Terminate(t) 259 cacher, _, err := newTestCacher(etcdStorage) 260 if err != nil { 261 t.Fatalf("Couldn't create cacher: %v", err) 262 } 263 defer cacher.Stop() 264 265 podFoo := makeTestPod("foo") 266 podBar := makeTestPod("bar") 267 podBaz := makeTestPod("baz") 268 269 podFooPrime := makeTestPod("foo") 270 podFooPrime.Spec.NodeName = "fakeNode" 271 272 fooCreated := updatePod(t, etcdStorage, podFoo, nil) 273 _ = updatePod(t, etcdStorage, podBar, nil) 274 _ = updatePod(t, etcdStorage, podBaz, nil) 275 276 _ = updatePod(t, etcdStorage, podFooPrime, fooCreated) 277 278 // Create a pod in a namespace that contains "ns" as a prefix 279 // Make sure it is not returned in a watch of "ns" 280 podFooNS2 := makeTestPod("foo") 281 podFooNS2.Namespace += "2" 282 updatePod(t, etcdStorage, podFooNS2, nil) 283 284 deleted := example.Pod{} 285 if err := etcdStorage.Delete(context.TODO(), "pods/ns/bar", &deleted, nil, storage.ValidateAllObjectFunc); err != nil { 286 t.Errorf("Unexpected error: %v", err) 287 } 288 289 // We first List directly from etcd by passing empty resourceVersion, 290 // to get the current etcd resourceVersion. 291 rvResult := &example.PodList{} 292 if err := cacher.List(context.TODO(), "pods/ns", storage.ListOptions{Predicate: storage.Everything}, rvResult); err != nil { 293 t.Errorf("Unexpected error: %v", err) 294 } 295 deletedPodRV := rvResult.ListMeta.ResourceVersion 296 297 result := &example.PodList{} 298 // We pass the current etcd ResourceVersion received from the above List() operation, 299 // since there is not easy way to get ResourceVersion of barPod deletion operation. 300 if err := cacher.List(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: deletedPodRV, Predicate: storage.Everything}, result); err != nil { 301 t.Errorf("Unexpected error: %v", err) 302 } 303 if result.ListMeta.ResourceVersion != deletedPodRV { 304 t.Errorf("Incorrect resource version: %v", result.ListMeta.ResourceVersion) 305 } 306 if len(result.Items) != 2 { 307 t.Errorf("Unexpected list result: %d", len(result.Items)) 308 } 309 keys := sets.String{} 310 for _, item := range result.Items { 311 keys.Insert(item.Name) 312 } 313 if !keys.HasAll("foo", "baz") { 314 t.Errorf("Unexpected list result: %#v", result) 315 } 316 for _, item := range result.Items { 317 // unset fields that are set by the infrastructure 318 item.ResourceVersion = "" 319 item.CreationTimestamp = metav1.Time{} 320 321 if item.Namespace != "ns" { 322 t.Errorf("Unexpected namespace: %s", item.Namespace) 323 } 324 325 var expected *example.Pod 326 switch item.Name { 327 case "foo": 328 expected = podFooPrime 329 case "baz": 330 expected = podBaz 331 default: 332 t.Errorf("Unexpected item: %v", item) 333 } 334 if e, a := *expected, item; !reflect.DeepEqual(e, a) { 335 t.Errorf("Expected: %#v, got: %#v", e, a) 336 } 337 } 338} 339 340// TestTooLargeResourceVersionList ensures that a list request for a resource version higher than available 341// in the watch cache completes (does not wait indefinitely) and results in a ResourceVersionTooLarge error. 342func TestTooLargeResourceVersionList(t *testing.T) { 343 server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix()) 344 defer server.Terminate(t) 345 cacher, v, err := newTestCacher(etcdStorage) 346 if err != nil { 347 t.Fatalf("Couldn't create cacher: %v", err) 348 } 349 defer cacher.Stop() 350 351 podFoo := makeTestPod("foo") 352 fooCreated := updatePod(t, etcdStorage, podFoo, nil) 353 354 // Set up List at fooCreated.ResourceVersion + 10 355 rv, err := v.ParseResourceVersion(fooCreated.ResourceVersion) 356 if err != nil { 357 t.Fatalf("Unexpected error: %v", err) 358 } 359 listRV := strconv.Itoa(int(rv + 10)) 360 361 result := &example.PodList{} 362 err = cacher.List(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: listRV, Predicate: storage.Everything}, result) 363 if !errors.IsTimeout(err) { 364 t.Errorf("Unexpected error: %v", err) 365 } 366 if !storage.IsTooLargeResourceVersion(err) { 367 t.Errorf("expected 'Too large resource version' cause in error but got: %v", err) 368 } 369} 370 371func verifyWatchEvent(t *testing.T, w watch.Interface, eventType watch.EventType, eventObject runtime.Object) { 372 _, _, line, _ := goruntime.Caller(1) 373 select { 374 case event := <-w.ResultChan(): 375 if e, a := eventType, event.Type; e != a { 376 t.Logf("(called from line %d)", line) 377 t.Errorf("Expected: %s, got: %s", eventType, event.Type) 378 } 379 object := event.Object 380 if co, ok := object.(runtime.CacheableObject); ok { 381 object = co.GetObject() 382 } 383 if e, a := eventObject, object; !apiequality.Semantic.DeepDerivative(e, a) { 384 t.Logf("(called from line %d)", line) 385 t.Errorf("Expected (%s): %#v, got: %#v", eventType, e, a) 386 } 387 case <-time.After(wait.ForeverTestTimeout): 388 t.Logf("(called from line %d)", line) 389 t.Errorf("Timed out waiting for an event") 390 } 391} 392 393type injectListError struct { 394 errors int 395 storage.Interface 396} 397 398func (self *injectListError) List(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { 399 if self.errors > 0 { 400 self.errors-- 401 return fmt.Errorf("injected error") 402 } 403 return self.Interface.List(ctx, key, opts, listObj) 404} 405 406func TestWatch(t *testing.T) { 407 server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix()) 408 // Inject one list error to make sure we test the relist case. 409 etcdStorage = &injectListError{errors: 1, Interface: etcdStorage} 410 defer server.Terminate(t) 411 fakeClock := clock.NewFakeClock(time.Now()) 412 cacher, _, err := newTestCacherWithClock(etcdStorage, fakeClock) 413 if err != nil { 414 t.Fatalf("Couldn't create cacher: %v", err) 415 } 416 defer cacher.Stop() 417 418 podFoo := makeTestPod("foo") 419 podBar := makeTestPod("bar") 420 421 podFooPrime := makeTestPod("foo") 422 podFooPrime.Spec.NodeName = "fakeNode" 423 424 podFooBis := makeTestPod("foo") 425 podFooBis.Spec.NodeName = "anotherFakeNode" 426 427 podFooNS2 := makeTestPod("foo") 428 podFooNS2.Namespace += "2" 429 430 // initialVersion is used to initate the watcher at the beginning of the world, 431 // which is not defined precisely in etcd. 432 initialVersion, err := cacher.LastSyncResourceVersion() 433 if err != nil { 434 t.Fatalf("Unexpected error: %v", err) 435 } 436 startVersion := strconv.Itoa(int(initialVersion)) 437 438 // Set up Watch for object "podFoo". 439 watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: startVersion, Predicate: storage.Everything}) 440 if err != nil { 441 t.Fatalf("Unexpected error: %v", err) 442 } 443 defer watcher.Stop() 444 445 // Create in another namespace first to make sure events from other namespaces don't get delivered 446 updatePod(t, etcdStorage, podFooNS2, nil) 447 448 fooCreated := updatePod(t, etcdStorage, podFoo, nil) 449 _ = updatePod(t, etcdStorage, podBar, nil) 450 fooUpdated := updatePod(t, etcdStorage, podFooPrime, fooCreated) 451 452 verifyWatchEvent(t, watcher, watch.Added, podFoo) 453 verifyWatchEvent(t, watcher, watch.Modified, podFooPrime) 454 455 initialWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: fooCreated.ResourceVersion, Predicate: storage.Everything}) 456 if err != nil { 457 t.Fatalf("Unexpected error: %v", err) 458 } 459 defer initialWatcher.Stop() 460 461 verifyWatchEvent(t, initialWatcher, watch.Modified, podFooPrime) 462 463 // Now test watch from "now". 464 nowWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) 465 if err != nil { 466 t.Fatalf("Unexpected error: %v", err) 467 } 468 defer nowWatcher.Stop() 469 470 verifyWatchEvent(t, nowWatcher, watch.Added, podFooPrime) 471 472 _ = updatePod(t, etcdStorage, podFooBis, fooUpdated) 473 474 verifyWatchEvent(t, nowWatcher, watch.Modified, podFooBis) 475 476 // Add watchCacheDefaultCapacity events to make current watch cache full. 477 // Make start and last event duration exceed eventFreshDuration(current 75s) to ensure watch cache won't expand. 478 for i := 0; i < watchCacheDefaultCapacity; i++ { 479 fakeClock.SetTime(time.Now().Add(time.Duration(i) * time.Minute)) 480 podFoo := makeTestPod(fmt.Sprintf("foo-%d", i)) 481 updatePod(t, etcdStorage, podFoo, nil) 482 } 483 484 // Check whether we get too-old error via the watch channel 485 tooOldWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: "1", Predicate: storage.Everything}) 486 if err != nil { 487 t.Fatalf("Expected no direct error, got %v", err) 488 } 489 defer tooOldWatcher.Stop() 490 491 // Ensure we get a "Gone" error. 492 expectedResourceExpiredError := errors.NewResourceExpired("").ErrStatus 493 verifyWatchEvent(t, tooOldWatcher, watch.Error, &expectedResourceExpiredError) 494} 495 496func TestWatcherTimeout(t *testing.T) { 497 server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix()) 498 defer server.Terminate(t) 499 cacher, _, err := newTestCacher(etcdStorage) 500 if err != nil { 501 t.Fatalf("Couldn't create cacher: %v", err) 502 } 503 defer cacher.Stop() 504 505 // initialVersion is used to initate the watcher at the beginning of the world, 506 // which is not defined precisely in etcd. 507 initialVersion, err := cacher.LastSyncResourceVersion() 508 if err != nil { 509 t.Fatalf("Unexpected error: %v", err) 510 } 511 startVersion := strconv.Itoa(int(initialVersion)) 512 513 // Create a number of watchers that will not be reading any result. 514 nonReadingWatchers := 50 515 for i := 0; i < nonReadingWatchers; i++ { 516 watcher, err := cacher.WatchList(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: startVersion, Predicate: storage.Everything}) 517 if err != nil { 518 t.Fatalf("Unexpected error: %v", err) 519 } 520 defer watcher.Stop() 521 } 522 523 // Create a second watcher that will be reading result. 524 readingWatcher, err := cacher.WatchList(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: startVersion, Predicate: storage.Everything}) 525 if err != nil { 526 t.Fatalf("Unexpected error: %v", err) 527 } 528 defer readingWatcher.Stop() 529 530 startTime := time.Now() 531 for i := 1; i <= 22; i++ { 532 pod := makeTestPod(strconv.Itoa(i)) 533 _ = updatePod(t, etcdStorage, pod, nil) 534 verifyWatchEvent(t, readingWatcher, watch.Added, pod) 535 } 536 if time.Since(startTime) > time.Duration(250*nonReadingWatchers)*time.Millisecond { 537 t.Errorf("waiting for events took too long: %v", time.Since(startTime)) 538 } 539} 540 541func TestFiltering(t *testing.T) { 542 server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix()) 543 defer server.Terminate(t) 544 cacher, _, err := newTestCacher(etcdStorage) 545 if err != nil { 546 t.Fatalf("Couldn't create cacher: %v", err) 547 } 548 defer cacher.Stop() 549 550 // Ensure that the cacher is initialized, before creating any pods, 551 // so that we are sure that all events will be present in cacher. 552 syncWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) 553 if err != nil { 554 t.Fatalf("Unexpected error: %v", err) 555 } 556 syncWatcher.Stop() 557 558 podFoo := makeTestPod("foo") 559 podFoo.Labels = map[string]string{"filter": "foo"} 560 podFooFiltered := makeTestPod("foo") 561 podFooPrime := makeTestPod("foo") 562 podFooPrime.Labels = map[string]string{"filter": "foo"} 563 podFooPrime.Spec.NodeName = "fakeNode" 564 565 podFooNS2 := makeTestPod("foo") 566 podFooNS2.Namespace += "2" 567 podFooNS2.Labels = map[string]string{"filter": "foo"} 568 569 // Create in another namespace first to make sure events from other namespaces don't get delivered 570 updatePod(t, etcdStorage, podFooNS2, nil) 571 572 fooCreated := updatePod(t, etcdStorage, podFoo, nil) 573 fooFiltered := updatePod(t, etcdStorage, podFooFiltered, fooCreated) 574 fooUnfiltered := updatePod(t, etcdStorage, podFoo, fooFiltered) 575 _ = updatePod(t, etcdStorage, podFooPrime, fooUnfiltered) 576 577 deleted := example.Pod{} 578 if err := etcdStorage.Delete(context.TODO(), "pods/ns/foo", &deleted, nil, storage.ValidateAllObjectFunc); err != nil { 579 t.Errorf("Unexpected error: %v", err) 580 } 581 582 // Set up Watch for object "podFoo" with label filter set. 583 pred := storage.SelectionPredicate{ 584 Label: labels.SelectorFromSet(labels.Set{"filter": "foo"}), 585 Field: fields.Everything(), 586 GetAttrs: func(obj runtime.Object) (label labels.Set, field fields.Set, err error) { 587 metadata, err := meta.Accessor(obj) 588 if err != nil { 589 t.Fatalf("Unexpected error: %v", err) 590 } 591 return labels.Set(metadata.GetLabels()), nil, nil 592 }, 593 } 594 watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: fooCreated.ResourceVersion, Predicate: pred}) 595 if err != nil { 596 t.Fatalf("Unexpected error: %v", err) 597 } 598 defer watcher.Stop() 599 600 verifyWatchEvent(t, watcher, watch.Deleted, podFooFiltered) 601 verifyWatchEvent(t, watcher, watch.Added, podFoo) 602 verifyWatchEvent(t, watcher, watch.Modified, podFooPrime) 603 verifyWatchEvent(t, watcher, watch.Deleted, podFooPrime) 604} 605 606func TestEmptyWatchEventCache(t *testing.T) { 607 server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix()) 608 defer server.Terminate(t) 609 610 // add a few objects 611 updatePod(t, etcdStorage, makeTestPod("pod1"), nil) 612 updatePod(t, etcdStorage, makeTestPod("pod2"), nil) 613 updatePod(t, etcdStorage, makeTestPod("pod3"), nil) 614 updatePod(t, etcdStorage, makeTestPod("pod4"), nil) 615 updatePod(t, etcdStorage, makeTestPod("pod5"), nil) 616 617 fooCreated := updatePod(t, etcdStorage, makeTestPod("foo"), nil) 618 619 cacher, v, err := newTestCacher(etcdStorage) 620 if err != nil { 621 t.Fatalf("Couldn't create cacher: %v", err) 622 } 623 defer cacher.Stop() 624 625 // get rv of last pod created 626 rv, err := v.ParseResourceVersion(fooCreated.ResourceVersion) 627 if err != nil { 628 t.Fatalf("Unexpected error: %v", err) 629 } 630 631 // We now have a cacher with an empty cache of watch events and a resourceVersion of rv. 632 // It should support establishing watches from rv and higher, but not older. 633 634 { 635 watcher, err := cacher.Watch(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: strconv.Itoa(int(rv - 1)), Predicate: storage.Everything}) 636 if err != nil { 637 t.Fatalf("Unexpected error: %v", err) 638 } 639 defer watcher.Stop() 640 expectedResourceExpiredError := errors.NewResourceExpired("").ErrStatus 641 verifyWatchEvent(t, watcher, watch.Error, &expectedResourceExpiredError) 642 } 643 644 { 645 watcher, err := cacher.Watch(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: strconv.Itoa(int(rv + 1)), Predicate: storage.Everything}) 646 if err != nil { 647 t.Fatalf("Unexpected error: %v", err) 648 } 649 defer watcher.Stop() 650 select { 651 case e := <-watcher.ResultChan(): 652 t.Errorf("unexpected event %#v", e) 653 case <-time.After(3 * time.Second): 654 // watch from rv+1 remained established successfully 655 } 656 } 657 658 { 659 watcher, err := cacher.Watch(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: strconv.Itoa(int(rv)), Predicate: storage.Everything}) 660 if err != nil { 661 t.Fatalf("Unexpected error: %v", err) 662 } 663 defer watcher.Stop() 664 select { 665 case e := <-watcher.ResultChan(): 666 t.Errorf("unexpected event %#v", e) 667 case <-time.After(3 * time.Second): 668 // watch from rv remained established successfully 669 } 670 } 671} 672 673func TestRandomWatchDeliver(t *testing.T) { 674 server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix()) 675 defer server.Terminate(t) 676 cacher, v, err := newTestCacher(etcdStorage) 677 if err != nil { 678 t.Fatalf("Couldn't create cacher: %v", err) 679 } 680 defer cacher.Stop() 681 682 fooCreated := updatePod(t, etcdStorage, makeTestPod("foo"), nil) 683 rv, err := v.ParseResourceVersion(fooCreated.ResourceVersion) 684 if err != nil { 685 t.Fatalf("Unexpected error: %v", err) 686 } 687 startVersion := strconv.Itoa(int(rv)) 688 689 watcher, err := cacher.WatchList(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: startVersion, Predicate: storage.Everything}) 690 if err != nil { 691 t.Fatalf("Unexpected error: %v", err) 692 } 693 694 // Now we can create exactly 21 events that should be delivered 695 // to the watcher, before it will completely block cacher and as 696 // a result will be dropped. 697 for i := 0; i < 21; i++ { 698 updatePod(t, etcdStorage, makeTestPod(fmt.Sprintf("foo-%d", i)), nil) 699 } 700 701 // Now stop the watcher and check if the consecutive events are being delivered. 702 watcher.Stop() 703 704 watched := 0 705 for { 706 event, ok := <-watcher.ResultChan() 707 if !ok { 708 break 709 } 710 object := event.Object 711 if co, ok := object.(runtime.CacheableObject); ok { 712 object = co.GetObject() 713 } 714 if a, e := object.(*example.Pod).Name, fmt.Sprintf("foo-%d", watched); e != a { 715 t.Errorf("Unexpected object watched: %s, expected %s", a, e) 716 } 717 watched++ 718 } 719} 720 721func TestCacherListerWatcher(t *testing.T) { 722 prefix := "pods" 723 fn := func() runtime.Object { return &example.PodList{} } 724 server, store := newEtcdTestStorage(t, prefix) 725 defer server.Terminate(t) 726 727 podFoo := makeTestPod("foo") 728 podBar := makeTestPod("bar") 729 podBaz := makeTestPod("baz") 730 731 _ = updatePod(t, store, podFoo, nil) 732 _ = updatePod(t, store, podBar, nil) 733 _ = updatePod(t, store, podBaz, nil) 734 735 lw := cacherstorage.NewCacherListerWatcher(store, prefix, fn) 736 737 obj, err := lw.List(metav1.ListOptions{}) 738 if err != nil { 739 t.Fatalf("List failed: %v", err) 740 } 741 pl, ok := obj.(*example.PodList) 742 if !ok { 743 t.Fatalf("Expected PodList but got %v", pl) 744 } 745 if len(pl.Items) != 3 { 746 t.Errorf("Expected PodList of length 3 but got %d", len(pl.Items)) 747 } 748} 749 750func TestCacherListerWatcherPagination(t *testing.T) { 751 prefix := "pods" 752 fn := func() runtime.Object { return &example.PodList{} } 753 server, store := newEtcdTestStorage(t, prefix) 754 defer server.Terminate(t) 755 756 podFoo := makeTestPod("foo") 757 podBar := makeTestPod("bar") 758 podBaz := makeTestPod("baz") 759 760 _ = updatePod(t, store, podFoo, nil) 761 _ = updatePod(t, store, podBar, nil) 762 _ = updatePod(t, store, podBaz, nil) 763 764 lw := cacherstorage.NewCacherListerWatcher(store, prefix, fn) 765 766 obj1, err := lw.List(metav1.ListOptions{Limit: 2}) 767 if err != nil { 768 t.Fatalf("List failed: %v", err) 769 } 770 limit1, ok := obj1.(*example.PodList) 771 if !ok { 772 t.Fatalf("Expected PodList but got %v", limit1) 773 } 774 if len(limit1.Items) != 2 { 775 t.Errorf("Expected PodList of length 2 but got %d", len(limit1.Items)) 776 } 777 if limit1.Continue == "" { 778 t.Errorf("Expected list to have Continue but got none") 779 } 780 obj2, err := lw.List(metav1.ListOptions{Limit: 2, Continue: limit1.Continue}) 781 if err != nil { 782 t.Fatalf("List failed: %v", err) 783 } 784 limit2, ok := obj2.(*example.PodList) 785 if !ok { 786 t.Fatalf("Expected PodList but got %v", limit2) 787 } 788 if limit2.Continue != "" { 789 t.Errorf("Expected list not to have Continue, but got %s", limit1.Continue) 790 } 791 792 if limit1.Items[0].Name != podBar.Name { 793 t.Errorf("Expected list1.Items[0] to be %s but got %s", podBar.Name, limit1.Items[0].Name) 794 } 795 if limit1.Items[1].Name != podBaz.Name { 796 t.Errorf("Expected list1.Items[1] to be %s but got %s", podBaz.Name, limit1.Items[1].Name) 797 } 798 if limit2.Items[0].Name != podFoo.Name { 799 t.Errorf("Expected list2.Items[0] to be %s but got %s", podFoo.Name, limit2.Items[0].Name) 800 } 801 802} 803 804func TestWatchDispatchBookmarkEvents(t *testing.T) { 805 defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchBookmark, true)() 806 807 server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix()) 808 defer server.Terminate(t) 809 cacher, v, err := newTestCacher(etcdStorage) 810 if err != nil { 811 t.Fatalf("Couldn't create cacher: %v", err) 812 } 813 defer cacher.Stop() 814 815 fooCreated := updatePod(t, etcdStorage, makeTestPod("foo"), nil) 816 rv, err := v.ParseResourceVersion(fooCreated.ResourceVersion) 817 if err != nil { 818 t.Fatalf("Unexpected error: %v", err) 819 } 820 startVersion := strconv.Itoa(int(rv)) 821 822 tests := []struct { 823 timeout time.Duration 824 expected bool 825 allowWatchBookmark bool 826 }{ 827 { // test old client won't get Bookmark event 828 timeout: 3 * time.Second, 829 expected: false, 830 allowWatchBookmark: false, 831 }, 832 { 833 timeout: 3 * time.Second, 834 expected: true, 835 allowWatchBookmark: true, 836 }, 837 } 838 839 for i, c := range tests { 840 pred := storage.Everything 841 pred.AllowWatchBookmarks = c.allowWatchBookmark 842 ctx, _ := context.WithTimeout(context.Background(), c.timeout) 843 watcher, err := cacher.Watch(ctx, "pods/ns/foo", storage.ListOptions{ResourceVersion: startVersion, Predicate: pred}) 844 if err != nil { 845 t.Fatalf("Unexpected error: %v", err) 846 } 847 848 // Create events of other pods 849 updatePod(t, etcdStorage, makeTestPod(fmt.Sprintf("foo-whatever-%d", i)), nil) 850 851 // Now wait for Bookmark event 852 select { 853 case event, ok := <-watcher.ResultChan(): 854 if !ok && c.expected { 855 t.Errorf("Unexpected object watched (no objects)") 856 } 857 if c.expected && event.Type != watch.Bookmark { 858 t.Errorf("Unexpected object watched %#v", event) 859 } 860 case <-time.After(time.Second * 3): 861 if c.expected { 862 t.Errorf("Unexpected object watched (timeout)") 863 } 864 } 865 watcher.Stop() 866 } 867} 868 869func TestWatchBookmarksWithCorrectResourceVersion(t *testing.T) { 870 defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchBookmark, true)() 871 872 server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix()) 873 defer server.Terminate(t) 874 cacher, v, err := newTestCacher(etcdStorage) 875 if err != nil { 876 t.Fatalf("Couldn't create cacher: %v", err) 877 } 878 defer cacher.Stop() 879 880 pred := storage.Everything 881 pred.AllowWatchBookmarks = true 882 ctx, _ := context.WithTimeout(context.Background(), 3*time.Second) 883 watcher, err := cacher.WatchList(ctx, "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: pred}) 884 if err != nil { 885 t.Fatalf("Unexpected error: %v", err) 886 } 887 defer watcher.Stop() 888 889 done := make(chan struct{}) 890 var wg sync.WaitGroup 891 wg.Add(1) 892 defer wg.Wait() // We must wait for the waitgroup to exit before we terminate the cache or the server in prior defers 893 defer close(done) // call close first, so the goroutine knows to exit 894 go func() { 895 defer wg.Done() 896 for i := 0; i < 100; i++ { 897 select { 898 case <-done: 899 return 900 default: 901 pod := fmt.Sprintf("foo-%d", i) 902 err := createPod(etcdStorage, makeTestPod(pod)) 903 if err != nil { 904 t.Fatalf("failed to create pod %v: %v", pod, err) 905 } 906 time.Sleep(time.Second / 100) 907 } 908 } 909 }() 910 911 bookmarkReceived := false 912 lastObservedResourceVersion := uint64(0) 913 for event := range watcher.ResultChan() { 914 rv, err := v.ObjectResourceVersion(event.Object) 915 if err != nil { 916 t.Fatalf("failed to parse resourceVersion from %#v", event) 917 } 918 if event.Type == watch.Bookmark { 919 bookmarkReceived = true 920 // bookmark event has a RV greater than or equal to the before one 921 if rv < lastObservedResourceVersion { 922 t.Fatalf("Unexpected bookmark resourceVersion %v less than observed %v)", rv, lastObservedResourceVersion) 923 } 924 } else { 925 // non-bookmark event has a RV greater than anything before 926 if rv <= lastObservedResourceVersion { 927 t.Fatalf("Unexpected event resourceVersion %v less than or equal to bookmark %v)", rv, lastObservedResourceVersion) 928 } 929 } 930 lastObservedResourceVersion = rv 931 } 932 // Make sure we have received a bookmark event 933 if !bookmarkReceived { 934 t.Fatalf("Unpexected error, we did not received a bookmark event") 935 } 936} 937