1/*
2Copyright 2014 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package cache
18
19import (
20	"errors"
21	"fmt"
22	"sync"
23
24	"k8s.io/apimachinery/pkg/util/sets"
25
26	"k8s.io/klog/v2"
27)
28
29// DeltaFIFOOptions is the configuration parameters for DeltaFIFO. All are
30// optional.
31type DeltaFIFOOptions struct {
32
33	// KeyFunction is used to figure out what key an object should have. (It's
34	// exposed in the returned DeltaFIFO's KeyOf() method, with additional
35	// handling around deleted objects and queue state).
36	// Optional, the default is MetaNamespaceKeyFunc.
37	KeyFunction KeyFunc
38
39	// KnownObjects is expected to return a list of keys that the consumer of
40	// this queue "knows about". It is used to decide which items are missing
41	// when Replace() is called; 'Deleted' deltas are produced for the missing items.
42	// KnownObjects may be nil if you can tolerate missing deletions on Replace().
43	KnownObjects KeyListerGetter
44
45	// EmitDeltaTypeReplaced indicates that the queue consumer
46	// understands the Replaced DeltaType. Before the `Replaced` event type was
47	// added, calls to Replace() were handled the same as Sync(). For
48	// backwards-compatibility purposes, this is false by default.
49	// When true, `Replaced` events will be sent for items passed to a Replace() call.
50	// When false, `Sync` events will be sent instead.
51	EmitDeltaTypeReplaced bool
52}
53
54// DeltaFIFO is like FIFO, but differs in two ways.  One is that the
55// accumulator associated with a given object's key is not that object
56// but rather a Deltas, which is a slice of Delta values for that
57// object.  Applying an object to a Deltas means to append a Delta
58// except when the potentially appended Delta is a Deleted and the
59// Deltas already ends with a Deleted.  In that case the Deltas does
60// not grow, although the terminal Deleted will be replaced by the new
61// Deleted if the older Deleted's object is a
62// DeletedFinalStateUnknown.
63//
64// The other difference is that DeltaFIFO has two additional ways that
65// an object can be applied to an accumulator: Replaced and Sync.
66// If EmitDeltaTypeReplaced is not set to true, Sync will be used in
67// replace events for backwards compatibility.  Sync is used for periodic
68// resync events.
69//
70// DeltaFIFO is a producer-consumer queue, where a Reflector is
71// intended to be the producer, and the consumer is whatever calls
72// the Pop() method.
73//
74// DeltaFIFO solves this use case:
75//  * You want to process every object change (delta) at most once.
76//  * When you process an object, you want to see everything
77//    that's happened to it since you last processed it.
78//  * You want to process the deletion of some of the objects.
79//  * You might want to periodically reprocess objects.
80//
81// DeltaFIFO's Pop(), Get(), and GetByKey() methods return
82// interface{} to satisfy the Store/Queue interfaces, but they
83// will always return an object of type Deltas. List() returns
84// the newest object from each accumulator in the FIFO.
85//
86// A DeltaFIFO's knownObjects KeyListerGetter provides the abilities
87// to list Store keys and to get objects by Store key.  The objects in
88// question are called "known objects" and this set of objects
89// modifies the behavior of the Delete, Replace, and Resync methods
90// (each in a different way).
91//
92// A note on threading: If you call Pop() in parallel from multiple
93// threads, you could end up with multiple threads processing slightly
94// different versions of the same object.
95type DeltaFIFO struct {
96	// lock/cond protects access to 'items' and 'queue'.
97	lock sync.RWMutex
98	cond sync.Cond
99
100	// `items` maps a key to a Deltas.
101	// Each such Deltas has at least one Delta.
102	items map[string]Deltas
103
104	// `queue` maintains FIFO order of keys for consumption in Pop().
105	// There are no duplicates in `queue`.
106	// A key is in `queue` if and only if it is in `items`.
107	queue []string
108
109	// populated is true if the first batch of items inserted by Replace() has been populated
110	// or Delete/Add/Update/AddIfNotPresent was called first.
111	populated bool
112	// initialPopulationCount is the number of items inserted by the first call of Replace()
113	initialPopulationCount int
114
115	// keyFunc is used to make the key used for queued item
116	// insertion and retrieval, and should be deterministic.
117	keyFunc KeyFunc
118
119	// knownObjects list keys that are "known" --- affecting Delete(),
120	// Replace(), and Resync()
121	knownObjects KeyListerGetter
122
123	// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
124	// Currently, not used to gate any of CRED operations.
125	closed bool
126
127	// emitDeltaTypeReplaced is whether to emit the Replaced or Sync
128	// DeltaType when Replace() is called (to preserve backwards compat).
129	emitDeltaTypeReplaced bool
130}
131
132// DeltaType is the type of a change (addition, deletion, etc)
133type DeltaType string
134
135// Change type definition
136const (
137	Added   DeltaType = "Added"
138	Updated DeltaType = "Updated"
139	Deleted DeltaType = "Deleted"
140	// Replaced is emitted when we encountered watch errors and had to do a
141	// relist. We don't know if the replaced object has changed.
142	//
143	// NOTE: Previous versions of DeltaFIFO would use Sync for Replace events
144	// as well. Hence, Replaced is only emitted when the option
145	// EmitDeltaTypeReplaced is true.
146	Replaced DeltaType = "Replaced"
147	// Sync is for synthetic events during a periodic resync.
148	Sync DeltaType = "Sync"
149)
150
151// Delta is a member of Deltas (a list of Delta objects) which
152// in its turn is the type stored by a DeltaFIFO. It tells you what
153// change happened, and the object's state after* that change.
154//
155// [*] Unless the change is a deletion, and then you'll get the final
156// state of the object before it was deleted.
157type Delta struct {
158	Type   DeltaType
159	Object interface{}
160}
161
162// Deltas is a list of one or more 'Delta's to an individual object.
163// The oldest delta is at index 0, the newest delta is the last one.
164type Deltas []Delta
165
166// NewDeltaFIFO returns a Queue which can be used to process changes to items.
167//
168// keyFunc is used to figure out what key an object should have. (It is
169// exposed in the returned DeltaFIFO's KeyOf() method, with additional handling
170// around deleted objects and queue state).
171//
172// 'knownObjects' may be supplied to modify the behavior of Delete,
173// Replace, and Resync.  It may be nil if you do not need those
174// modifications.
175//
176// TODO: consider merging keyLister with this object, tracking a list of
177// "known" keys when Pop() is called. Have to think about how that
178// affects error retrying.
179//
180//       NOTE: It is possible to misuse this and cause a race when using an
181//       external known object source.
182//       Whether there is a potential race depends on how the consumer
183//       modifies knownObjects. In Pop(), process function is called under
184//       lock, so it is safe to update data structures in it that need to be
185//       in sync with the queue (e.g. knownObjects).
186//
187//       Example:
188//       In case of sharedIndexInformer being a consumer
189//       (https://github.com/kubernetes/kubernetes/blob/0cdd940f/staging/src/k8s.io/client-go/tools/cache/shared_informer.go#L192),
190//       there is no race as knownObjects (s.indexer) is modified safely
191//       under DeltaFIFO's lock. The only exceptions are GetStore() and
192//       GetIndexer() methods, which expose ways to modify the underlying
193//       storage. Currently these two methods are used for creating Lister
194//       and internal tests.
195//
196// Also see the comment on DeltaFIFO.
197//
198// Warning: This constructs a DeltaFIFO that does not differentiate between
199// events caused by a call to Replace (e.g., from a relist, which may
200// contain object updates), and synthetic events caused by a periodic resync
201// (which just emit the existing object). See https://issue.k8s.io/86015 for details.
202//
203// Use `NewDeltaFIFOWithOptions(DeltaFIFOOptions{..., EmitDeltaTypeReplaced: true})`
204// instead to receive a `Replaced` event depending on the type.
205//
206// Deprecated: Equivalent to NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: keyFunc, KnownObjects: knownObjects})
207func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
208	return NewDeltaFIFOWithOptions(DeltaFIFOOptions{
209		KeyFunction:  keyFunc,
210		KnownObjects: knownObjects,
211	})
212}
213
214// NewDeltaFIFOWithOptions returns a Queue which can be used to process changes to
215// items. See also the comment on DeltaFIFO.
216func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
217	if opts.KeyFunction == nil {
218		opts.KeyFunction = MetaNamespaceKeyFunc
219	}
220
221	f := &DeltaFIFO{
222		items:        map[string]Deltas{},
223		queue:        []string{},
224		keyFunc:      opts.KeyFunction,
225		knownObjects: opts.KnownObjects,
226
227		emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,
228	}
229	f.cond.L = &f.lock
230	return f
231}
232
233var (
234	_ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue
235)
236
237var (
238	// ErrZeroLengthDeltasObject is returned in a KeyError if a Deltas
239	// object with zero length is encountered (should be impossible,
240	// but included for completeness).
241	ErrZeroLengthDeltasObject = errors.New("0 length Deltas object; can't get key")
242)
243
244// Close the queue.
245func (f *DeltaFIFO) Close() {
246	f.lock.Lock()
247	defer f.lock.Unlock()
248	f.closed = true
249	f.cond.Broadcast()
250}
251
252// KeyOf exposes f's keyFunc, but also detects the key of a Deltas object or
253// DeletedFinalStateUnknown objects.
254func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {
255	if d, ok := obj.(Deltas); ok {
256		if len(d) == 0 {
257			return "", KeyError{obj, ErrZeroLengthDeltasObject}
258		}
259		obj = d.Newest().Object
260	}
261	if d, ok := obj.(DeletedFinalStateUnknown); ok {
262		return d.Key, nil
263	}
264	return f.keyFunc(obj)
265}
266
267// HasSynced returns true if an Add/Update/Delete/AddIfNotPresent are called first,
268// or the first batch of items inserted by Replace() has been popped.
269func (f *DeltaFIFO) HasSynced() bool {
270	f.lock.Lock()
271	defer f.lock.Unlock()
272	return f.populated && f.initialPopulationCount == 0
273}
274
275// Add inserts an item, and puts it in the queue. The item is only enqueued
276// if it doesn't already exist in the set.
277func (f *DeltaFIFO) Add(obj interface{}) error {
278	f.lock.Lock()
279	defer f.lock.Unlock()
280	f.populated = true
281	return f.queueActionLocked(Added, obj)
282}
283
284// Update is just like Add, but makes an Updated Delta.
285func (f *DeltaFIFO) Update(obj interface{}) error {
286	f.lock.Lock()
287	defer f.lock.Unlock()
288	f.populated = true
289	return f.queueActionLocked(Updated, obj)
290}
291
292// Delete is just like Add, but makes a Deleted Delta. If the given
293// object does not already exist, it will be ignored. (It may have
294// already been deleted by a Replace (re-list), for example.)  In this
295// method `f.knownObjects`, if not nil, provides (via GetByKey)
296// _additional_ objects that are considered to already exist.
297func (f *DeltaFIFO) Delete(obj interface{}) error {
298	id, err := f.KeyOf(obj)
299	if err != nil {
300		return KeyError{obj, err}
301	}
302	f.lock.Lock()
303	defer f.lock.Unlock()
304	f.populated = true
305	if f.knownObjects == nil {
306		if _, exists := f.items[id]; !exists {
307			// Presumably, this was deleted when a relist happened.
308			// Don't provide a second report of the same deletion.
309			return nil
310		}
311	} else {
312		// We only want to skip the "deletion" action if the object doesn't
313		// exist in knownObjects and it doesn't have corresponding item in items.
314		// Note that even if there is a "deletion" action in items, we can ignore it,
315		// because it will be deduped automatically in "queueActionLocked"
316		_, exists, err := f.knownObjects.GetByKey(id)
317		_, itemsExist := f.items[id]
318		if err == nil && !exists && !itemsExist {
319			// Presumably, this was deleted when a relist happened.
320			// Don't provide a second report of the same deletion.
321			return nil
322		}
323	}
324
325	// exist in items and/or KnownObjects
326	return f.queueActionLocked(Deleted, obj)
327}
328
329// AddIfNotPresent inserts an item, and puts it in the queue. If the item is already
330// present in the set, it is neither enqueued nor added to the set.
331//
332// This is useful in a single producer/consumer scenario so that the consumer can
333// safely retry items without contending with the producer and potentially enqueueing
334// stale items.
335//
336// Important: obj must be a Deltas (the output of the Pop() function). Yes, this is
337// different from the Add/Update/Delete functions.
338func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error {
339	deltas, ok := obj.(Deltas)
340	if !ok {
341		return fmt.Errorf("object must be of type deltas, but got: %#v", obj)
342	}
343	id, err := f.KeyOf(deltas)
344	if err != nil {
345		return KeyError{obj, err}
346	}
347	f.lock.Lock()
348	defer f.lock.Unlock()
349	f.addIfNotPresent(id, deltas)
350	return nil
351}
352
353// addIfNotPresent inserts deltas under id if it does not exist, and assumes the caller
354// already holds the fifo lock.
355func (f *DeltaFIFO) addIfNotPresent(id string, deltas Deltas) {
356	f.populated = true
357	if _, exists := f.items[id]; exists {
358		return
359	}
360
361	f.queue = append(f.queue, id)
362	f.items[id] = deltas
363	f.cond.Broadcast()
364}
365
366// re-listing and watching can deliver the same update multiple times in any
367// order. This will combine the most recent two deltas if they are the same.
368func dedupDeltas(deltas Deltas) Deltas {
369	n := len(deltas)
370	if n < 2 {
371		return deltas
372	}
373	a := &deltas[n-1]
374	b := &deltas[n-2]
375	if out := isDup(a, b); out != nil {
376		deltas[n-2] = *out
377		return deltas[:n-1]
378	}
379	return deltas
380}
381
382// If a & b represent the same event, returns the delta that ought to be kept.
383// Otherwise, returns nil.
384// TODO: is there anything other than deletions that need deduping?
385func isDup(a, b *Delta) *Delta {
386	if out := isDeletionDup(a, b); out != nil {
387		return out
388	}
389	// TODO: Detect other duplicate situations? Are there any?
390	return nil
391}
392
393// keep the one with the most information if both are deletions.
394func isDeletionDup(a, b *Delta) *Delta {
395	if b.Type != Deleted || a.Type != Deleted {
396		return nil
397	}
398	// Do more sophisticated checks, or is this sufficient?
399	if _, ok := b.Object.(DeletedFinalStateUnknown); ok {
400		return a
401	}
402	return b
403}
404
405// queueActionLocked appends to the delta list for the object.
406// Caller must lock first.
407func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
408	id, err := f.KeyOf(obj)
409	if err != nil {
410		return KeyError{obj, err}
411	}
412	oldDeltas := f.items[id]
413	newDeltas := append(oldDeltas, Delta{actionType, obj})
414	newDeltas = dedupDeltas(newDeltas)
415
416	if len(newDeltas) > 0 {
417		if _, exists := f.items[id]; !exists {
418			f.queue = append(f.queue, id)
419		}
420		f.items[id] = newDeltas
421		f.cond.Broadcast()
422	} else {
423		// This never happens, because dedupDeltas never returns an empty list
424		// when given a non-empty list (as it is here).
425		// If somehow it happens anyway, deal with it but complain.
426		if oldDeltas == nil {
427			klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; ignoring", id, oldDeltas, obj)
428			return nil
429		}
430		klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; breaking invariant by storing empty Deltas", id, oldDeltas, obj)
431		f.items[id] = newDeltas
432		return fmt.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; broke DeltaFIFO invariant by storing empty Deltas", id, oldDeltas, obj)
433	}
434	return nil
435}
436
437// List returns a list of all the items; it returns the object
438// from the most recent Delta.
439// You should treat the items returned inside the deltas as immutable.
440func (f *DeltaFIFO) List() []interface{} {
441	f.lock.RLock()
442	defer f.lock.RUnlock()
443	return f.listLocked()
444}
445
446func (f *DeltaFIFO) listLocked() []interface{} {
447	list := make([]interface{}, 0, len(f.items))
448	for _, item := range f.items {
449		list = append(list, item.Newest().Object)
450	}
451	return list
452}
453
454// ListKeys returns a list of all the keys of the objects currently
455// in the FIFO.
456func (f *DeltaFIFO) ListKeys() []string {
457	f.lock.RLock()
458	defer f.lock.RUnlock()
459	list := make([]string, 0, len(f.items))
460	for key := range f.items {
461		list = append(list, key)
462	}
463	return list
464}
465
466// Get returns the complete list of deltas for the requested item,
467// or sets exists=false.
468// You should treat the items returned inside the deltas as immutable.
469func (f *DeltaFIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
470	key, err := f.KeyOf(obj)
471	if err != nil {
472		return nil, false, KeyError{obj, err}
473	}
474	return f.GetByKey(key)
475}
476
477// GetByKey returns the complete list of deltas for the requested item,
478// setting exists=false if that list is empty.
479// You should treat the items returned inside the deltas as immutable.
480func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
481	f.lock.RLock()
482	defer f.lock.RUnlock()
483	d, exists := f.items[key]
484	if exists {
485		// Copy item's slice so operations on this slice
486		// won't interfere with the object we return.
487		d = copyDeltas(d)
488	}
489	return d, exists, nil
490}
491
492// IsClosed checks if the queue is closed
493func (f *DeltaFIFO) IsClosed() bool {
494	f.lock.Lock()
495	defer f.lock.Unlock()
496	return f.closed
497}
498
499// Pop blocks until the queue has some items, and then returns one.  If
500// multiple items are ready, they are returned in the order in which they were
501// added/updated. The item is removed from the queue (and the store) before it
502// is returned, so if you don't successfully process it, you need to add it back
503// with AddIfNotPresent().
504// process function is called under lock, so it is safe to update data structures
505// in it that need to be in sync with the queue (e.g. knownKeys). The PopProcessFunc
506// may return an instance of ErrRequeue with a nested error to indicate the current
507// item should be requeued (equivalent to calling AddIfNotPresent under the lock).
508// process should avoid expensive I/O operation so that other queue operations, i.e.
509// Add() and Get(), won't be blocked for too long.
510//
511// Pop returns a 'Deltas', which has a complete list of all the things
512// that happened to the object (deltas) while it was sitting in the queue.
513func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
514	f.lock.Lock()
515	defer f.lock.Unlock()
516	for {
517		for len(f.queue) == 0 {
518			// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
519			// When Close() is called, the f.closed is set and the condition is broadcasted.
520			// Which causes this loop to continue and return from the Pop().
521			if f.closed {
522				return nil, ErrFIFOClosed
523			}
524
525			f.cond.Wait()
526		}
527		id := f.queue[0]
528		f.queue = f.queue[1:]
529		if f.initialPopulationCount > 0 {
530			f.initialPopulationCount--
531		}
532		item, ok := f.items[id]
533		if !ok {
534			// This should never happen
535			klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)
536			continue
537		}
538		delete(f.items, id)
539		err := process(item)
540		if e, ok := err.(ErrRequeue); ok {
541			f.addIfNotPresent(id, item)
542			err = e.Err
543		}
544		// Don't need to copyDeltas here, because we're transferring
545		// ownership to the caller.
546		return item, err
547	}
548}
549
550// Replace atomically does two things: (1) it adds the given objects
551// using the Sync or Replace DeltaType and then (2) it does some deletions.
552// In particular: for every pre-existing key K that is not the key of
553// an object in `list` there is the effect of
554// `Delete(DeletedFinalStateUnknown{K, O})` where O is current object
555// of K.  If `f.knownObjects == nil` then the pre-existing keys are
556// those in `f.items` and the current object of K is the `.Newest()`
557// of the Deltas associated with K.  Otherwise the pre-existing keys
558// are those listed by `f.knownObjects` and the current object of K is
559// what `f.knownObjects.GetByKey(K)` returns.
560func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
561	f.lock.Lock()
562	defer f.lock.Unlock()
563	keys := make(sets.String, len(list))
564
565	// keep backwards compat for old clients
566	action := Sync
567	if f.emitDeltaTypeReplaced {
568		action = Replaced
569	}
570
571	// Add Sync/Replaced action for each new item.
572	for _, item := range list {
573		key, err := f.KeyOf(item)
574		if err != nil {
575			return KeyError{item, err}
576		}
577		keys.Insert(key)
578		if err := f.queueActionLocked(action, item); err != nil {
579			return fmt.Errorf("couldn't enqueue object: %v", err)
580		}
581	}
582
583	if f.knownObjects == nil {
584		// Do deletion detection against our own list.
585		queuedDeletions := 0
586		for k, oldItem := range f.items {
587			if keys.Has(k) {
588				continue
589			}
590			// Delete pre-existing items not in the new list.
591			// This could happen if watch deletion event was missed while
592			// disconnected from apiserver.
593			var deletedObj interface{}
594			if n := oldItem.Newest(); n != nil {
595				deletedObj = n.Object
596			}
597			queuedDeletions++
598			if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
599				return err
600			}
601		}
602
603		if !f.populated {
604			f.populated = true
605			// While there shouldn't be any queued deletions in the initial
606			// population of the queue, it's better to be on the safe side.
607			f.initialPopulationCount = keys.Len() + queuedDeletions
608		}
609
610		return nil
611	}
612
613	// Detect deletions not already in the queue.
614	knownKeys := f.knownObjects.ListKeys()
615	queuedDeletions := 0
616	for _, k := range knownKeys {
617		if keys.Has(k) {
618			continue
619		}
620
621		deletedObj, exists, err := f.knownObjects.GetByKey(k)
622		if err != nil {
623			deletedObj = nil
624			klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
625		} else if !exists {
626			deletedObj = nil
627			klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
628		}
629		queuedDeletions++
630		if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
631			return err
632		}
633	}
634
635	if !f.populated {
636		f.populated = true
637		f.initialPopulationCount = keys.Len() + queuedDeletions
638	}
639
640	return nil
641}
642
643// Resync adds, with a Sync type of Delta, every object listed by
644// `f.knownObjects` whose key is not already queued for processing.
645// If `f.knownObjects` is `nil` then Resync does nothing.
646func (f *DeltaFIFO) Resync() error {
647	f.lock.Lock()
648	defer f.lock.Unlock()
649
650	if f.knownObjects == nil {
651		return nil
652	}
653
654	keys := f.knownObjects.ListKeys()
655	for _, k := range keys {
656		if err := f.syncKeyLocked(k); err != nil {
657			return err
658		}
659	}
660	return nil
661}
662
663func (f *DeltaFIFO) syncKeyLocked(key string) error {
664	obj, exists, err := f.knownObjects.GetByKey(key)
665	if err != nil {
666		klog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key)
667		return nil
668	} else if !exists {
669		klog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key)
670		return nil
671	}
672
673	// If we are doing Resync() and there is already an event queued for that object,
674	// we ignore the Resync for it. This is to avoid the race, in which the resync
675	// comes with the previous value of object (since queueing an event for the object
676	// doesn't trigger changing the underlying store <knownObjects>.
677	id, err := f.KeyOf(obj)
678	if err != nil {
679		return KeyError{obj, err}
680	}
681	if len(f.items[id]) > 0 {
682		return nil
683	}
684
685	if err := f.queueActionLocked(Sync, obj); err != nil {
686		return fmt.Errorf("couldn't queue object: %v", err)
687	}
688	return nil
689}
690
691// A KeyListerGetter is anything that knows how to list its keys and look up by key.
692type KeyListerGetter interface {
693	KeyLister
694	KeyGetter
695}
696
697// A KeyLister is anything that knows how to list its keys.
698type KeyLister interface {
699	ListKeys() []string
700}
701
702// A KeyGetter is anything that knows how to get the value stored under a given key.
703type KeyGetter interface {
704	// GetByKey returns the value associated with the key, or sets exists=false.
705	GetByKey(key string) (value interface{}, exists bool, err error)
706}
707
708// Oldest is a convenience function that returns the oldest delta, or
709// nil if there are no deltas.
710func (d Deltas) Oldest() *Delta {
711	if len(d) > 0 {
712		return &d[0]
713	}
714	return nil
715}
716
717// Newest is a convenience function that returns the newest delta, or
718// nil if there are no deltas.
719func (d Deltas) Newest() *Delta {
720	if n := len(d); n > 0 {
721		return &d[n-1]
722	}
723	return nil
724}
725
726// copyDeltas returns a shallow copy of d; that is, it copies the slice but not
727// the objects in the slice. This allows Get/List to return an object that we
728// know won't be clobbered by a subsequent modifications.
729func copyDeltas(d Deltas) Deltas {
730	d2 := make(Deltas, len(d))
731	copy(d2, d)
732	return d2
733}
734
735// DeletedFinalStateUnknown is placed into a DeltaFIFO in the case where an object
736// was deleted but the watch deletion event was missed while disconnected from
737// apiserver. In this case we don't know the final "resting" state of the object, so
738// there's a chance the included `Obj` is stale.
739type DeletedFinalStateUnknown struct {
740	Key string
741	Obj interface{}
742}
743