1/*
2Copyright 2014 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package cache
18
19import (
20	"fmt"
21	"sync"
22
23	"k8s.io/apimachinery/pkg/util/sets"
24)
25
26// ThreadSafeStore is an interface that allows concurrent indexed
27// access to a storage backend.  It is like Indexer but does not
28// (necessarily) know how to extract the Store key from a given
29// object.
30//
31// TL;DR caveats: you must not modify anything returned by Get or List as it will break
32// the indexing feature in addition to not being thread safe.
33//
34// The guarantees of thread safety provided by List/Get are only valid if the caller
35// treats returned items as read-only. For example, a pointer inserted in the store
36// through `Add` will be returned as is by `Get`. Multiple clients might invoke `Get`
37// on the same key and modify the pointer in a non-thread-safe way. Also note that
38// modifying objects stored by the indexers (if any) will *not* automatically lead
39// to a re-index. So it's not a good idea to directly modify the objects returned by
40// Get/List, in general.
41type ThreadSafeStore interface {
42	Add(key string, obj interface{})
43	Update(key string, obj interface{})
44	Delete(key string)
45	Get(key string) (item interface{}, exists bool)
46	List() []interface{}
47	ListKeys() []string
48	Replace(map[string]interface{}, string)
49	Index(indexName string, obj interface{}) ([]interface{}, error)
50	IndexKeys(indexName, indexKey string) ([]string, error)
51	ListIndexFuncValues(name string) []string
52	ByIndex(indexName, indexKey string) ([]interface{}, error)
53	GetIndexers() Indexers
54
55	// AddIndexers adds more indexers to this store.  If you call this after you already have data
56	// in the store, the results are undefined.
57	AddIndexers(newIndexers Indexers) error
58	// Resync is a no-op and is deprecated
59	Resync() error
60}
61
62// threadSafeMap implements ThreadSafeStore
63type threadSafeMap struct {
64	lock  sync.RWMutex
65	items map[string]interface{}
66
67	// indexers maps a name to an IndexFunc
68	indexers Indexers
69	// indices maps a name to an Index
70	indices Indices
71}
72
73func (c *threadSafeMap) Add(key string, obj interface{}) {
74	c.lock.Lock()
75	defer c.lock.Unlock()
76	oldObject := c.items[key]
77	c.items[key] = obj
78	c.updateIndices(oldObject, obj, key)
79}
80
81func (c *threadSafeMap) Update(key string, obj interface{}) {
82	c.lock.Lock()
83	defer c.lock.Unlock()
84	oldObject := c.items[key]
85	c.items[key] = obj
86	c.updateIndices(oldObject, obj, key)
87}
88
89func (c *threadSafeMap) Delete(key string) {
90	c.lock.Lock()
91	defer c.lock.Unlock()
92	if obj, exists := c.items[key]; exists {
93		c.deleteFromIndices(obj, key)
94		delete(c.items, key)
95	}
96}
97
98func (c *threadSafeMap) Get(key string) (item interface{}, exists bool) {
99	c.lock.RLock()
100	defer c.lock.RUnlock()
101	item, exists = c.items[key]
102	return item, exists
103}
104
105func (c *threadSafeMap) List() []interface{} {
106	c.lock.RLock()
107	defer c.lock.RUnlock()
108	list := make([]interface{}, 0, len(c.items))
109	for _, item := range c.items {
110		list = append(list, item)
111	}
112	return list
113}
114
115// ListKeys returns a list of all the keys of the objects currently
116// in the threadSafeMap.
117func (c *threadSafeMap) ListKeys() []string {
118	c.lock.RLock()
119	defer c.lock.RUnlock()
120	list := make([]string, 0, len(c.items))
121	for key := range c.items {
122		list = append(list, key)
123	}
124	return list
125}
126
127func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion string) {
128	c.lock.Lock()
129	defer c.lock.Unlock()
130	c.items = items
131
132	// rebuild any index
133	c.indices = Indices{}
134	for key, item := range c.items {
135		c.updateIndices(nil, item, key)
136	}
137}
138
139// Index returns a list of items that match the given object on the index function.
140// Index is thread-safe so long as you treat all items as immutable.
141func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, error) {
142	c.lock.RLock()
143	defer c.lock.RUnlock()
144
145	indexFunc := c.indexers[indexName]
146	if indexFunc == nil {
147		return nil, fmt.Errorf("Index with name %s does not exist", indexName)
148	}
149
150	indexedValues, err := indexFunc(obj)
151	if err != nil {
152		return nil, err
153	}
154	index := c.indices[indexName]
155
156	var storeKeySet sets.String
157	if len(indexedValues) == 1 {
158		// In majority of cases, there is exactly one value matching.
159		// Optimize the most common path - deduping is not needed here.
160		storeKeySet = index[indexedValues[0]]
161	} else {
162		// Need to de-dupe the return list.
163		// Since multiple keys are allowed, this can happen.
164		storeKeySet = sets.String{}
165		for _, indexedValue := range indexedValues {
166			for key := range index[indexedValue] {
167				storeKeySet.Insert(key)
168			}
169		}
170	}
171
172	list := make([]interface{}, 0, storeKeySet.Len())
173	for storeKey := range storeKeySet {
174		list = append(list, c.items[storeKey])
175	}
176	return list, nil
177}
178
179// ByIndex returns a list of the items whose indexed values in the given index include the given indexed value
180func (c *threadSafeMap) ByIndex(indexName, indexedValue string) ([]interface{}, error) {
181	c.lock.RLock()
182	defer c.lock.RUnlock()
183
184	indexFunc := c.indexers[indexName]
185	if indexFunc == nil {
186		return nil, fmt.Errorf("Index with name %s does not exist", indexName)
187	}
188
189	index := c.indices[indexName]
190
191	set := index[indexedValue]
192	list := make([]interface{}, 0, set.Len())
193	for key := range set {
194		list = append(list, c.items[key])
195	}
196
197	return list, nil
198}
199
200// IndexKeys returns a list of the Store keys of the objects whose indexed values in the given index include the given indexed value.
201// IndexKeys is thread-safe so long as you treat all items as immutable.
202func (c *threadSafeMap) IndexKeys(indexName, indexedValue string) ([]string, error) {
203	c.lock.RLock()
204	defer c.lock.RUnlock()
205
206	indexFunc := c.indexers[indexName]
207	if indexFunc == nil {
208		return nil, fmt.Errorf("Index with name %s does not exist", indexName)
209	}
210
211	index := c.indices[indexName]
212
213	set := index[indexedValue]
214	return set.List(), nil
215}
216
217func (c *threadSafeMap) ListIndexFuncValues(indexName string) []string {
218	c.lock.RLock()
219	defer c.lock.RUnlock()
220
221	index := c.indices[indexName]
222	names := make([]string, 0, len(index))
223	for key := range index {
224		names = append(names, key)
225	}
226	return names
227}
228
229func (c *threadSafeMap) GetIndexers() Indexers {
230	return c.indexers
231}
232
233func (c *threadSafeMap) AddIndexers(newIndexers Indexers) error {
234	c.lock.Lock()
235	defer c.lock.Unlock()
236
237	if len(c.items) > 0 {
238		return fmt.Errorf("cannot add indexers to running index")
239	}
240
241	oldKeys := sets.StringKeySet(c.indexers)
242	newKeys := sets.StringKeySet(newIndexers)
243
244	if oldKeys.HasAny(newKeys.List()...) {
245		return fmt.Errorf("indexer conflict: %v", oldKeys.Intersection(newKeys))
246	}
247
248	for k, v := range newIndexers {
249		c.indexers[k] = v
250	}
251	return nil
252}
253
254// updateIndices modifies the objects location in the managed indexes, if this is an update, you must provide an oldObj
255// updateIndices must be called from a function that already has a lock on the cache
256func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) {
257	// if we got an old object, we need to remove it before we add it again
258	if oldObj != nil {
259		c.deleteFromIndices(oldObj, key)
260	}
261	for name, indexFunc := range c.indexers {
262		indexValues, err := indexFunc(newObj)
263		if err != nil {
264			panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
265		}
266		index := c.indices[name]
267		if index == nil {
268			index = Index{}
269			c.indices[name] = index
270		}
271
272		for _, indexValue := range indexValues {
273			set := index[indexValue]
274			if set == nil {
275				set = sets.String{}
276				index[indexValue] = set
277			}
278			set.Insert(key)
279		}
280	}
281}
282
283// deleteFromIndices removes the object from each of the managed indexes
284// it is intended to be called from a function that already has a lock on the cache
285func (c *threadSafeMap) deleteFromIndices(obj interface{}, key string) {
286	for name, indexFunc := range c.indexers {
287		indexValues, err := indexFunc(obj)
288		if err != nil {
289			panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
290		}
291
292		index := c.indices[name]
293		if index == nil {
294			continue
295		}
296		for _, indexValue := range indexValues {
297			set := index[indexValue]
298			if set != nil {
299				set.Delete(key)
300
301				// If we don't delete the set when zero, indices with high cardinality
302				// short lived resources can cause memory to increase over time from
303				// unused empty sets. See `kubernetes/kubernetes/issues/84959`.
304				if len(set) == 0 {
305					delete(index, indexValue)
306				}
307			}
308		}
309	}
310}
311
312func (c *threadSafeMap) Resync() error {
313	// Nothing to do
314	return nil
315}
316
317// NewThreadSafeStore creates a new instance of ThreadSafeStore.
318func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
319	return &threadSafeMap{
320		items:    map[string]interface{}{},
321		indexers: indexers,
322		indices:  indices,
323	}
324}
325