1/* 2Copyright 2014 The Kubernetes Authors. 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package cache 18 19import ( 20 "fmt" 21 "sync" 22 23 "k8s.io/apimachinery/pkg/util/sets" 24) 25 26// ThreadSafeStore is an interface that allows concurrent indexed 27// access to a storage backend. It is like Indexer but does not 28// (necessarily) know how to extract the Store key from a given 29// object. 30// 31// TL;DR caveats: you must not modify anything returned by Get or List as it will break 32// the indexing feature in addition to not being thread safe. 33// 34// The guarantees of thread safety provided by List/Get are only valid if the caller 35// treats returned items as read-only. For example, a pointer inserted in the store 36// through `Add` will be returned as is by `Get`. Multiple clients might invoke `Get` 37// on the same key and modify the pointer in a non-thread-safe way. Also note that 38// modifying objects stored by the indexers (if any) will *not* automatically lead 39// to a re-index. So it's not a good idea to directly modify the objects returned by 40// Get/List, in general. 41type ThreadSafeStore interface { 42 Add(key string, obj interface{}) 43 Update(key string, obj interface{}) 44 Delete(key string) 45 Get(key string) (item interface{}, exists bool) 46 List() []interface{} 47 ListKeys() []string 48 Replace(map[string]interface{}, string) 49 Index(indexName string, obj interface{}) ([]interface{}, error) 50 IndexKeys(indexName, indexKey string) ([]string, error) 51 ListIndexFuncValues(name string) []string 52 ByIndex(indexName, indexKey string) ([]interface{}, error) 53 GetIndexers() Indexers 54 55 // AddIndexers adds more indexers to this store. If you call this after you already have data 56 // in the store, the results are undefined. 57 AddIndexers(newIndexers Indexers) error 58 // Resync is a no-op and is deprecated 59 Resync() error 60} 61 62// threadSafeMap implements ThreadSafeStore 63type threadSafeMap struct { 64 lock sync.RWMutex 65 items map[string]interface{} 66 67 // indexers maps a name to an IndexFunc 68 indexers Indexers 69 // indices maps a name to an Index 70 indices Indices 71} 72 73func (c *threadSafeMap) Add(key string, obj interface{}) { 74 c.lock.Lock() 75 defer c.lock.Unlock() 76 oldObject := c.items[key] 77 c.items[key] = obj 78 c.updateIndices(oldObject, obj, key) 79} 80 81func (c *threadSafeMap) Update(key string, obj interface{}) { 82 c.lock.Lock() 83 defer c.lock.Unlock() 84 oldObject := c.items[key] 85 c.items[key] = obj 86 c.updateIndices(oldObject, obj, key) 87} 88 89func (c *threadSafeMap) Delete(key string) { 90 c.lock.Lock() 91 defer c.lock.Unlock() 92 if obj, exists := c.items[key]; exists { 93 c.deleteFromIndices(obj, key) 94 delete(c.items, key) 95 } 96} 97 98func (c *threadSafeMap) Get(key string) (item interface{}, exists bool) { 99 c.lock.RLock() 100 defer c.lock.RUnlock() 101 item, exists = c.items[key] 102 return item, exists 103} 104 105func (c *threadSafeMap) List() []interface{} { 106 c.lock.RLock() 107 defer c.lock.RUnlock() 108 list := make([]interface{}, 0, len(c.items)) 109 for _, item := range c.items { 110 list = append(list, item) 111 } 112 return list 113} 114 115// ListKeys returns a list of all the keys of the objects currently 116// in the threadSafeMap. 117func (c *threadSafeMap) ListKeys() []string { 118 c.lock.RLock() 119 defer c.lock.RUnlock() 120 list := make([]string, 0, len(c.items)) 121 for key := range c.items { 122 list = append(list, key) 123 } 124 return list 125} 126 127func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion string) { 128 c.lock.Lock() 129 defer c.lock.Unlock() 130 c.items = items 131 132 // rebuild any index 133 c.indices = Indices{} 134 for key, item := range c.items { 135 c.updateIndices(nil, item, key) 136 } 137} 138 139// Index returns a list of items that match the given object on the index function. 140// Index is thread-safe so long as you treat all items as immutable. 141func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, error) { 142 c.lock.RLock() 143 defer c.lock.RUnlock() 144 145 indexFunc := c.indexers[indexName] 146 if indexFunc == nil { 147 return nil, fmt.Errorf("Index with name %s does not exist", indexName) 148 } 149 150 indexedValues, err := indexFunc(obj) 151 if err != nil { 152 return nil, err 153 } 154 index := c.indices[indexName] 155 156 var storeKeySet sets.String 157 if len(indexedValues) == 1 { 158 // In majority of cases, there is exactly one value matching. 159 // Optimize the most common path - deduping is not needed here. 160 storeKeySet = index[indexedValues[0]] 161 } else { 162 // Need to de-dupe the return list. 163 // Since multiple keys are allowed, this can happen. 164 storeKeySet = sets.String{} 165 for _, indexedValue := range indexedValues { 166 for key := range index[indexedValue] { 167 storeKeySet.Insert(key) 168 } 169 } 170 } 171 172 list := make([]interface{}, 0, storeKeySet.Len()) 173 for storeKey := range storeKeySet { 174 list = append(list, c.items[storeKey]) 175 } 176 return list, nil 177} 178 179// ByIndex returns a list of the items whose indexed values in the given index include the given indexed value 180func (c *threadSafeMap) ByIndex(indexName, indexedValue string) ([]interface{}, error) { 181 c.lock.RLock() 182 defer c.lock.RUnlock() 183 184 indexFunc := c.indexers[indexName] 185 if indexFunc == nil { 186 return nil, fmt.Errorf("Index with name %s does not exist", indexName) 187 } 188 189 index := c.indices[indexName] 190 191 set := index[indexedValue] 192 list := make([]interface{}, 0, set.Len()) 193 for key := range set { 194 list = append(list, c.items[key]) 195 } 196 197 return list, nil 198} 199 200// IndexKeys returns a list of the Store keys of the objects whose indexed values in the given index include the given indexed value. 201// IndexKeys is thread-safe so long as you treat all items as immutable. 202func (c *threadSafeMap) IndexKeys(indexName, indexedValue string) ([]string, error) { 203 c.lock.RLock() 204 defer c.lock.RUnlock() 205 206 indexFunc := c.indexers[indexName] 207 if indexFunc == nil { 208 return nil, fmt.Errorf("Index with name %s does not exist", indexName) 209 } 210 211 index := c.indices[indexName] 212 213 set := index[indexedValue] 214 return set.List(), nil 215} 216 217func (c *threadSafeMap) ListIndexFuncValues(indexName string) []string { 218 c.lock.RLock() 219 defer c.lock.RUnlock() 220 221 index := c.indices[indexName] 222 names := make([]string, 0, len(index)) 223 for key := range index { 224 names = append(names, key) 225 } 226 return names 227} 228 229func (c *threadSafeMap) GetIndexers() Indexers { 230 return c.indexers 231} 232 233func (c *threadSafeMap) AddIndexers(newIndexers Indexers) error { 234 c.lock.Lock() 235 defer c.lock.Unlock() 236 237 if len(c.items) > 0 { 238 return fmt.Errorf("cannot add indexers to running index") 239 } 240 241 oldKeys := sets.StringKeySet(c.indexers) 242 newKeys := sets.StringKeySet(newIndexers) 243 244 if oldKeys.HasAny(newKeys.List()...) { 245 return fmt.Errorf("indexer conflict: %v", oldKeys.Intersection(newKeys)) 246 } 247 248 for k, v := range newIndexers { 249 c.indexers[k] = v 250 } 251 return nil 252} 253 254// updateIndices modifies the objects location in the managed indexes, if this is an update, you must provide an oldObj 255// updateIndices must be called from a function that already has a lock on the cache 256func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) { 257 // if we got an old object, we need to remove it before we add it again 258 if oldObj != nil { 259 c.deleteFromIndices(oldObj, key) 260 } 261 for name, indexFunc := range c.indexers { 262 indexValues, err := indexFunc(newObj) 263 if err != nil { 264 panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err)) 265 } 266 index := c.indices[name] 267 if index == nil { 268 index = Index{} 269 c.indices[name] = index 270 } 271 272 for _, indexValue := range indexValues { 273 set := index[indexValue] 274 if set == nil { 275 set = sets.String{} 276 index[indexValue] = set 277 } 278 set.Insert(key) 279 } 280 } 281} 282 283// deleteFromIndices removes the object from each of the managed indexes 284// it is intended to be called from a function that already has a lock on the cache 285func (c *threadSafeMap) deleteFromIndices(obj interface{}, key string) { 286 for name, indexFunc := range c.indexers { 287 indexValues, err := indexFunc(obj) 288 if err != nil { 289 panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err)) 290 } 291 292 index := c.indices[name] 293 if index == nil { 294 continue 295 } 296 for _, indexValue := range indexValues { 297 set := index[indexValue] 298 if set != nil { 299 set.Delete(key) 300 301 // If we don't delete the set when zero, indices with high cardinality 302 // short lived resources can cause memory to increase over time from 303 // unused empty sets. See `kubernetes/kubernetes/issues/84959`. 304 if len(set) == 0 { 305 delete(index, indexValue) 306 } 307 } 308 } 309 } 310} 311 312func (c *threadSafeMap) Resync() error { 313 // Nothing to do 314 return nil 315} 316 317// NewThreadSafeStore creates a new instance of ThreadSafeStore. 318func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore { 319 return &threadSafeMap{ 320 items: map[string]interface{}{}, 321 indexers: indexers, 322 indices: indices, 323 } 324} 325