1/*
2Copyright 2015 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package cacher
18
19import (
20	"fmt"
21	"reflect"
22	"sort"
23	"sync"
24	"time"
25
26	"k8s.io/apimachinery/pkg/api/errors"
27	"k8s.io/apimachinery/pkg/fields"
28	"k8s.io/apimachinery/pkg/labels"
29	"k8s.io/apimachinery/pkg/runtime"
30	"k8s.io/apimachinery/pkg/util/clock"
31	"k8s.io/apimachinery/pkg/watch"
32	"k8s.io/apiserver/pkg/storage"
33	"k8s.io/client-go/tools/cache"
34	"k8s.io/klog/v2"
35	utiltrace "k8s.io/utils/trace"
36)
37
const (
	// blockTimeout determines how long we're willing to block the request
	// to wait for a given resource version to be propagated to cache,
	// before terminating request and returning Timeout error with retry
	// after suggestion.
	blockTimeout = 3 * time.Second

	// resourceVersionTooHighRetrySeconds is the number of seconds after which
	// an operation should be retried by the client after receiving a
	// 'too high resource version' error.
	resourceVersionTooHighRetrySeconds = 1

	// eventFreshDuration is the time duration of events we want to keep.
	// We set it to `defaultBookmarkFrequency` plus epsilon to maximize
	// chances that last bookmark was sent within kept history, at the
	// same time, minimizing the needed memory usage.
	eventFreshDuration = 75 * time.Second

	// defaultLowerBoundCapacity is the default value for the event cache
	// capacity's lower bound.
	// TODO: Figure out to what value we can decrease it.
	defaultLowerBoundCapacity = 100

	// defaultUpperBoundCapacity is the default upper bound of the event cache
	// capacity. It should be able to keep eventFreshDuration of history.
	defaultUpperBoundCapacity = 100 * 1024
)
62
// watchCacheEvent is a single "watch event" that is sent to users of
// watchCache. In addition to a typical "watch.Event" it contains
// the previous value of the object to enable proper filtering in the
// upper layers.
type watchCacheEvent struct {
	// Type is the kind of the event (Added/Modified/Deleted/Bookmark).
	Type watch.EventType
	// Object is the object the event was generated for, together with the
	// labels and fields computed for it.
	Object    runtime.Object
	ObjLabels labels.Set
	ObjFields fields.Set
	// PrevObject is the value that was held in the store under the same key
	// when the event was processed, together with its labels and fields.
	// These stay unset when the key was not present in the store.
	PrevObject    runtime.Object
	PrevObjLabels labels.Set
	PrevObjFields fields.Set
	// Key is the storage key computed for Object.
	Key string
	// ResourceVersion is the resource version associated with the event.
	ResourceVersion uint64
	// RecordTime is the clock time at which the event was created by the
	// watchCache; it is used when deciding whether to resize the cache.
	RecordTime time.Time
}
79
// storeElement is the unit kept in the underlying store.
//
// Computing a key of an object is generally non-trivial (it performs
// e.g. validation underneath). The same holds for computing object fields
// and labels. To avoid computing them multiple times (to serve the event
// in different List/Watch requests), the underlying store keeps
// structs of (key, object, labels, fields).
type storeElement struct {
	// Key is the precomputed storage key of Object.
	Key string
	// Object is the cached object itself.
	Object runtime.Object
	// Labels and Fields are the precomputed attributes of Object.
	Labels labels.Set
	Fields fields.Set
}
91
92func storeElementKey(obj interface{}) (string, error) {
93	elem, ok := obj.(*storeElement)
94	if !ok {
95		return "", fmt.Errorf("not a storeElement: %v", obj)
96	}
97	return elem.Key, nil
98}
99
100func storeElementObject(obj interface{}) (runtime.Object, error) {
101	elem, ok := obj.(*storeElement)
102	if !ok {
103		return nil, fmt.Errorf("not a storeElement: %v", obj)
104	}
105	return elem.Object, nil
106}
107
108func storeElementIndexFunc(objIndexFunc cache.IndexFunc) cache.IndexFunc {
109	return func(obj interface{}) (strings []string, e error) {
110		seo, err := storeElementObject(obj)
111		if err != nil {
112			return nil, err
113		}
114		return objIndexFunc(seo)
115	}
116}
117
118func storeElementIndexers(indexers *cache.Indexers) cache.Indexers {
119	if indexers == nil {
120		return cache.Indexers{}
121	}
122	ret := cache.Indexers{}
123	for indexName, indexFunc := range *indexers {
124		ret[indexName] = storeElementIndexFunc(indexFunc)
125	}
126	return ret
127}
128
// watchCache implements a Store interface.
// However, it depends on the elements implementing runtime.Object interface.
//
// watchCache is a "sliding window" (with a limited capacity) of objects
// observed from a watch.
type watchCache struct {
	sync.RWMutex

	// Condition on which lists are waiting for the fresh enough
	// resource version. It is built on the read side of the embedded
	// RWMutex, so waiters must hold the read lock.
	cond *sync.Cond

	// Maximum size of history window.
	capacity int

	// upper bound of capacity since event cache has a dynamic size.
	upperBoundCapacity int

	// lower bound of capacity since event cache has a dynamic size.
	lowerBoundCapacity int

	// keyFunc is used to get a key in the underlying storage for a given object.
	keyFunc func(runtime.Object) (string, error)

	// getAttrsFunc is used to get labels and fields of an object.
	getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error)

	// cache is used as a cyclic buffer - its first element (with the smallest
	// resourceVersion) is defined by startIndex, its last element is defined
	// by endIndex (if cache is full it will be startIndex + capacity).
	// Both startIndex and endIndex can be greater than buffer capacity -
	// you should always apply modulo capacity to get an index in cache array.
	cache      []*watchCacheEvent
	startIndex int
	endIndex   int

	// store will effectively support LIST operation from the "end of cache
	// history" i.e. from the moment just after the newest cached watched event.
	// It is necessary to effectively allow clients to start watching at now.
	// NOTE: We assume that <store> is thread-safe.
	store cache.Indexer

	// ResourceVersion up to which the watchCache is propagated.
	resourceVersion uint64

	// ResourceVersion of the last list result (populated via Replace() method).
	listResourceVersion uint64

	// This handler is run at the end of every successful Replace() method.
	onReplace func()

	// This handler is run at the end of every Add/Update/Delete method
	// and additionally gets the previous value of the object.
	eventHandler func(*watchCacheEvent)

	// for testing timeouts.
	clock clock.Clock

	// An underlying storage.Versioner.
	versioner storage.Versioner

	// cacher's objectType.
	objectType reflect.Type
}
193
194func newWatchCache(
195	keyFunc func(runtime.Object) (string, error),
196	eventHandler func(*watchCacheEvent),
197	getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error),
198	versioner storage.Versioner,
199	indexers *cache.Indexers,
200	clock clock.Clock,
201	objectType reflect.Type) *watchCache {
202	wc := &watchCache{
203		capacity:            defaultLowerBoundCapacity,
204		keyFunc:             keyFunc,
205		getAttrsFunc:        getAttrsFunc,
206		cache:               make([]*watchCacheEvent, defaultLowerBoundCapacity),
207		lowerBoundCapacity:  defaultLowerBoundCapacity,
208		upperBoundCapacity:  defaultUpperBoundCapacity,
209		startIndex:          0,
210		endIndex:            0,
211		store:               cache.NewIndexer(storeElementKey, storeElementIndexers(indexers)),
212		resourceVersion:     0,
213		listResourceVersion: 0,
214		eventHandler:        eventHandler,
215		clock:               clock,
216		versioner:           versioner,
217		objectType:          objectType,
218	}
219	objType := objectType.String()
220	watchCacheCapacity.WithLabelValues(objType).Set(float64(wc.capacity))
221	wc.cond = sync.NewCond(wc.RLocker())
222	return wc
223}
224
225// Add takes runtime.Object as an argument.
226func (w *watchCache) Add(obj interface{}) error {
227	object, resourceVersion, err := w.objectToVersionedRuntimeObject(obj)
228	if err != nil {
229		return err
230	}
231	event := watch.Event{Type: watch.Added, Object: object}
232
233	f := func(elem *storeElement) error { return w.store.Add(elem) }
234	return w.processEvent(event, resourceVersion, f)
235}
236
237// Update takes runtime.Object as an argument.
238func (w *watchCache) Update(obj interface{}) error {
239	object, resourceVersion, err := w.objectToVersionedRuntimeObject(obj)
240	if err != nil {
241		return err
242	}
243	event := watch.Event{Type: watch.Modified, Object: object}
244
245	f := func(elem *storeElement) error { return w.store.Update(elem) }
246	return w.processEvent(event, resourceVersion, f)
247}
248
249// Delete takes runtime.Object as an argument.
250func (w *watchCache) Delete(obj interface{}) error {
251	object, resourceVersion, err := w.objectToVersionedRuntimeObject(obj)
252	if err != nil {
253		return err
254	}
255	event := watch.Event{Type: watch.Deleted, Object: object}
256
257	f := func(elem *storeElement) error { return w.store.Delete(elem) }
258	return w.processEvent(event, resourceVersion, f)
259}
260
261func (w *watchCache) objectToVersionedRuntimeObject(obj interface{}) (runtime.Object, uint64, error) {
262	object, ok := obj.(runtime.Object)
263	if !ok {
264		return nil, 0, fmt.Errorf("obj does not implement runtime.Object interface: %v", obj)
265	}
266	resourceVersion, err := w.versioner.ObjectResourceVersion(object)
267	if err != nil {
268		return nil, 0, err
269	}
270	return object, resourceVersion, nil
271}
272
273// processEvent is safe as long as there is at most one call to it in flight
274// at any point in time.
275func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(*storeElement) error) error {
276	key, err := w.keyFunc(event.Object)
277	if err != nil {
278		return fmt.Errorf("couldn't compute key: %v", err)
279	}
280	elem := &storeElement{Key: key, Object: event.Object}
281	elem.Labels, elem.Fields, err = w.getAttrsFunc(event.Object)
282	if err != nil {
283		return err
284	}
285
286	wcEvent := &watchCacheEvent{
287		Type:            event.Type,
288		Object:          elem.Object,
289		ObjLabels:       elem.Labels,
290		ObjFields:       elem.Fields,
291		Key:             key,
292		ResourceVersion: resourceVersion,
293		RecordTime:      w.clock.Now(),
294	}
295
296	if err := func() error {
297		// TODO: We should consider moving this lock below after the watchCacheEvent
298		// is created. In such situation, the only problematic scenario is Replace(
299		// happening after getting object from store and before acquiring a lock.
300		// Maybe introduce another lock for this purpose.
301		w.Lock()
302		defer w.Unlock()
303
304		previous, exists, err := w.store.Get(elem)
305		if err != nil {
306			return err
307		}
308		if exists {
309			previousElem := previous.(*storeElement)
310			wcEvent.PrevObject = previousElem.Object
311			wcEvent.PrevObjLabels = previousElem.Labels
312			wcEvent.PrevObjFields = previousElem.Fields
313		}
314
315		w.updateCache(wcEvent)
316		w.resourceVersion = resourceVersion
317		defer w.cond.Broadcast()
318
319		return updateFunc(elem)
320	}(); err != nil {
321		return err
322	}
323
324	// Avoid calling event handler under lock.
325	// This is safe as long as there is at most one call to Add/Update/Delete and
326	// UpdateResourceVersion in flight at any point in time, which is true now,
327	// because reflector calls them synchronously from its main thread.
328	if w.eventHandler != nil {
329		w.eventHandler(wcEvent)
330	}
331	return nil
332}
333
334// Assumes that lock is already held for write.
335func (w *watchCache) updateCache(event *watchCacheEvent) {
336	w.resizeCacheLocked(event.RecordTime)
337	if w.isCacheFullLocked() {
338		// Cache is full - remove the oldest element.
339		w.startIndex++
340	}
341	w.cache[w.endIndex%w.capacity] = event
342	w.endIndex++
343}
344
345// resizeCacheLocked resizes the cache if necessary:
346// - increases capacity by 2x if cache is full and all cached events occurred within last eventFreshDuration.
347// - decreases capacity by 2x when recent quarter of events occurred outside of eventFreshDuration(protect watchCache from flapping).
348func (w *watchCache) resizeCacheLocked(eventTime time.Time) {
349	if w.isCacheFullLocked() && eventTime.Sub(w.cache[w.startIndex%w.capacity].RecordTime) < eventFreshDuration {
350		capacity := min(w.capacity*2, w.upperBoundCapacity)
351		if capacity > w.capacity {
352			w.doCacheResizeLocked(capacity)
353		}
354		return
355	}
356	if w.isCacheFullLocked() && eventTime.Sub(w.cache[(w.endIndex-w.capacity/4)%w.capacity].RecordTime) > eventFreshDuration {
357		capacity := max(w.capacity/2, w.lowerBoundCapacity)
358		if capacity < w.capacity {
359			w.doCacheResizeLocked(capacity)
360		}
361		return
362	}
363}
364
365// isCacheFullLocked used to judge whether watchCacheEvent is full.
366// Assumes that lock is already held for write.
367func (w *watchCache) isCacheFullLocked() bool {
368	return w.endIndex == w.startIndex+w.capacity
369}
370
371// doCacheResizeLocked resize watchCache's event array with different capacity.
372// Assumes that lock is already held for write.
373func (w *watchCache) doCacheResizeLocked(capacity int) {
374	newCache := make([]*watchCacheEvent, capacity)
375	if capacity < w.capacity {
376		// adjust startIndex if cache capacity shrink.
377		w.startIndex = w.endIndex - capacity
378	}
379	for i := w.startIndex; i < w.endIndex; i++ {
380		newCache[i%capacity] = w.cache[i%w.capacity]
381	}
382	w.cache = newCache
383	recordsWatchCacheCapacityChange(w.objectType.String(), w.capacity, capacity)
384	w.capacity = capacity
385}
386
387func (w *watchCache) UpdateResourceVersion(resourceVersion string) {
388	rv, err := w.versioner.ParseResourceVersion(resourceVersion)
389	if err != nil {
390		klog.Errorf("Couldn't parse resourceVersion: %v", err)
391		return
392	}
393
394	func() {
395		w.Lock()
396		defer w.Unlock()
397		w.resourceVersion = rv
398	}()
399
400	// Avoid calling event handler under lock.
401	// This is safe as long as there is at most one call to Add/Update/Delete and
402	// UpdateResourceVersion in flight at any point in time, which is true now,
403	// because reflector calls them synchronously from its main thread.
404	if w.eventHandler != nil {
405		wcEvent := &watchCacheEvent{
406			Type:            watch.Bookmark,
407			ResourceVersion: rv,
408		}
409		w.eventHandler(wcEvent)
410	}
411}
412
413// List returns list of pointers to <storeElement> objects.
414func (w *watchCache) List() []interface{} {
415	return w.store.List()
416}
417
418// waitUntilFreshAndBlock waits until cache is at least as fresh as given <resourceVersion>.
419// NOTE: This function acquired lock and doesn't release it.
420// You HAVE TO explicitly call w.RUnlock() after this function.
421func (w *watchCache) waitUntilFreshAndBlock(resourceVersion uint64, trace *utiltrace.Trace) error {
422	startTime := w.clock.Now()
423	go func() {
424		// Wake us up when the time limit has expired.  The docs
425		// promise that time.After (well, NewTimer, which it calls)
426		// will wait *at least* the duration given. Since this go
427		// routine starts sometime after we record the start time, and
428		// it will wake up the loop below sometime after the broadcast,
429		// we don't need to worry about waking it up before the time
430		// has expired accidentally.
431		<-w.clock.After(blockTimeout)
432		w.cond.Broadcast()
433	}()
434
435	w.RLock()
436	if trace != nil {
437		trace.Step("watchCache locked acquired")
438	}
439	for w.resourceVersion < resourceVersion {
440		if w.clock.Since(startTime) >= blockTimeout {
441			// Request that the client retry after 'resourceVersionTooHighRetrySeconds' seconds.
442			return storage.NewTooLargeResourceVersionError(resourceVersion, w.resourceVersion, resourceVersionTooHighRetrySeconds)
443		}
444		w.cond.Wait()
445	}
446	if trace != nil {
447		trace.Step("watchCache fresh enough")
448	}
449	return nil
450}
451
452// WaitUntilFreshAndList returns list of pointers to <storeElement> objects.
453func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64, matchValues []storage.MatchValue, trace *utiltrace.Trace) ([]interface{}, uint64, error) {
454	err := w.waitUntilFreshAndBlock(resourceVersion, trace)
455	defer w.RUnlock()
456	if err != nil {
457		return nil, 0, err
458	}
459
460	// This isn't the place where we do "final filtering" - only some "prefiltering" is happening here. So the only
461	// requirement here is to NOT miss anything that should be returned. We can return as many non-matching items as we
462	// want - they will be filtered out later. The fact that we return less things is only further performance improvement.
463	// TODO: if multiple indexes match, return the one with the fewest items, so as to do as much filtering as possible.
464	for _, matchValue := range matchValues {
465		if result, err := w.store.ByIndex(matchValue.IndexName, matchValue.Value); err == nil {
466			return result, w.resourceVersion, nil
467		}
468	}
469	return w.store.List(), w.resourceVersion, nil
470}
471
472// WaitUntilFreshAndGet returns a pointers to <storeElement> object.
473func (w *watchCache) WaitUntilFreshAndGet(resourceVersion uint64, key string, trace *utiltrace.Trace) (interface{}, bool, uint64, error) {
474	err := w.waitUntilFreshAndBlock(resourceVersion, trace)
475	defer w.RUnlock()
476	if err != nil {
477		return nil, false, 0, err
478	}
479	value, exists, err := w.store.GetByKey(key)
480	return value, exists, w.resourceVersion, err
481}
482
483func (w *watchCache) ListKeys() []string {
484	return w.store.ListKeys()
485}
486
487// Get takes runtime.Object as a parameter. However, it returns
488// pointer to <storeElement>.
489func (w *watchCache) Get(obj interface{}) (interface{}, bool, error) {
490	object, ok := obj.(runtime.Object)
491	if !ok {
492		return nil, false, fmt.Errorf("obj does not implement runtime.Object interface: %v", obj)
493	}
494	key, err := w.keyFunc(object)
495	if err != nil {
496		return nil, false, fmt.Errorf("couldn't compute key: %v", err)
497	}
498
499	return w.store.Get(&storeElement{Key: key, Object: object})
500}
501
502// GetByKey returns pointer to <storeElement>.
503func (w *watchCache) GetByKey(key string) (interface{}, bool, error) {
504	return w.store.GetByKey(key)
505}
506
507// Replace takes slice of runtime.Object as a parameter.
508func (w *watchCache) Replace(objs []interface{}, resourceVersion string) error {
509	version, err := w.versioner.ParseResourceVersion(resourceVersion)
510	if err != nil {
511		return err
512	}
513
514	toReplace := make([]interface{}, 0, len(objs))
515	for _, obj := range objs {
516		object, ok := obj.(runtime.Object)
517		if !ok {
518			return fmt.Errorf("didn't get runtime.Object for replace: %#v", obj)
519		}
520		key, err := w.keyFunc(object)
521		if err != nil {
522			return fmt.Errorf("couldn't compute key: %v", err)
523		}
524		objLabels, objFields, err := w.getAttrsFunc(object)
525		if err != nil {
526			return err
527		}
528		toReplace = append(toReplace, &storeElement{
529			Key:    key,
530			Object: object,
531			Labels: objLabels,
532			Fields: objFields,
533		})
534	}
535
536	w.Lock()
537	defer w.Unlock()
538
539	w.startIndex = 0
540	w.endIndex = 0
541	if err := w.store.Replace(toReplace, resourceVersion); err != nil {
542		return err
543	}
544	w.listResourceVersion = version
545	w.resourceVersion = version
546	if w.onReplace != nil {
547		w.onReplace()
548	}
549	w.cond.Broadcast()
550	klog.V(3).Infof("Replace watchCache (rev: %v) ", resourceVersion)
551	return nil
552}
553
554func (w *watchCache) SetOnReplace(onReplace func()) {
555	w.Lock()
556	defer w.Unlock()
557	w.onReplace = onReplace
558}
559
560func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*watchCacheEvent, error) {
561	size := w.endIndex - w.startIndex
562	var oldest uint64
563	switch {
564	case w.listResourceVersion > 0 && w.startIndex == 0:
565		// If no event was removed from the buffer since last relist, the oldest watch
566		// event we can deliver is one greater than the resource version of the list.
567		oldest = w.listResourceVersion + 1
568	case size > 0:
569		// If the previous condition is not satisfied: either some event was already
570		// removed from the buffer or we've never completed a list (the latter can
571		// only happen in unit tests that populate the buffer without performing
572		// list/replace operations), the oldest watch event we can deliver is the first
573		// one in the buffer.
574		oldest = w.cache[w.startIndex%w.capacity].ResourceVersion
575	default:
576		return nil, fmt.Errorf("watch cache isn't correctly initialized")
577	}
578
579	if resourceVersion == 0 {
580		// resourceVersion = 0 means that we don't require any specific starting point
581		// and we would like to start watching from ~now.
582		// However, to keep backward compatibility, we additionally need to return the
583		// current state and only then start watching from that point.
584		//
585		// TODO: In v2 api, we should stop returning the current state - #13969.
586		allItems := w.store.List()
587		result := make([]*watchCacheEvent, len(allItems))
588		for i, item := range allItems {
589			elem, ok := item.(*storeElement)
590			if !ok {
591				return nil, fmt.Errorf("not a storeElement: %v", elem)
592			}
593			objLabels, objFields, err := w.getAttrsFunc(elem.Object)
594			if err != nil {
595				return nil, err
596			}
597			result[i] = &watchCacheEvent{
598				Type:            watch.Added,
599				Object:          elem.Object,
600				ObjLabels:       objLabels,
601				ObjFields:       objFields,
602				Key:             elem.Key,
603				ResourceVersion: w.resourceVersion,
604			}
605		}
606		return result, nil
607	}
608	if resourceVersion < oldest-1 {
609		return nil, errors.NewResourceExpired(fmt.Sprintf("too old resource version: %d (%d)", resourceVersion, oldest-1))
610	}
611
612	// Binary search the smallest index at which resourceVersion is greater than the given one.
613	f := func(i int) bool {
614		return w.cache[(w.startIndex+i)%w.capacity].ResourceVersion > resourceVersion
615	}
616	first := sort.Search(size, f)
617	result := make([]*watchCacheEvent, size-first)
618	for i := 0; i < size-first; i++ {
619		result[i] = w.cache[(w.startIndex+first+i)%w.capacity]
620	}
621	return result, nil
622}
623
624func (w *watchCache) GetAllEventsSince(resourceVersion uint64) ([]*watchCacheEvent, error) {
625	w.RLock()
626	defer w.RUnlock()
627	return w.GetAllEventsSinceThreadUnsafe(resourceVersion)
628}
629
630func (w *watchCache) Resync() error {
631	// Nothing to do
632	return nil
633}
634