1/*
2Copyright 2014 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package cache
18
19import (
20	"errors"
21	"fmt"
22	"sync"
23
24	"k8s.io/apimachinery/pkg/util/sets"
25
26	"k8s.io/klog"
27)
28
29// NewDeltaFIFO returns a Store which can be used process changes to items.
30//
31// keyFunc is used to figure out what key an object should have. (It's
32// exposed in the returned DeltaFIFO's KeyOf() method, with bonus features.)
33//
34// 'keyLister' is expected to return a list of keys that the consumer of
35// this queue "knows about". It is used to decide which items are missing
36// when Replace() is called; 'Deleted' deltas are produced for these items.
37// It may be nil if you don't need to detect all deletions.
38// TODO: consider merging keyLister with this object, tracking a list of
39//       "known" keys when Pop() is called. Have to think about how that
40//       affects error retrying.
41// NOTE: It is possible to misuse this and cause a race when using an
42//       external known object source.
43//       Whether there is a potential race depends on how the comsumer
44//       modifies knownObjects. In Pop(), process function is called under
45//       lock, so it is safe to update data structures in it that need to be
46//       in sync with the queue (e.g. knownObjects).
47//
48//       Example:
49//       In case of sharedIndexInformer being a consumer
50//       (https://github.com/kubernetes/kubernetes/blob/0cdd940f/staging/
51//       src/k8s.io/client-go/tools/cache/shared_informer.go#L192),
52//       there is no race as knownObjects (s.indexer) is modified safely
53//       under DeltaFIFO's lock. The only exceptions are GetStore() and
54//       GetIndexer() methods, which expose ways to modify the underlying
55//       storage. Currently these two methods are used for creating Lister
56//       and internal tests.
57//
58// Also see the comment on DeltaFIFO.
59func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
60	f := &DeltaFIFO{
61		items:        map[string]Deltas{},
62		queue:        []string{},
63		keyFunc:      keyFunc,
64		knownObjects: knownObjects,
65	}
66	f.cond.L = &f.lock
67	return f
68}
69
// DeltaFIFO is like FIFO, but allows you to process deletes.
//
// DeltaFIFO is a producer-consumer queue, where a Reflector is
// intended to be the producer, and the consumer is whatever calls
// the Pop() method.
//
// DeltaFIFO solves this use case:
//  * You want to process every object change (delta) at most once.
//  * When you process an object, you want to see everything
//    that's happened to it since you last processed it.
//  * You want to process the deletion of objects.
//  * You might want to periodically reprocess objects.
//
// DeltaFIFO's Pop(), Get(), and GetByKey() methods return
// interface{} to satisfy the Store/Queue interfaces, but they
// will always return an object of type Deltas.
//
// A note on threading: If you call Pop() in parallel from multiple
// threads, you could end up with multiple threads processing slightly
// different versions of the same object.
//
// A note on the KeyLister used by the DeltaFIFO: Its main purpose is
// to list keys that are "known", for the purpose of figuring out which
// items have been deleted when Replace() or Delete() are called. The deleted
// object will be included in the DeletedFinalStateUnknown markers. These
// objects could be stale.
type DeltaFIFO struct {
	// lock/cond protects access to 'items' and 'queue'.
	// cond.L is set to &lock by NewDeltaFIFO.
	lock sync.RWMutex
	cond sync.Cond

	// We depend on the property that items in the set are in
	// the queue and vice versa, and that all Deltas in this
	// map have at least one Delta.
	items map[string]Deltas
	queue []string

	// populated is true if the first batch of items inserted by Replace() has been populated
	// or Delete/Add/Update was called first.
	populated bool
	// initialPopulationCount is the number of items inserted by the first call of Replace().
	// Pop() decrements it while it is positive.
	initialPopulationCount int

	// keyFunc is used to make the key used for queued item
	// insertion and retrieval, and should be deterministic.
	keyFunc KeyFunc

	// knownObjects list keys that are "known", for the
	// purpose of figuring out which items have been deleted
	// when Replace() or Delete() is called. May be nil.
	knownObjects KeyListerGetter

	// Indication the queue is closed.
	// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
	// Currently, not used to gate any of CRUD operations.
	closed     bool
	closedLock sync.Mutex
}
128
129var (
130	_ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue
131)
132
var (
	// ErrZeroLengthDeltasObject is returned in a KeyError if a Deltas
	// object with zero length is encountered (should be impossible,
	// but included for completeness). KeyOf produces it when asked for
	// the key of an empty Deltas.
	ErrZeroLengthDeltasObject = errors.New("0 length Deltas object; can't get key")
)
139
140// Close the queue.
141func (f *DeltaFIFO) Close() {
142	f.closedLock.Lock()
143	defer f.closedLock.Unlock()
144	f.closed = true
145	f.cond.Broadcast()
146}
147
148// KeyOf exposes f's keyFunc, but also detects the key of a Deltas object or
149// DeletedFinalStateUnknown objects.
150func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {
151	if d, ok := obj.(Deltas); ok {
152		if len(d) == 0 {
153			return "", KeyError{obj, ErrZeroLengthDeltasObject}
154		}
155		obj = d.Newest().Object
156	}
157	if d, ok := obj.(DeletedFinalStateUnknown); ok {
158		return d.Key, nil
159	}
160	return f.keyFunc(obj)
161}
162
163// Return true if an Add/Update/Delete/AddIfNotPresent are called first,
164// or an Update called first but the first batch of items inserted by Replace() has been popped
165func (f *DeltaFIFO) HasSynced() bool {
166	f.lock.Lock()
167	defer f.lock.Unlock()
168	return f.populated && f.initialPopulationCount == 0
169}
170
171// Add inserts an item, and puts it in the queue. The item is only enqueued
172// if it doesn't already exist in the set.
173func (f *DeltaFIFO) Add(obj interface{}) error {
174	f.lock.Lock()
175	defer f.lock.Unlock()
176	f.populated = true
177	return f.queueActionLocked(Added, obj)
178}
179
180// Update is just like Add, but makes an Updated Delta.
181func (f *DeltaFIFO) Update(obj interface{}) error {
182	f.lock.Lock()
183	defer f.lock.Unlock()
184	f.populated = true
185	return f.queueActionLocked(Updated, obj)
186}
187
188// Delete is just like Add, but makes an Deleted Delta. If the item does not
189// already exist, it will be ignored. (It may have already been deleted by a
190// Replace (re-list), for example.
191func (f *DeltaFIFO) Delete(obj interface{}) error {
192	id, err := f.KeyOf(obj)
193	if err != nil {
194		return KeyError{obj, err}
195	}
196	f.lock.Lock()
197	defer f.lock.Unlock()
198	f.populated = true
199	if f.knownObjects == nil {
200		if _, exists := f.items[id]; !exists {
201			// Presumably, this was deleted when a relist happened.
202			// Don't provide a second report of the same deletion.
203			return nil
204		}
205	} else {
206		// We only want to skip the "deletion" action if the object doesn't
207		// exist in knownObjects and it doesn't have corresponding item in items.
208		// Note that even if there is a "deletion" action in items, we can ignore it,
209		// because it will be deduped automatically in "queueActionLocked"
210		_, exists, err := f.knownObjects.GetByKey(id)
211		_, itemsExist := f.items[id]
212		if err == nil && !exists && !itemsExist {
213			// Presumably, this was deleted when a relist happened.
214			// Don't provide a second report of the same deletion.
215			return nil
216		}
217	}
218
219	return f.queueActionLocked(Deleted, obj)
220}
221
222// AddIfNotPresent inserts an item, and puts it in the queue. If the item is already
223// present in the set, it is neither enqueued nor added to the set.
224//
225// This is useful in a single producer/consumer scenario so that the consumer can
226// safely retry items without contending with the producer and potentially enqueueing
227// stale items.
228//
229// Important: obj must be a Deltas (the output of the Pop() function). Yes, this is
230// different from the Add/Update/Delete functions.
231func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error {
232	deltas, ok := obj.(Deltas)
233	if !ok {
234		return fmt.Errorf("object must be of type deltas, but got: %#v", obj)
235	}
236	id, err := f.KeyOf(deltas.Newest().Object)
237	if err != nil {
238		return KeyError{obj, err}
239	}
240	f.lock.Lock()
241	defer f.lock.Unlock()
242	f.addIfNotPresent(id, deltas)
243	return nil
244}
245
246// addIfNotPresent inserts deltas under id if it does not exist, and assumes the caller
247// already holds the fifo lock.
248func (f *DeltaFIFO) addIfNotPresent(id string, deltas Deltas) {
249	f.populated = true
250	if _, exists := f.items[id]; exists {
251		return
252	}
253
254	f.queue = append(f.queue, id)
255	f.items[id] = deltas
256	f.cond.Broadcast()
257}
258
259// re-listing and watching can deliver the same update multiple times in any
260// order. This will combine the most recent two deltas if they are the same.
261func dedupDeltas(deltas Deltas) Deltas {
262	n := len(deltas)
263	if n < 2 {
264		return deltas
265	}
266	a := &deltas[n-1]
267	b := &deltas[n-2]
268	if out := isDup(a, b); out != nil {
269		d := append(Deltas{}, deltas[:n-2]...)
270		return append(d, *out)
271	}
272	return deltas
273}
274
275// If a & b represent the same event, returns the delta that ought to be kept.
276// Otherwise, returns nil.
277// TODO: is there anything other than deletions that need deduping?
278func isDup(a, b *Delta) *Delta {
279	if out := isDeletionDup(a, b); out != nil {
280		return out
281	}
282	// TODO: Detect other duplicate situations? Are there any?
283	return nil
284}
285
286// keep the one with the most information if both are deletions.
287func isDeletionDup(a, b *Delta) *Delta {
288	if b.Type != Deleted || a.Type != Deleted {
289		return nil
290	}
291	// Do more sophisticated checks, or is this sufficient?
292	if _, ok := b.Object.(DeletedFinalStateUnknown); ok {
293		return a
294	}
295	return b
296}
297
298// willObjectBeDeletedLocked returns true only if the last delta for the
299// given object is Delete. Caller must lock first.
300func (f *DeltaFIFO) willObjectBeDeletedLocked(id string) bool {
301	deltas := f.items[id]
302	return len(deltas) > 0 && deltas[len(deltas)-1].Type == Deleted
303}
304
305// queueActionLocked appends to the delta list for the object.
306// Caller must lock first.
307func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
308	id, err := f.KeyOf(obj)
309	if err != nil {
310		return KeyError{obj, err}
311	}
312
313	// If object is supposed to be deleted (last event is Deleted),
314	// then we should ignore Sync events, because it would result in
315	// recreation of this object.
316	if actionType == Sync && f.willObjectBeDeletedLocked(id) {
317		return nil
318	}
319
320	newDeltas := append(f.items[id], Delta{actionType, obj})
321	newDeltas = dedupDeltas(newDeltas)
322
323	if len(newDeltas) > 0 {
324		if _, exists := f.items[id]; !exists {
325			f.queue = append(f.queue, id)
326		}
327		f.items[id] = newDeltas
328		f.cond.Broadcast()
329	} else {
330		// We need to remove this from our map (extra items in the queue are
331		// ignored if they are not in the map).
332		delete(f.items, id)
333	}
334	return nil
335}
336
337// List returns a list of all the items; it returns the object
338// from the most recent Delta.
339// You should treat the items returned inside the deltas as immutable.
340func (f *DeltaFIFO) List() []interface{} {
341	f.lock.RLock()
342	defer f.lock.RUnlock()
343	return f.listLocked()
344}
345
346func (f *DeltaFIFO) listLocked() []interface{} {
347	list := make([]interface{}, 0, len(f.items))
348	for _, item := range f.items {
349		list = append(list, item.Newest().Object)
350	}
351	return list
352}
353
354// ListKeys returns a list of all the keys of the objects currently
355// in the FIFO.
356func (f *DeltaFIFO) ListKeys() []string {
357	f.lock.RLock()
358	defer f.lock.RUnlock()
359	list := make([]string, 0, len(f.items))
360	for key := range f.items {
361		list = append(list, key)
362	}
363	return list
364}
365
366// Get returns the complete list of deltas for the requested item,
367// or sets exists=false.
368// You should treat the items returned inside the deltas as immutable.
369func (f *DeltaFIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
370	key, err := f.KeyOf(obj)
371	if err != nil {
372		return nil, false, KeyError{obj, err}
373	}
374	return f.GetByKey(key)
375}
376
377// GetByKey returns the complete list of deltas for the requested item,
378// setting exists=false if that list is empty.
379// You should treat the items returned inside the deltas as immutable.
380func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
381	f.lock.RLock()
382	defer f.lock.RUnlock()
383	d, exists := f.items[key]
384	if exists {
385		// Copy item's slice so operations on this slice
386		// won't interfere with the object we return.
387		d = copyDeltas(d)
388	}
389	return d, exists, nil
390}
391
392// Checks if the queue is closed
393func (f *DeltaFIFO) IsClosed() bool {
394	f.closedLock.Lock()
395	defer f.closedLock.Unlock()
396	return f.closed
397}
398
399// Pop blocks until an item is added to the queue, and then returns it.  If
400// multiple items are ready, they are returned in the order in which they were
401// added/updated. The item is removed from the queue (and the store) before it
402// is returned, so if you don't successfully process it, you need to add it back
403// with AddIfNotPresent().
404// process function is called under lock, so it is safe update data structures
405// in it that need to be in sync with the queue (e.g. knownKeys). The PopProcessFunc
406// may return an instance of ErrRequeue with a nested error to indicate the current
407// item should be requeued (equivalent to calling AddIfNotPresent under the lock).
408//
409// Pop returns a 'Deltas', which has a complete list of all the things
410// that happened to the object (deltas) while it was sitting in the queue.
411func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
412	f.lock.Lock()
413	defer f.lock.Unlock()
414	for {
415		for len(f.queue) == 0 {
416			// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
417			// When Close() is called, the f.closed is set and the condition is broadcasted.
418			// Which causes this loop to continue and return from the Pop().
419			if f.IsClosed() {
420				return nil, FIFOClosedError
421			}
422
423			f.cond.Wait()
424		}
425		id := f.queue[0]
426		f.queue = f.queue[1:]
427		if f.initialPopulationCount > 0 {
428			f.initialPopulationCount--
429		}
430		item, ok := f.items[id]
431		if !ok {
432			// Item may have been deleted subsequently.
433			continue
434		}
435		delete(f.items, id)
436		err := process(item)
437		if e, ok := err.(ErrRequeue); ok {
438			f.addIfNotPresent(id, item)
439			err = e.Err
440		}
441		// Don't need to copyDeltas here, because we're transferring
442		// ownership to the caller.
443		return item, err
444	}
445}
446
447// Replace will delete the contents of 'f', using instead the given map.
448// 'f' takes ownership of the map, you should not reference the map again
449// after calling this function. f's queue is reset, too; upon return, it
450// will contain the items in the map, in no particular order.
451func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
452	f.lock.Lock()
453	defer f.lock.Unlock()
454	keys := make(sets.String, len(list))
455
456	for _, item := range list {
457		key, err := f.KeyOf(item)
458		if err != nil {
459			return KeyError{item, err}
460		}
461		keys.Insert(key)
462		if err := f.queueActionLocked(Sync, item); err != nil {
463			return fmt.Errorf("couldn't enqueue object: %v", err)
464		}
465	}
466
467	if f.knownObjects == nil {
468		// Do deletion detection against our own list.
469		queuedDeletions := 0
470		for k, oldItem := range f.items {
471			if keys.Has(k) {
472				continue
473			}
474			var deletedObj interface{}
475			if n := oldItem.Newest(); n != nil {
476				deletedObj = n.Object
477			}
478			queuedDeletions++
479			if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
480				return err
481			}
482		}
483
484		if !f.populated {
485			f.populated = true
486			// While there shouldn't be any queued deletions in the initial
487			// population of the queue, it's better to be on the safe side.
488			f.initialPopulationCount = len(list) + queuedDeletions
489		}
490
491		return nil
492	}
493
494	// Detect deletions not already in the queue.
495	knownKeys := f.knownObjects.ListKeys()
496	queuedDeletions := 0
497	for _, k := range knownKeys {
498		if keys.Has(k) {
499			continue
500		}
501
502		deletedObj, exists, err := f.knownObjects.GetByKey(k)
503		if err != nil {
504			deletedObj = nil
505			klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
506		} else if !exists {
507			deletedObj = nil
508			klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
509		}
510		queuedDeletions++
511		if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
512			return err
513		}
514	}
515
516	if !f.populated {
517		f.populated = true
518		f.initialPopulationCount = len(list) + queuedDeletions
519	}
520
521	return nil
522}
523
524// Resync will send a sync event for each item
525func (f *DeltaFIFO) Resync() error {
526	f.lock.Lock()
527	defer f.lock.Unlock()
528
529	if f.knownObjects == nil {
530		return nil
531	}
532
533	keys := f.knownObjects.ListKeys()
534	for _, k := range keys {
535		if err := f.syncKeyLocked(k); err != nil {
536			return err
537		}
538	}
539	return nil
540}
541
542func (f *DeltaFIFO) syncKey(key string) error {
543	f.lock.Lock()
544	defer f.lock.Unlock()
545
546	return f.syncKeyLocked(key)
547}
548
549func (f *DeltaFIFO) syncKeyLocked(key string) error {
550	obj, exists, err := f.knownObjects.GetByKey(key)
551	if err != nil {
552		klog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key)
553		return nil
554	} else if !exists {
555		klog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key)
556		return nil
557	}
558
559	// If we are doing Resync() and there is already an event queued for that object,
560	// we ignore the Resync for it. This is to avoid the race, in which the resync
561	// comes with the previous value of object (since queueing an event for the object
562	// doesn't trigger changing the underlying store <knownObjects>.
563	id, err := f.KeyOf(obj)
564	if err != nil {
565		return KeyError{obj, err}
566	}
567	if len(f.items[id]) > 0 {
568		return nil
569	}
570
571	if err := f.queueActionLocked(Sync, obj); err != nil {
572		return fmt.Errorf("couldn't queue object: %v", err)
573	}
574	return nil
575}
576
// A KeyListerGetter is anything that knows how to list its keys and look up by key.
// It combines KeyLister and KeyGetter.
type KeyListerGetter interface {
	KeyLister
	KeyGetter
}
582
// A KeyLister is anything that knows how to list its keys.
type KeyLister interface {
	// ListKeys returns the keys currently held.
	ListKeys() []string
}
587
// A KeyGetter is anything that knows how to get the value stored under a given key.
type KeyGetter interface {
	// GetByKey returns the value stored under key, whether it exists, and
	// any lookup error.
	GetByKey(key string) (interface{}, bool, error)
}
592
// DeltaType is the type of a change (addition, deletion, etc).
type DeltaType string

// Change types recorded in a Delta.
const (
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
	Deleted DeltaType = "Deleted"
	// The other types are obvious. You'll get Sync deltas when:
	//  * A watch expires/errors out and a new list/watch cycle is started.
	//  * You've turned on periodic syncs.
	// (Anything that triggers DeltaFIFO's Replace() method.)
	Sync DeltaType = "Sync"
)
606
// Delta is the type stored by a DeltaFIFO. It tells you what change
// happened, and the object's state after* that change.
//
// [*] Unless the change is a deletion, and then you'll get the final
//     state of the object before it was deleted.
type Delta struct {
	// Type is the kind of change.
	Type DeltaType
	// Object is the object the change applies to; for a deletion it may be
	// a DeletedFinalStateUnknown.
	Object interface{}
}
616
// Deltas is a list of one or more 'Delta's to an individual object.
// The oldest delta is at index 0, the newest delta is the last one.
// Use Oldest() and Newest() to access the two ends.
type Deltas []Delta
620
621// Oldest is a convenience function that returns the oldest delta, or
622// nil if there are no deltas.
623func (d Deltas) Oldest() *Delta {
624	if len(d) > 0 {
625		return &d[0]
626	}
627	return nil
628}
629
630// Newest is a convenience function that returns the newest delta, or
631// nil if there are no deltas.
632func (d Deltas) Newest() *Delta {
633	if n := len(d); n > 0 {
634		return &d[n-1]
635	}
636	return nil
637}
638
639// copyDeltas returns a shallow copy of d; that is, it copies the slice but not
640// the objects in the slice. This allows Get/List to return an object that we
641// know won't be clobbered by a subsequent modifications.
642func copyDeltas(d Deltas) Deltas {
643	d2 := make(Deltas, len(d))
644	copy(d2, d)
645	return d2
646}
647
// DeletedFinalStateUnknown is placed into a DeltaFIFO in the case where
// an object was deleted but the watch deletion event was missed. In this
// case we don't know the final "resting" state of the object, so there's
// a chance the included `Obj` is stale.
type DeletedFinalStateUnknown struct {
	// Key is the key of the deleted object; KeyOf returns it directly.
	Key string
	// Obj is the last state found for the object. Replace() leaves it nil
	// when no object could be found for the key.
	Obj interface{}
}
656