1/*
2Copyright 2014 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package cache
18
19import (
20	"errors"
21	"sync"
22
23	"k8s.io/apimachinery/pkg/util/sets"
24)
25
// PopProcessFunc is passed to Pop() method of Queue interface.
// It is supposed to process the accumulator popped from the queue.
// Returning an ErrRequeue asks Pop to put the accumulator back into
// the queue; any other result is handed back to the caller of Pop.
type PopProcessFunc func(interface{}) error
29
// ErrRequeue may be returned by a PopProcessFunc to safely requeue
// the current item. The value of Err will be returned from Pop.
type ErrRequeue struct {
	// Err is the error that Pop hands back to its caller after the item
	// has been requeued. It may be nil, in which case Pop returns a nil error.
	Err error
}
36
// ErrFIFOClosed is returned by Pop when the queue is empty and has been
// closed. The message names DeltaFIFO, but FIFO.Pop returns this error too.
var ErrFIFOClosed = errors.New("DeltaFIFO: manipulating with closed queue")
39
40func (e ErrRequeue) Error() string {
41	if e.Err == nil {
42		return "the popped item should be requeued without returning an error"
43	}
44	return e.Err.Error()
45}
46
// Queue extends Store with a collection of Store keys to "process".
// Every Add, Update, or Delete may put the object's key in that collection.
// A Queue has a way to derive the corresponding key given an accumulator.
// A Queue can be accessed concurrently from multiple goroutines.
// A Queue can be "closed", after which Pop operations return an error.
type Queue interface {
	Store

	// Pop blocks until there is at least one key to process or the
	// Queue is closed.  In the latter case Pop returns with an error.
	// In the former case Pop atomically picks one key to process,
	// removes that (key, accumulator) association from the Store, and
	// processes the accumulator.  Pop returns the accumulator that
	// was processed and the result of processing.  The PopProcessFunc
	// may return an ErrRequeue{inner} and in this case Pop will (a)
	// return that (key, accumulator) association to the Queue as part
	// of the atomic processing and (b) return the inner error from
	// Pop.
	Pop(PopProcessFunc) (interface{}, error)

	// AddIfNotPresent puts the given accumulator into the Queue (in
	// association with the accumulator's key) if and only if that key
	// is not already associated with a non-empty accumulator.
	AddIfNotPresent(interface{}) error

	// HasSynced returns true if the first batch of keys have all been
	// popped.  The first batch of keys are those of the first Replace
	// operation if that happened before any Add, AddIfNotPresent,
	// Update, or Delete; otherwise the first batch is empty.
	HasSynced() bool

	// Close marks the Queue as closed.  As described for Pop, a closed
	// Queue makes Pop return with an error instead of blocking.
	Close()
}
81
82// Pop is helper function for popping from Queue.
83// WARNING: Do NOT use this function in non-test code to avoid races
84// unless you really really really really know what you are doing.
85func Pop(queue Queue) interface{} {
86	var result interface{}
87	queue.Pop(func(obj interface{}) error {
88		result = obj
89		return nil
90	})
91	return result
92}
93
// FIFO is a Queue in which (a) each accumulator is simply the most
// recently provided object and (b) the collection of keys to process
// is a FIFO.  The accumulators all start out empty, and deleting an
// object from its accumulator empties the accumulator.  The Resync
// operation is a no-op.
//
// Thus: if multiple adds/updates of a single object happen while that
// object's key is in the queue before it has been processed then it
// will only be processed once, and when it is processed the most
// recent version will be processed. This can't be done with a channel.
//
// FIFO solves this use case:
//  * You want to process every object (exactly) once.
//  * You want to process the most recent version of the object when you process it.
//  * You do not want to process deleted objects, they should be removed from the queue.
//  * You do not want to periodically reprocess objects.
// Compare with DeltaFIFO for other use cases.
type FIFO struct {
	// lock guards items, queue, populated, initialPopulationCount and closed.
	lock sync.RWMutex
	// cond is bound to lock by NewFIFO. Pop waits on it while the queue is
	// empty; operations that may enqueue keys, and Close, broadcast on it.
	cond sync.Cond
	// We depend on the property that every key in `items` is also in `queue`.
	// The converse does not hold: Delete removes a key from `items` only, and
	// Pop skips queued keys that have no entry in `items`.
	items map[string]interface{}
	queue []string

	// populated is true if the first batch of items inserted by Replace() has been populated
	// or Delete/Add/Update was called first.
	populated bool
	// initialPopulationCount is the number of items inserted by the first call of Replace()
	// that have not yet been taken off the queue by Pop.
	initialPopulationCount int

	// keyFunc is used to make the key used for queued item insertion and retrieval, and
	// should be deterministic.
	keyFunc KeyFunc

	// Indication the queue is closed.
	// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
	// Currently, not used to gate any of CRUD operations.
	closed bool
}
133
134var (
135	_ = Queue(&FIFO{}) // FIFO is a Queue
136)
137
// Close marks the queue as closed and wakes every goroutine waiting on the
// condition variable, so that a Pop blocked on an empty queue can return
// ErrFIFOClosed. Pop only checks the closed flag when the queue is empty, so
// keys that are still queued can be popped after Close.
func (f *FIFO) Close() {
	f.lock.Lock()
	defer f.lock.Unlock()
	f.closed = true
	// Broadcast rather than Signal: all blocked Pop calls must observe the flag.
	f.cond.Broadcast()
}
145
146// HasSynced returns true if an Add/Update/Delete/AddIfNotPresent are called first,
147// or the first batch of items inserted by Replace() has been popped.
148func (f *FIFO) HasSynced() bool {
149	f.lock.Lock()
150	defer f.lock.Unlock()
151	return f.populated && f.initialPopulationCount == 0
152}
153
154// Add inserts an item, and puts it in the queue. The item is only enqueued
155// if it doesn't already exist in the set.
156func (f *FIFO) Add(obj interface{}) error {
157	id, err := f.keyFunc(obj)
158	if err != nil {
159		return KeyError{obj, err}
160	}
161	f.lock.Lock()
162	defer f.lock.Unlock()
163	f.populated = true
164	if _, exists := f.items[id]; !exists {
165		f.queue = append(f.queue, id)
166	}
167	f.items[id] = obj
168	f.cond.Broadcast()
169	return nil
170}
171
172// AddIfNotPresent inserts an item, and puts it in the queue. If the item is already
173// present in the set, it is neither enqueued nor added to the set.
174//
175// This is useful in a single producer/consumer scenario so that the consumer can
176// safely retry items without contending with the producer and potentially enqueueing
177// stale items.
178func (f *FIFO) AddIfNotPresent(obj interface{}) error {
179	id, err := f.keyFunc(obj)
180	if err != nil {
181		return KeyError{obj, err}
182	}
183	f.lock.Lock()
184	defer f.lock.Unlock()
185	f.addIfNotPresent(id, obj)
186	return nil
187}
188
189// addIfNotPresent assumes the fifo lock is already held and adds the provided
190// item to the queue under id if it does not already exist.
191func (f *FIFO) addIfNotPresent(id string, obj interface{}) {
192	f.populated = true
193	if _, exists := f.items[id]; exists {
194		return
195	}
196
197	f.queue = append(f.queue, id)
198	f.items[id] = obj
199	f.cond.Broadcast()
200}
201
// Update is the same as Add in this implementation: it stores obj as the
// latest version for its key and enqueues the key if nothing was stored
// under it before.
func (f *FIFO) Update(obj interface{}) error {
	return f.Add(obj)
}
206
207// Delete removes an item. It doesn't add it to the queue, because
208// this implementation assumes the consumer only cares about the objects,
209// not the order in which they were created/added.
210func (f *FIFO) Delete(obj interface{}) error {
211	id, err := f.keyFunc(obj)
212	if err != nil {
213		return KeyError{obj, err}
214	}
215	f.lock.Lock()
216	defer f.lock.Unlock()
217	f.populated = true
218	delete(f.items, id)
219	return err
220}
221
222// List returns a list of all the items.
223func (f *FIFO) List() []interface{} {
224	f.lock.RLock()
225	defer f.lock.RUnlock()
226	list := make([]interface{}, 0, len(f.items))
227	for _, item := range f.items {
228		list = append(list, item)
229	}
230	return list
231}
232
233// ListKeys returns a list of all the keys of the objects currently
234// in the FIFO.
235func (f *FIFO) ListKeys() []string {
236	f.lock.RLock()
237	defer f.lock.RUnlock()
238	list := make([]string, 0, len(f.items))
239	for key := range f.items {
240		list = append(list, key)
241	}
242	return list
243}
244
245// Get returns the requested item, or sets exists=false.
246func (f *FIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
247	key, err := f.keyFunc(obj)
248	if err != nil {
249		return nil, false, KeyError{obj, err}
250	}
251	return f.GetByKey(key)
252}
253
254// GetByKey returns the requested item, or sets exists=false.
255func (f *FIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
256	f.lock.RLock()
257	defer f.lock.RUnlock()
258	item, exists = f.items[key]
259	return item, exists, nil
260}
261
262// IsClosed checks if the queue is closed
263func (f *FIFO) IsClosed() bool {
264	f.lock.Lock()
265	defer f.lock.Unlock()
266	return f.closed
267}
268
269// Pop waits until an item is ready and processes it. If multiple items are
270// ready, they are returned in the order in which they were added/updated.
271// The item is removed from the queue (and the store) before it is processed,
272// so if you don't successfully process it, it should be added back with
273// AddIfNotPresent(). process function is called under lock, so it is safe
274// update data structures in it that need to be in sync with the queue.
275func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) {
276	f.lock.Lock()
277	defer f.lock.Unlock()
278	for {
279		for len(f.queue) == 0 {
280			// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
281			// When Close() is called, the f.closed is set and the condition is broadcasted.
282			// Which causes this loop to continue and return from the Pop().
283			if f.closed {
284				return nil, ErrFIFOClosed
285			}
286
287			f.cond.Wait()
288		}
289		id := f.queue[0]
290		f.queue = f.queue[1:]
291		if f.initialPopulationCount > 0 {
292			f.initialPopulationCount--
293		}
294		item, ok := f.items[id]
295		if !ok {
296			// Item may have been deleted subsequently.
297			continue
298		}
299		delete(f.items, id)
300		err := process(item)
301		if e, ok := err.(ErrRequeue); ok {
302			f.addIfNotPresent(id, item)
303			err = e.Err
304		}
305		return item, err
306	}
307}
308
309// Replace will delete the contents of 'f', using instead the given map.
310// 'f' takes ownership of the map, you should not reference the map again
311// after calling this function. f's queue is reset, too; upon return, it
312// will contain the items in the map, in no particular order.
313func (f *FIFO) Replace(list []interface{}, resourceVersion string) error {
314	items := make(map[string]interface{}, len(list))
315	for _, item := range list {
316		key, err := f.keyFunc(item)
317		if err != nil {
318			return KeyError{item, err}
319		}
320		items[key] = item
321	}
322
323	f.lock.Lock()
324	defer f.lock.Unlock()
325
326	if !f.populated {
327		f.populated = true
328		f.initialPopulationCount = len(items)
329	}
330
331	f.items = items
332	f.queue = f.queue[:0]
333	for id := range items {
334		f.queue = append(f.queue, id)
335	}
336	if len(f.queue) > 0 {
337		f.cond.Broadcast()
338	}
339	return nil
340}
341
342// Resync will ensure that every object in the Store has its key in the queue.
343// This should be a no-op, because that property is maintained by all operations.
344func (f *FIFO) Resync() error {
345	f.lock.Lock()
346	defer f.lock.Unlock()
347
348	inQueue := sets.NewString()
349	for _, id := range f.queue {
350		inQueue.Insert(id)
351	}
352	for id := range f.items {
353		if !inQueue.Has(id) {
354			f.queue = append(f.queue, id)
355		}
356	}
357	if len(f.queue) > 0 {
358		f.cond.Broadcast()
359	}
360	return nil
361}
362
363// NewFIFO returns a Store which can be used to queue up items to
364// process.
365func NewFIFO(keyFunc KeyFunc) *FIFO {
366	f := &FIFO{
367		items:   map[string]interface{}{},
368		queue:   []string{},
369		keyFunc: keyFunc,
370	}
371	f.cond.L = &f.lock
372	return f
373}
374