1/*
2Copyright 2017 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17// This file implements a heap data structure.
18
19package cache
20
21import (
22	"container/heap"
23	"fmt"
24	"sync"
25)
26
27const (
28	closedMsg = "heap is closed"
29)
30
31// LessFunc is used to compare two objects in the heap.
32type LessFunc func(interface{}, interface{}) bool
33
34type heapItem struct {
35	obj   interface{} // The object which is stored in the heap.
36	index int         // The index of the object's key in the Heap.queue.
37}
38
39type itemKeyValue struct {
40	key string
41	obj interface{}
42}
43
44// heapData is an internal struct that implements the standard heap interface
45// and keeps the data stored in the heap.
46type heapData struct {
47	// items is a map from key of the objects to the objects and their index.
48	// We depend on the property that items in the map are in the queue and vice versa.
49	items map[string]*heapItem
50	// queue implements a heap data structure and keeps the order of elements
51	// according to the heap invariant. The queue keeps the keys of objects stored
52	// in "items".
53	queue []string
54
55	// keyFunc is used to make the key used for queued item insertion and retrieval, and
56	// should be deterministic.
57	keyFunc KeyFunc
58	// lessFunc is used to compare two objects in the heap.
59	lessFunc LessFunc
60}
61
62var (
63	_ = heap.Interface(&heapData{}) // heapData is a standard heap
64)
65
66// Less compares two objects and returns true if the first one should go
67// in front of the second one in the heap.
68func (h *heapData) Less(i, j int) bool {
69	if i > len(h.queue) || j > len(h.queue) {
70		return false
71	}
72	itemi, ok := h.items[h.queue[i]]
73	if !ok {
74		return false
75	}
76	itemj, ok := h.items[h.queue[j]]
77	if !ok {
78		return false
79	}
80	return h.lessFunc(itemi.obj, itemj.obj)
81}
82
83// Len returns the number of items in the Heap.
84func (h *heapData) Len() int { return len(h.queue) }
85
86// Swap implements swapping of two elements in the heap. This is a part of standard
87// heap interface and should never be called directly.
88func (h *heapData) Swap(i, j int) {
89	h.queue[i], h.queue[j] = h.queue[j], h.queue[i]
90	item := h.items[h.queue[i]]
91	item.index = i
92	item = h.items[h.queue[j]]
93	item.index = j
94}
95
96// Push is supposed to be called by heap.Push only.
97func (h *heapData) Push(kv interface{}) {
98	keyValue := kv.(*itemKeyValue)
99	n := len(h.queue)
100	h.items[keyValue.key] = &heapItem{keyValue.obj, n}
101	h.queue = append(h.queue, keyValue.key)
102}
103
104// Pop is supposed to be called by heap.Pop only.
105func (h *heapData) Pop() interface{} {
106	key := h.queue[len(h.queue)-1]
107	h.queue = h.queue[0 : len(h.queue)-1]
108	item, ok := h.items[key]
109	if !ok {
110		// This is an error
111		return nil
112	}
113	delete(h.items, key)
114	return item.obj
115}
116
117// Heap is a thread-safe producer/consumer queue that implements a heap data structure.
118// It can be used to implement priority queues and similar data structures.
119type Heap struct {
120	lock sync.RWMutex
121	cond sync.Cond
122
123	// data stores objects and has a queue that keeps their ordering according
124	// to the heap invariant.
125	data *heapData
126
127	// closed indicates that the queue is closed.
128	// It is mainly used to let Pop() exit its control loop while waiting for an item.
129	closed bool
130}
131
132// Close the Heap and signals condition variables that may be waiting to pop
133// items from the heap.
134func (h *Heap) Close() {
135	h.lock.Lock()
136	defer h.lock.Unlock()
137	h.closed = true
138	h.cond.Broadcast()
139}
140
141// Add inserts an item, and puts it in the queue. The item is updated if it
142// already exists.
143func (h *Heap) Add(obj interface{}) error {
144	key, err := h.data.keyFunc(obj)
145	if err != nil {
146		return KeyError{obj, err}
147	}
148	h.lock.Lock()
149	defer h.lock.Unlock()
150	if h.closed {
151		return fmt.Errorf(closedMsg)
152	}
153	if _, exists := h.data.items[key]; exists {
154		h.data.items[key].obj = obj
155		heap.Fix(h.data, h.data.items[key].index)
156	} else {
157		h.addIfNotPresentLocked(key, obj)
158	}
159	h.cond.Broadcast()
160	return nil
161}
162
163// BulkAdd adds all the items in the list to the queue and then signals the condition
164// variable. It is useful when the caller would like to add all of the items
165// to the queue before consumer starts processing them.
166func (h *Heap) BulkAdd(list []interface{}) error {
167	h.lock.Lock()
168	defer h.lock.Unlock()
169	if h.closed {
170		return fmt.Errorf(closedMsg)
171	}
172	for _, obj := range list {
173		key, err := h.data.keyFunc(obj)
174		if err != nil {
175			return KeyError{obj, err}
176		}
177		if _, exists := h.data.items[key]; exists {
178			h.data.items[key].obj = obj
179			heap.Fix(h.data, h.data.items[key].index)
180		} else {
181			h.addIfNotPresentLocked(key, obj)
182		}
183	}
184	h.cond.Broadcast()
185	return nil
186}
187
188// AddIfNotPresent inserts an item, and puts it in the queue. If an item with
189// the key is present in the map, no changes is made to the item.
190//
191// This is useful in a single producer/consumer scenario so that the consumer can
192// safely retry items without contending with the producer and potentially enqueueing
193// stale items.
194func (h *Heap) AddIfNotPresent(obj interface{}) error {
195	id, err := h.data.keyFunc(obj)
196	if err != nil {
197		return KeyError{obj, err}
198	}
199	h.lock.Lock()
200	defer h.lock.Unlock()
201	if h.closed {
202		return fmt.Errorf(closedMsg)
203	}
204	h.addIfNotPresentLocked(id, obj)
205	h.cond.Broadcast()
206	return nil
207}
208
209// addIfNotPresentLocked assumes the lock is already held and adds the provided
210// item to the queue if it does not already exist.
211func (h *Heap) addIfNotPresentLocked(key string, obj interface{}) {
212	if _, exists := h.data.items[key]; exists {
213		return
214	}
215	heap.Push(h.data, &itemKeyValue{key, obj})
216}
217
218// Update is the same as Add in this implementation. When the item does not
219// exist, it is added.
220func (h *Heap) Update(obj interface{}) error {
221	return h.Add(obj)
222}
223
224// Delete removes an item.
225func (h *Heap) Delete(obj interface{}) error {
226	key, err := h.data.keyFunc(obj)
227	if err != nil {
228		return KeyError{obj, err}
229	}
230	h.lock.Lock()
231	defer h.lock.Unlock()
232	if item, ok := h.data.items[key]; ok {
233		heap.Remove(h.data, item.index)
234		return nil
235	}
236	return fmt.Errorf("object not found")
237}
238
239// Pop waits until an item is ready. If multiple items are
240// ready, they are returned in the order given by Heap.data.lessFunc.
241func (h *Heap) Pop() (interface{}, error) {
242	h.lock.Lock()
243	defer h.lock.Unlock()
244	for len(h.data.queue) == 0 {
245		// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
246		// When Close() is called, the h.closed is set and the condition is broadcast,
247		// which causes this loop to continue and return from the Pop().
248		if h.closed {
249			return nil, fmt.Errorf("heap is closed")
250		}
251		h.cond.Wait()
252	}
253	obj := heap.Pop(h.data)
254	if obj == nil {
255		return nil, fmt.Errorf("object was removed from heap data")
256	}
257
258	return obj, nil
259}
260
261// List returns a list of all the items.
262func (h *Heap) List() []interface{} {
263	h.lock.RLock()
264	defer h.lock.RUnlock()
265	list := make([]interface{}, 0, len(h.data.items))
266	for _, item := range h.data.items {
267		list = append(list, item.obj)
268	}
269	return list
270}
271
272// ListKeys returns a list of all the keys of the objects currently in the Heap.
273func (h *Heap) ListKeys() []string {
274	h.lock.RLock()
275	defer h.lock.RUnlock()
276	list := make([]string, 0, len(h.data.items))
277	for key := range h.data.items {
278		list = append(list, key)
279	}
280	return list
281}
282
283// Get returns the requested item, or sets exists=false.
284func (h *Heap) Get(obj interface{}) (interface{}, bool, error) {
285	key, err := h.data.keyFunc(obj)
286	if err != nil {
287		return nil, false, KeyError{obj, err}
288	}
289	return h.GetByKey(key)
290}
291
292// GetByKey returns the requested item, or sets exists=false.
293func (h *Heap) GetByKey(key string) (interface{}, bool, error) {
294	h.lock.RLock()
295	defer h.lock.RUnlock()
296	item, exists := h.data.items[key]
297	if !exists {
298		return nil, false, nil
299	}
300	return item.obj, true, nil
301}
302
303// IsClosed returns true if the queue is closed.
304func (h *Heap) IsClosed() bool {
305	h.lock.RLock()
306	defer h.lock.RUnlock()
307	return h.closed
308}
309
310// NewHeap returns a Heap which can be used to queue up items to process.
311func NewHeap(keyFn KeyFunc, lessFn LessFunc) *Heap {
312	h := &Heap{
313		data: &heapData{
314			items:    map[string]*heapItem{},
315			queue:    []string{},
316			keyFunc:  keyFn,
317			lessFunc: lessFn,
318		},
319	}
320	h.cond.L = &h.lock
321	return h
322}
323