1/*
2Copyright 2015 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package tests
18
19import (
20	"context"
21	"fmt"
22	"reflect"
23	goruntime "runtime"
24	"strconv"
25	"sync"
26	"testing"
27	"time"
28
29	apitesting "k8s.io/apimachinery/pkg/api/apitesting"
30	apiequality "k8s.io/apimachinery/pkg/api/equality"
31	"k8s.io/apimachinery/pkg/api/errors"
32	"k8s.io/apimachinery/pkg/api/meta"
33	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
34	"k8s.io/apimachinery/pkg/fields"
35	"k8s.io/apimachinery/pkg/labels"
36	"k8s.io/apimachinery/pkg/runtime"
37	"k8s.io/apimachinery/pkg/runtime/serializer"
38	"k8s.io/apimachinery/pkg/util/clock"
39	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
40	"k8s.io/apimachinery/pkg/util/sets"
41	"k8s.io/apimachinery/pkg/util/wait"
42	"k8s.io/apimachinery/pkg/watch"
43	"k8s.io/apiserver/pkg/apis/example"
44	examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
45	"k8s.io/apiserver/pkg/features"
46	"k8s.io/apiserver/pkg/storage"
47	cacherstorage "k8s.io/apiserver/pkg/storage/cacher"
48	"k8s.io/apiserver/pkg/storage/etcd3"
49	etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
50	storagetesting "k8s.io/apiserver/pkg/storage/testing"
51	"k8s.io/apiserver/pkg/storage/value"
52	utilfeature "k8s.io/apiserver/pkg/util/feature"
53	featuregatetesting "k8s.io/component-base/featuregate/testing"
54)
55
56var (
57	scheme = runtime.NewScheme()
58	codecs = serializer.NewCodecFactory(scheme)
59)
60
61const (
62	// watchCacheDefaultCapacity syncs watch cache defaultLowerBoundCapacity.
63	watchCacheDefaultCapacity = 100
64)
65
66func init() {
67	metav1.AddToGroupVersion(scheme, metav1.SchemeGroupVersion)
68	utilruntime.Must(example.AddToScheme(scheme))
69	utilruntime.Must(examplev1.AddToScheme(scheme))
70}
71
72// GetAttrs returns labels and fields of a given object for filtering purposes.
73func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) {
74	pod, ok := obj.(*example.Pod)
75	if !ok {
76		return nil, nil, fmt.Errorf("not a pod")
77	}
78	return labels.Set(pod.ObjectMeta.Labels), PodToSelectableFields(pod), nil
79}
80
81// PodToSelectableFields returns a field set that represents the object
82// TODO: fields are not labels, and the validation rules for them do not apply.
83func PodToSelectableFields(pod *example.Pod) fields.Set {
84	// The purpose of allocation with a given number of elements is to reduce
85	// amount of allocations needed to create the fields.Set. If you add any
86	// field here or the number of object-meta related fields changes, this should
87	// be adjusted.
88	podSpecificFieldsSet := make(fields.Set, 5)
89	podSpecificFieldsSet["spec.nodeName"] = pod.Spec.NodeName
90	podSpecificFieldsSet["spec.restartPolicy"] = string(pod.Spec.RestartPolicy)
91	podSpecificFieldsSet["status.phase"] = string(pod.Status.Phase)
92	return AddObjectMetaFieldsSet(podSpecificFieldsSet, &pod.ObjectMeta, true)
93}
94
95func AddObjectMetaFieldsSet(source fields.Set, objectMeta *metav1.ObjectMeta, hasNamespaceField bool) fields.Set {
96	source["metadata.name"] = objectMeta.Name
97	if hasNamespaceField {
98		source["metadata.namespace"] = objectMeta.Namespace
99	}
100	return source
101}
102
103func newPod() runtime.Object     { return &example.Pod{} }
104func newPodList() runtime.Object { return &example.PodList{} }
105
106func newEtcdTestStorage(t *testing.T, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) {
107	server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
108	storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), newPod, prefix, value.IdentityTransformer, true)
109	return server, storage
110}
111
112func newTestCacher(s storage.Interface) (*cacherstorage.Cacher, storage.Versioner, error) {
113	return newTestCacherWithClock(s, clock.RealClock{})
114}
115
116func newTestCacherWithClock(s storage.Interface, clock clock.Clock) (*cacherstorage.Cacher, storage.Versioner, error) {
117	prefix := "pods"
118	v := etcd3.APIObjectVersioner{}
119	config := cacherstorage.Config{
120		Storage:        s,
121		Versioner:      v,
122		ResourcePrefix: prefix,
123		KeyFunc:        func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) },
124		GetAttrsFunc:   GetAttrs,
125		NewFunc:        newPod,
126		NewListFunc:    newPodList,
127		Codec:          codecs.LegacyCodec(examplev1.SchemeGroupVersion),
128		Clock:          clock,
129	}
130	cacher, err := cacherstorage.NewCacherFromConfig(config)
131	return cacher, v, err
132}
133
134func makeTestPod(name string) *example.Pod {
135	return &example.Pod{
136		ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: name},
137		Spec:       storagetesting.DeepEqualSafePodSpec(),
138	}
139}
140
141func createPod(s storage.Interface, obj *example.Pod) error {
142	key := "pods/" + obj.Namespace + "/" + obj.Name
143	out := &example.Pod{}
144	return s.Create(context.TODO(), key, obj, out, 0)
145}
146
147func updatePod(t *testing.T, s storage.Interface, obj, old *example.Pod) *example.Pod {
148	updateFn := func(input runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) {
149		return obj.DeepCopyObject(), nil, nil
150	}
151	key := "pods/" + obj.Namespace + "/" + obj.Name
152	if err := s.GuaranteedUpdate(context.TODO(), key, &example.Pod{}, old == nil, nil, updateFn, nil); err != nil {
153		t.Errorf("unexpected error: %v", err)
154	}
155	obj.ResourceVersion = ""
156	result := &example.Pod{}
157	if err := s.Get(context.TODO(), key, storage.GetOptions{}, result); err != nil {
158		t.Errorf("unexpected error: %v", err)
159	}
160	return result
161}
162
163func TestGet(t *testing.T) {
164	server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
165	defer server.Terminate(t)
166	cacher, _, err := newTestCacher(etcdStorage)
167	if err != nil {
168		t.Fatalf("Couldn't create cacher: %v", err)
169	}
170	defer cacher.Stop()
171
172	podFoo := makeTestPod("foo")
173	fooCreated := updatePod(t, etcdStorage, podFoo, nil)
174
175	// We pass the ResourceVersion from the above Create() operation.
176	result := &example.Pod{}
177	if err := cacher.Get(context.TODO(), "pods/ns/foo", storage.GetOptions{IgnoreNotFound: true, ResourceVersion: fooCreated.ResourceVersion}, result); err != nil {
178		t.Errorf("Unexpected error: %v", err)
179	}
180	if e, a := *fooCreated, *result; !reflect.DeepEqual(e, a) {
181		t.Errorf("Expected: %#v, got: %#v", e, a)
182	}
183
184	if err := cacher.Get(context.TODO(), "pods/ns/bar", storage.GetOptions{ResourceVersion: fooCreated.ResourceVersion, IgnoreNotFound: true}, result); err != nil {
185		t.Errorf("Unexpected error: %v", err)
186	}
187	emptyPod := example.Pod{}
188	if e, a := emptyPod, *result; !reflect.DeepEqual(e, a) {
189		t.Errorf("Expected: %#v, got: %#v", e, a)
190	}
191
192	if err := cacher.Get(context.TODO(), "pods/ns/bar", storage.GetOptions{ResourceVersion: fooCreated.ResourceVersion}, result); !storage.IsNotFound(err) {
193		t.Errorf("Unexpected error: %v", err)
194	}
195}
196
197func TestGetToList(t *testing.T) {
198	server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
199	defer server.Terminate(t)
200	cacher, _, err := newTestCacher(etcdStorage)
201	if err != nil {
202		t.Fatalf("Couldn't create cacher: %v", err)
203	}
204	defer cacher.Stop()
205
206	storedObj := updatePod(t, etcdStorage, makeTestPod("foo"), nil)
207	key := "pods/" + storedObj.Namespace + "/" + storedObj.Name
208
209	tests := []struct {
210		key         string
211		pred        storage.SelectionPredicate
212		expectedOut []*example.Pod
213	}{{ // test GetToList on existing key
214		key:         key,
215		pred:        storage.Everything,
216		expectedOut: []*example.Pod{storedObj},
217	}, { // test GetToList on non-existing key
218		key:         "/non-existing",
219		pred:        storage.Everything,
220		expectedOut: nil,
221	}, { // test GetToList with matching pod name
222		key: "/non-existing",
223		pred: storage.SelectionPredicate{
224			Label: labels.Everything(),
225			Field: fields.ParseSelectorOrDie("metadata.name!=" + storedObj.Name),
226			GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
227				pod := obj.(*example.Pod)
228				return nil, fields.Set{"metadata.name": pod.Name}, nil
229			},
230		},
231		expectedOut: nil,
232	}}
233
234	for i, tt := range tests {
235		out := &example.PodList{}
236		err := cacher.GetToList(context.TODO(), tt.key, storage.ListOptions{Predicate: tt.pred}, out)
237		if err != nil {
238			t.Fatalf("GetToList failed: %v", err)
239		}
240		if len(out.ResourceVersion) == 0 {
241			t.Errorf("#%d: unset resourceVersion", i)
242		}
243		if len(out.Items) != len(tt.expectedOut) {
244			t.Errorf("#%d: length of list want=%d, get=%d", i, len(tt.expectedOut), len(out.Items))
245			continue
246		}
247		for j, wantPod := range tt.expectedOut {
248			getPod := &out.Items[j]
249			if !reflect.DeepEqual(wantPod, getPod) {
250				t.Errorf("#%d: pod want=%#v, get=%#v", i, wantPod, getPod)
251			}
252		}
253	}
254}
255
256func TestList(t *testing.T) {
257	server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
258	defer server.Terminate(t)
259	cacher, _, err := newTestCacher(etcdStorage)
260	if err != nil {
261		t.Fatalf("Couldn't create cacher: %v", err)
262	}
263	defer cacher.Stop()
264
265	podFoo := makeTestPod("foo")
266	podBar := makeTestPod("bar")
267	podBaz := makeTestPod("baz")
268
269	podFooPrime := makeTestPod("foo")
270	podFooPrime.Spec.NodeName = "fakeNode"
271
272	fooCreated := updatePod(t, etcdStorage, podFoo, nil)
273	_ = updatePod(t, etcdStorage, podBar, nil)
274	_ = updatePod(t, etcdStorage, podBaz, nil)
275
276	_ = updatePod(t, etcdStorage, podFooPrime, fooCreated)
277
278	// Create a pod in a namespace that contains "ns" as a prefix
279	// Make sure it is not returned in a watch of "ns"
280	podFooNS2 := makeTestPod("foo")
281	podFooNS2.Namespace += "2"
282	updatePod(t, etcdStorage, podFooNS2, nil)
283
284	deleted := example.Pod{}
285	if err := etcdStorage.Delete(context.TODO(), "pods/ns/bar", &deleted, nil, storage.ValidateAllObjectFunc); err != nil {
286		t.Errorf("Unexpected error: %v", err)
287	}
288
289	// We first List directly from etcd by passing empty resourceVersion,
290	// to get the current etcd resourceVersion.
291	rvResult := &example.PodList{}
292	if err := cacher.List(context.TODO(), "pods/ns", storage.ListOptions{Predicate: storage.Everything}, rvResult); err != nil {
293		t.Errorf("Unexpected error: %v", err)
294	}
295	deletedPodRV := rvResult.ListMeta.ResourceVersion
296
297	result := &example.PodList{}
298	// We pass the current etcd ResourceVersion received from the above List() operation,
299	// since there is not easy way to get ResourceVersion of barPod deletion operation.
300	if err := cacher.List(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: deletedPodRV, Predicate: storage.Everything}, result); err != nil {
301		t.Errorf("Unexpected error: %v", err)
302	}
303	if result.ListMeta.ResourceVersion != deletedPodRV {
304		t.Errorf("Incorrect resource version: %v", result.ListMeta.ResourceVersion)
305	}
306	if len(result.Items) != 2 {
307		t.Errorf("Unexpected list result: %d", len(result.Items))
308	}
309	keys := sets.String{}
310	for _, item := range result.Items {
311		keys.Insert(item.Name)
312	}
313	if !keys.HasAll("foo", "baz") {
314		t.Errorf("Unexpected list result: %#v", result)
315	}
316	for _, item := range result.Items {
317		// unset fields that are set by the infrastructure
318		item.ResourceVersion = ""
319		item.CreationTimestamp = metav1.Time{}
320
321		if item.Namespace != "ns" {
322			t.Errorf("Unexpected namespace: %s", item.Namespace)
323		}
324
325		var expected *example.Pod
326		switch item.Name {
327		case "foo":
328			expected = podFooPrime
329		case "baz":
330			expected = podBaz
331		default:
332			t.Errorf("Unexpected item: %v", item)
333		}
334		if e, a := *expected, item; !reflect.DeepEqual(e, a) {
335			t.Errorf("Expected: %#v, got: %#v", e, a)
336		}
337	}
338}
339
340// TestTooLargeResourceVersionList ensures that a list request for a resource version higher than available
341// in the watch cache completes (does not wait indefinitely) and results in a ResourceVersionTooLarge error.
342func TestTooLargeResourceVersionList(t *testing.T) {
343	server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
344	defer server.Terminate(t)
345	cacher, v, err := newTestCacher(etcdStorage)
346	if err != nil {
347		t.Fatalf("Couldn't create cacher: %v", err)
348	}
349	defer cacher.Stop()
350
351	podFoo := makeTestPod("foo")
352	fooCreated := updatePod(t, etcdStorage, podFoo, nil)
353
354	// Set up List at fooCreated.ResourceVersion + 10
355	rv, err := v.ParseResourceVersion(fooCreated.ResourceVersion)
356	if err != nil {
357		t.Fatalf("Unexpected error: %v", err)
358	}
359	listRV := strconv.Itoa(int(rv + 10))
360
361	result := &example.PodList{}
362	err = cacher.List(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: listRV, Predicate: storage.Everything}, result)
363	if !errors.IsTimeout(err) {
364		t.Errorf("Unexpected error: %v", err)
365	}
366	if !storage.IsTooLargeResourceVersion(err) {
367		t.Errorf("expected 'Too large resource version' cause in error but got: %v", err)
368	}
369}
370
371func verifyWatchEvent(t *testing.T, w watch.Interface, eventType watch.EventType, eventObject runtime.Object) {
372	_, _, line, _ := goruntime.Caller(1)
373	select {
374	case event := <-w.ResultChan():
375		if e, a := eventType, event.Type; e != a {
376			t.Logf("(called from line %d)", line)
377			t.Errorf("Expected: %s, got: %s", eventType, event.Type)
378		}
379		object := event.Object
380		if co, ok := object.(runtime.CacheableObject); ok {
381			object = co.GetObject()
382		}
383		if e, a := eventObject, object; !apiequality.Semantic.DeepDerivative(e, a) {
384			t.Logf("(called from line %d)", line)
385			t.Errorf("Expected (%s): %#v, got: %#v", eventType, e, a)
386		}
387	case <-time.After(wait.ForeverTestTimeout):
388		t.Logf("(called from line %d)", line)
389		t.Errorf("Timed out waiting for an event")
390	}
391}
392
393type injectListError struct {
394	errors int
395	storage.Interface
396}
397
398func (self *injectListError) List(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
399	if self.errors > 0 {
400		self.errors--
401		return fmt.Errorf("injected error")
402	}
403	return self.Interface.List(ctx, key, opts, listObj)
404}
405
406func TestWatch(t *testing.T) {
407	server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
408	// Inject one list error to make sure we test the relist case.
409	etcdStorage = &injectListError{errors: 1, Interface: etcdStorage}
410	defer server.Terminate(t)
411	fakeClock := clock.NewFakeClock(time.Now())
412	cacher, _, err := newTestCacherWithClock(etcdStorage, fakeClock)
413	if err != nil {
414		t.Fatalf("Couldn't create cacher: %v", err)
415	}
416	defer cacher.Stop()
417
418	podFoo := makeTestPod("foo")
419	podBar := makeTestPod("bar")
420
421	podFooPrime := makeTestPod("foo")
422	podFooPrime.Spec.NodeName = "fakeNode"
423
424	podFooBis := makeTestPod("foo")
425	podFooBis.Spec.NodeName = "anotherFakeNode"
426
427	podFooNS2 := makeTestPod("foo")
428	podFooNS2.Namespace += "2"
429
430	// initialVersion is used to initate the watcher at the beginning of the world,
431	// which is not defined precisely in etcd.
432	initialVersion, err := cacher.LastSyncResourceVersion()
433	if err != nil {
434		t.Fatalf("Unexpected error: %v", err)
435	}
436	startVersion := strconv.Itoa(int(initialVersion))
437
438	// Set up Watch for object "podFoo".
439	watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: startVersion, Predicate: storage.Everything})
440	if err != nil {
441		t.Fatalf("Unexpected error: %v", err)
442	}
443	defer watcher.Stop()
444
445	// Create in another namespace first to make sure events from other namespaces don't get delivered
446	updatePod(t, etcdStorage, podFooNS2, nil)
447
448	fooCreated := updatePod(t, etcdStorage, podFoo, nil)
449	_ = updatePod(t, etcdStorage, podBar, nil)
450	fooUpdated := updatePod(t, etcdStorage, podFooPrime, fooCreated)
451
452	verifyWatchEvent(t, watcher, watch.Added, podFoo)
453	verifyWatchEvent(t, watcher, watch.Modified, podFooPrime)
454
455	initialWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: fooCreated.ResourceVersion, Predicate: storage.Everything})
456	if err != nil {
457		t.Fatalf("Unexpected error: %v", err)
458	}
459	defer initialWatcher.Stop()
460
461	verifyWatchEvent(t, initialWatcher, watch.Modified, podFooPrime)
462
463	// Now test watch from "now".
464	nowWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
465	if err != nil {
466		t.Fatalf("Unexpected error: %v", err)
467	}
468	defer nowWatcher.Stop()
469
470	verifyWatchEvent(t, nowWatcher, watch.Added, podFooPrime)
471
472	_ = updatePod(t, etcdStorage, podFooBis, fooUpdated)
473
474	verifyWatchEvent(t, nowWatcher, watch.Modified, podFooBis)
475
476	// Add watchCacheDefaultCapacity events to make current watch cache full.
477	// Make start and last event duration exceed eventFreshDuration(current 75s) to ensure watch cache won't expand.
478	for i := 0; i < watchCacheDefaultCapacity; i++ {
479		fakeClock.SetTime(time.Now().Add(time.Duration(i) * time.Minute))
480		podFoo := makeTestPod(fmt.Sprintf("foo-%d", i))
481		updatePod(t, etcdStorage, podFoo, nil)
482	}
483
484	// Check whether we get too-old error via the watch channel
485	tooOldWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: "1", Predicate: storage.Everything})
486	if err != nil {
487		t.Fatalf("Expected no direct error, got %v", err)
488	}
489	defer tooOldWatcher.Stop()
490
491	// Ensure we get a "Gone" error.
492	expectedResourceExpiredError := errors.NewResourceExpired("").ErrStatus
493	verifyWatchEvent(t, tooOldWatcher, watch.Error, &expectedResourceExpiredError)
494}
495
496func TestWatcherTimeout(t *testing.T) {
497	server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
498	defer server.Terminate(t)
499	cacher, _, err := newTestCacher(etcdStorage)
500	if err != nil {
501		t.Fatalf("Couldn't create cacher: %v", err)
502	}
503	defer cacher.Stop()
504
505	// initialVersion is used to initate the watcher at the beginning of the world,
506	// which is not defined precisely in etcd.
507	initialVersion, err := cacher.LastSyncResourceVersion()
508	if err != nil {
509		t.Fatalf("Unexpected error: %v", err)
510	}
511	startVersion := strconv.Itoa(int(initialVersion))
512
513	// Create a number of watchers that will not be reading any result.
514	nonReadingWatchers := 50
515	for i := 0; i < nonReadingWatchers; i++ {
516		watcher, err := cacher.WatchList(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: startVersion, Predicate: storage.Everything})
517		if err != nil {
518			t.Fatalf("Unexpected error: %v", err)
519		}
520		defer watcher.Stop()
521	}
522
523	// Create a second watcher that will be reading result.
524	readingWatcher, err := cacher.WatchList(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: startVersion, Predicate: storage.Everything})
525	if err != nil {
526		t.Fatalf("Unexpected error: %v", err)
527	}
528	defer readingWatcher.Stop()
529
530	startTime := time.Now()
531	for i := 1; i <= 22; i++ {
532		pod := makeTestPod(strconv.Itoa(i))
533		_ = updatePod(t, etcdStorage, pod, nil)
534		verifyWatchEvent(t, readingWatcher, watch.Added, pod)
535	}
536	if time.Since(startTime) > time.Duration(250*nonReadingWatchers)*time.Millisecond {
537		t.Errorf("waiting for events took too long: %v", time.Since(startTime))
538	}
539}
540
541func TestFiltering(t *testing.T) {
542	server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
543	defer server.Terminate(t)
544	cacher, _, err := newTestCacher(etcdStorage)
545	if err != nil {
546		t.Fatalf("Couldn't create cacher: %v", err)
547	}
548	defer cacher.Stop()
549
550	// Ensure that the cacher is initialized, before creating any pods,
551	// so that we are sure that all events will be present in cacher.
552	syncWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
553	if err != nil {
554		t.Fatalf("Unexpected error: %v", err)
555	}
556	syncWatcher.Stop()
557
558	podFoo := makeTestPod("foo")
559	podFoo.Labels = map[string]string{"filter": "foo"}
560	podFooFiltered := makeTestPod("foo")
561	podFooPrime := makeTestPod("foo")
562	podFooPrime.Labels = map[string]string{"filter": "foo"}
563	podFooPrime.Spec.NodeName = "fakeNode"
564
565	podFooNS2 := makeTestPod("foo")
566	podFooNS2.Namespace += "2"
567	podFooNS2.Labels = map[string]string{"filter": "foo"}
568
569	// Create in another namespace first to make sure events from other namespaces don't get delivered
570	updatePod(t, etcdStorage, podFooNS2, nil)
571
572	fooCreated := updatePod(t, etcdStorage, podFoo, nil)
573	fooFiltered := updatePod(t, etcdStorage, podFooFiltered, fooCreated)
574	fooUnfiltered := updatePod(t, etcdStorage, podFoo, fooFiltered)
575	_ = updatePod(t, etcdStorage, podFooPrime, fooUnfiltered)
576
577	deleted := example.Pod{}
578	if err := etcdStorage.Delete(context.TODO(), "pods/ns/foo", &deleted, nil, storage.ValidateAllObjectFunc); err != nil {
579		t.Errorf("Unexpected error: %v", err)
580	}
581
582	// Set up Watch for object "podFoo" with label filter set.
583	pred := storage.SelectionPredicate{
584		Label: labels.SelectorFromSet(labels.Set{"filter": "foo"}),
585		Field: fields.Everything(),
586		GetAttrs: func(obj runtime.Object) (label labels.Set, field fields.Set, err error) {
587			metadata, err := meta.Accessor(obj)
588			if err != nil {
589				t.Fatalf("Unexpected error: %v", err)
590			}
591			return labels.Set(metadata.GetLabels()), nil, nil
592		},
593	}
594	watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: fooCreated.ResourceVersion, Predicate: pred})
595	if err != nil {
596		t.Fatalf("Unexpected error: %v", err)
597	}
598	defer watcher.Stop()
599
600	verifyWatchEvent(t, watcher, watch.Deleted, podFooFiltered)
601	verifyWatchEvent(t, watcher, watch.Added, podFoo)
602	verifyWatchEvent(t, watcher, watch.Modified, podFooPrime)
603	verifyWatchEvent(t, watcher, watch.Deleted, podFooPrime)
604}
605
606func TestEmptyWatchEventCache(t *testing.T) {
607	server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
608	defer server.Terminate(t)
609
610	// add a few objects
611	updatePod(t, etcdStorage, makeTestPod("pod1"), nil)
612	updatePod(t, etcdStorage, makeTestPod("pod2"), nil)
613	updatePod(t, etcdStorage, makeTestPod("pod3"), nil)
614	updatePod(t, etcdStorage, makeTestPod("pod4"), nil)
615	updatePod(t, etcdStorage, makeTestPod("pod5"), nil)
616
617	fooCreated := updatePod(t, etcdStorage, makeTestPod("foo"), nil)
618
619	cacher, v, err := newTestCacher(etcdStorage)
620	if err != nil {
621		t.Fatalf("Couldn't create cacher: %v", err)
622	}
623	defer cacher.Stop()
624
625	// get rv of last pod created
626	rv, err := v.ParseResourceVersion(fooCreated.ResourceVersion)
627	if err != nil {
628		t.Fatalf("Unexpected error: %v", err)
629	}
630
631	// We now have a cacher with an empty cache of watch events and a resourceVersion of rv.
632	// It should support establishing watches from rv and higher, but not older.
633
634	{
635		watcher, err := cacher.Watch(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: strconv.Itoa(int(rv - 1)), Predicate: storage.Everything})
636		if err != nil {
637			t.Fatalf("Unexpected error: %v", err)
638		}
639		defer watcher.Stop()
640		expectedResourceExpiredError := errors.NewResourceExpired("").ErrStatus
641		verifyWatchEvent(t, watcher, watch.Error, &expectedResourceExpiredError)
642	}
643
644	{
645		watcher, err := cacher.Watch(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: strconv.Itoa(int(rv + 1)), Predicate: storage.Everything})
646		if err != nil {
647			t.Fatalf("Unexpected error: %v", err)
648		}
649		defer watcher.Stop()
650		select {
651		case e := <-watcher.ResultChan():
652			t.Errorf("unexpected event %#v", e)
653		case <-time.After(3 * time.Second):
654			// watch from rv+1 remained established successfully
655		}
656	}
657
658	{
659		watcher, err := cacher.Watch(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: strconv.Itoa(int(rv)), Predicate: storage.Everything})
660		if err != nil {
661			t.Fatalf("Unexpected error: %v", err)
662		}
663		defer watcher.Stop()
664		select {
665		case e := <-watcher.ResultChan():
666			t.Errorf("unexpected event %#v", e)
667		case <-time.After(3 * time.Second):
668			// watch from rv remained established successfully
669		}
670	}
671}
672
673func TestRandomWatchDeliver(t *testing.T) {
674	server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
675	defer server.Terminate(t)
676	cacher, v, err := newTestCacher(etcdStorage)
677	if err != nil {
678		t.Fatalf("Couldn't create cacher: %v", err)
679	}
680	defer cacher.Stop()
681
682	fooCreated := updatePod(t, etcdStorage, makeTestPod("foo"), nil)
683	rv, err := v.ParseResourceVersion(fooCreated.ResourceVersion)
684	if err != nil {
685		t.Fatalf("Unexpected error: %v", err)
686	}
687	startVersion := strconv.Itoa(int(rv))
688
689	watcher, err := cacher.WatchList(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: startVersion, Predicate: storage.Everything})
690	if err != nil {
691		t.Fatalf("Unexpected error: %v", err)
692	}
693
694	// Now we can create exactly 21 events that should be delivered
695	// to the watcher, before it will completely block cacher and as
696	// a result will be dropped.
697	for i := 0; i < 21; i++ {
698		updatePod(t, etcdStorage, makeTestPod(fmt.Sprintf("foo-%d", i)), nil)
699	}
700
701	// Now stop the watcher and check if the consecutive events are being delivered.
702	watcher.Stop()
703
704	watched := 0
705	for {
706		event, ok := <-watcher.ResultChan()
707		if !ok {
708			break
709		}
710		object := event.Object
711		if co, ok := object.(runtime.CacheableObject); ok {
712			object = co.GetObject()
713		}
714		if a, e := object.(*example.Pod).Name, fmt.Sprintf("foo-%d", watched); e != a {
715			t.Errorf("Unexpected object watched: %s, expected %s", a, e)
716		}
717		watched++
718	}
719}
720
721func TestCacherListerWatcher(t *testing.T) {
722	prefix := "pods"
723	fn := func() runtime.Object { return &example.PodList{} }
724	server, store := newEtcdTestStorage(t, prefix)
725	defer server.Terminate(t)
726
727	podFoo := makeTestPod("foo")
728	podBar := makeTestPod("bar")
729	podBaz := makeTestPod("baz")
730
731	_ = updatePod(t, store, podFoo, nil)
732	_ = updatePod(t, store, podBar, nil)
733	_ = updatePod(t, store, podBaz, nil)
734
735	lw := cacherstorage.NewCacherListerWatcher(store, prefix, fn)
736
737	obj, err := lw.List(metav1.ListOptions{})
738	if err != nil {
739		t.Fatalf("List failed: %v", err)
740	}
741	pl, ok := obj.(*example.PodList)
742	if !ok {
743		t.Fatalf("Expected PodList but got %v", pl)
744	}
745	if len(pl.Items) != 3 {
746		t.Errorf("Expected PodList of length 3 but got %d", len(pl.Items))
747	}
748}
749
750func TestCacherListerWatcherPagination(t *testing.T) {
751	prefix := "pods"
752	fn := func() runtime.Object { return &example.PodList{} }
753	server, store := newEtcdTestStorage(t, prefix)
754	defer server.Terminate(t)
755
756	podFoo := makeTestPod("foo")
757	podBar := makeTestPod("bar")
758	podBaz := makeTestPod("baz")
759
760	_ = updatePod(t, store, podFoo, nil)
761	_ = updatePod(t, store, podBar, nil)
762	_ = updatePod(t, store, podBaz, nil)
763
764	lw := cacherstorage.NewCacherListerWatcher(store, prefix, fn)
765
766	obj1, err := lw.List(metav1.ListOptions{Limit: 2})
767	if err != nil {
768		t.Fatalf("List failed: %v", err)
769	}
770	limit1, ok := obj1.(*example.PodList)
771	if !ok {
772		t.Fatalf("Expected PodList but got %v", limit1)
773	}
774	if len(limit1.Items) != 2 {
775		t.Errorf("Expected PodList of length 2 but got %d", len(limit1.Items))
776	}
777	if limit1.Continue == "" {
778		t.Errorf("Expected list to have Continue but got none")
779	}
780	obj2, err := lw.List(metav1.ListOptions{Limit: 2, Continue: limit1.Continue})
781	if err != nil {
782		t.Fatalf("List failed: %v", err)
783	}
784	limit2, ok := obj2.(*example.PodList)
785	if !ok {
786		t.Fatalf("Expected PodList but got %v", limit2)
787	}
788	if limit2.Continue != "" {
789		t.Errorf("Expected list not to have Continue, but got %s", limit1.Continue)
790	}
791
792	if limit1.Items[0].Name != podBar.Name {
793		t.Errorf("Expected list1.Items[0] to be %s but got %s", podBar.Name, limit1.Items[0].Name)
794	}
795	if limit1.Items[1].Name != podBaz.Name {
796		t.Errorf("Expected list1.Items[1] to be %s but got %s", podBaz.Name, limit1.Items[1].Name)
797	}
798	if limit2.Items[0].Name != podFoo.Name {
799		t.Errorf("Expected list2.Items[0] to be %s but got %s", podFoo.Name, limit2.Items[0].Name)
800	}
801
802}
803
804func TestWatchDispatchBookmarkEvents(t *testing.T) {
805	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchBookmark, true)()
806
807	server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
808	defer server.Terminate(t)
809	cacher, v, err := newTestCacher(etcdStorage)
810	if err != nil {
811		t.Fatalf("Couldn't create cacher: %v", err)
812	}
813	defer cacher.Stop()
814
815	fooCreated := updatePod(t, etcdStorage, makeTestPod("foo"), nil)
816	rv, err := v.ParseResourceVersion(fooCreated.ResourceVersion)
817	if err != nil {
818		t.Fatalf("Unexpected error: %v", err)
819	}
820	startVersion := strconv.Itoa(int(rv))
821
822	tests := []struct {
823		timeout            time.Duration
824		expected           bool
825		allowWatchBookmark bool
826	}{
827		{ // test old client won't get Bookmark event
828			timeout:            3 * time.Second,
829			expected:           false,
830			allowWatchBookmark: false,
831		},
832		{
833			timeout:            3 * time.Second,
834			expected:           true,
835			allowWatchBookmark: true,
836		},
837	}
838
839	for i, c := range tests {
840		pred := storage.Everything
841		pred.AllowWatchBookmarks = c.allowWatchBookmark
842		ctx, _ := context.WithTimeout(context.Background(), c.timeout)
843		watcher, err := cacher.Watch(ctx, "pods/ns/foo", storage.ListOptions{ResourceVersion: startVersion, Predicate: pred})
844		if err != nil {
845			t.Fatalf("Unexpected error: %v", err)
846		}
847
848		// Create events of other pods
849		updatePod(t, etcdStorage, makeTestPod(fmt.Sprintf("foo-whatever-%d", i)), nil)
850
851		// Now wait for Bookmark event
852		select {
853		case event, ok := <-watcher.ResultChan():
854			if !ok && c.expected {
855				t.Errorf("Unexpected object watched (no objects)")
856			}
857			if c.expected && event.Type != watch.Bookmark {
858				t.Errorf("Unexpected object watched %#v", event)
859			}
860		case <-time.After(time.Second * 3):
861			if c.expected {
862				t.Errorf("Unexpected object watched (timeout)")
863			}
864		}
865		watcher.Stop()
866	}
867}
868
869func TestWatchBookmarksWithCorrectResourceVersion(t *testing.T) {
870	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchBookmark, true)()
871
872	server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
873	defer server.Terminate(t)
874	cacher, v, err := newTestCacher(etcdStorage)
875	if err != nil {
876		t.Fatalf("Couldn't create cacher: %v", err)
877	}
878	defer cacher.Stop()
879
880	pred := storage.Everything
881	pred.AllowWatchBookmarks = true
882	ctx, _ := context.WithTimeout(context.Background(), 3*time.Second)
883	watcher, err := cacher.WatchList(ctx, "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: pred})
884	if err != nil {
885		t.Fatalf("Unexpected error: %v", err)
886	}
887	defer watcher.Stop()
888
889	done := make(chan struct{})
890	var wg sync.WaitGroup
891	wg.Add(1)
892	defer wg.Wait()   // We must wait for the waitgroup to exit before we terminate the cache or the server in prior defers
893	defer close(done) // call close first, so the goroutine knows to exit
894	go func() {
895		defer wg.Done()
896		for i := 0; i < 100; i++ {
897			select {
898			case <-done:
899				return
900			default:
901				pod := fmt.Sprintf("foo-%d", i)
902				err := createPod(etcdStorage, makeTestPod(pod))
903				if err != nil {
904					t.Fatalf("failed to create pod %v: %v", pod, err)
905				}
906				time.Sleep(time.Second / 100)
907			}
908		}
909	}()
910
911	bookmarkReceived := false
912	lastObservedResourceVersion := uint64(0)
913	for event := range watcher.ResultChan() {
914		rv, err := v.ObjectResourceVersion(event.Object)
915		if err != nil {
916			t.Fatalf("failed to parse resourceVersion from %#v", event)
917		}
918		if event.Type == watch.Bookmark {
919			bookmarkReceived = true
920			// bookmark event has a RV greater than or equal to the before one
921			if rv < lastObservedResourceVersion {
922				t.Fatalf("Unexpected bookmark resourceVersion %v less than observed %v)", rv, lastObservedResourceVersion)
923			}
924		} else {
925			// non-bookmark event has a RV greater than anything before
926			if rv <= lastObservedResourceVersion {
927				t.Fatalf("Unexpected event resourceVersion %v less than or equal to bookmark %v)", rv, lastObservedResourceVersion)
928			}
929		}
930		lastObservedResourceVersion = rv
931	}
932	// Make sure we have received a bookmark event
933	if !bookmarkReceived {
934		t.Fatalf("Unpexected error, we did not received a bookmark event")
935	}
936}
937