1/* 2Copyright 2014 The Kubernetes Authors. 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package cache 18 19import ( 20 "errors" 21 "sync" 22 23 "k8s.io/apimachinery/pkg/util/sets" 24) 25 26// PopProcessFunc is passed to Pop() method of Queue interface. 27// It is supposed to process the accumulator popped from the queue. 28type PopProcessFunc func(interface{}) error 29 30// ErrRequeue may be returned by a PopProcessFunc to safely requeue 31// the current item. The value of Err will be returned from Pop. 32type ErrRequeue struct { 33 // Err is returned by the Pop function 34 Err error 35} 36 37// ErrFIFOClosed used when FIFO is closed 38var ErrFIFOClosed = errors.New("DeltaFIFO: manipulating with closed queue") 39 40func (e ErrRequeue) Error() string { 41 if e.Err == nil { 42 return "the popped item should be requeued without returning an error" 43 } 44 return e.Err.Error() 45} 46 47// Queue extends Store with a collection of Store keys to "process". 48// Every Add, Update, or Delete may put the object's key in that collection. 49// A Queue has a way to derive the corresponding key given an accumulator. 50// A Queue can be accessed concurrently from multiple goroutines. 51// A Queue can be "closed", after which Pop operations return an error. 52type Queue interface { 53 Store 54 55 // Pop blocks until there is at least one key to process or the 56 // Queue is closed. In the latter case Pop returns with an error. 57 // In the former case Pop atomically picks one key to process, 58 // removes that (key, accumulator) association from the Store, and 59 // processes the accumulator. Pop returns the accumulator that 60 // was processed and the result of processing. The PopProcessFunc 61 // may return an ErrRequeue{inner} and in this case Pop will (a) 62 // return that (key, accumulator) association to the Queue as part 63 // of the atomic processing and (b) return the inner error from 64 // Pop. 65 Pop(PopProcessFunc) (interface{}, error) 66 67 // AddIfNotPresent puts the given accumulator into the Queue (in 68 // association with the accumulator's key) if and only if that key 69 // is not already associated with a non-empty accumulator. 70 AddIfNotPresent(interface{}) error 71 72 // HasSynced returns true if the first batch of keys have all been 73 // popped. The first batch of keys are those of the first Replace 74 // operation if that happened before any Add, AddIfNotPresent, 75 // Update, or Delete; otherwise the first batch is empty. 76 HasSynced() bool 77 78 // Close the queue 79 Close() 80} 81 82// Pop is helper function for popping from Queue. 83// WARNING: Do NOT use this function in non-test code to avoid races 84// unless you really really really really know what you are doing. 85func Pop(queue Queue) interface{} { 86 var result interface{} 87 queue.Pop(func(obj interface{}) error { 88 result = obj 89 return nil 90 }) 91 return result 92} 93 94// FIFO is a Queue in which (a) each accumulator is simply the most 95// recently provided object and (b) the collection of keys to process 96// is a FIFO. The accumulators all start out empty, and deleting an 97// object from its accumulator empties the accumulator. The Resync 98// operation is a no-op. 99// 100// Thus: if multiple adds/updates of a single object happen while that 101// object's key is in the queue before it has been processed then it 102// will only be processed once, and when it is processed the most 103// recent version will be processed. This can't be done with a channel 104// 105// FIFO solves this use case: 106// * You want to process every object (exactly) once. 107// * You want to process the most recent version of the object when you process it. 108// * You do not want to process deleted objects, they should be removed from the queue. 109// * You do not want to periodically reprocess objects. 110// Compare with DeltaFIFO for other use cases. 111type FIFO struct { 112 lock sync.RWMutex 113 cond sync.Cond 114 // We depend on the property that every key in `items` is also in `queue` 115 items map[string]interface{} 116 queue []string 117 118 // populated is true if the first batch of items inserted by Replace() has been populated 119 // or Delete/Add/Update was called first. 120 populated bool 121 // initialPopulationCount is the number of items inserted by the first call of Replace() 122 initialPopulationCount int 123 124 // keyFunc is used to make the key used for queued item insertion and retrieval, and 125 // should be deterministic. 126 keyFunc KeyFunc 127 128 // Indication the queue is closed. 129 // Used to indicate a queue is closed so a control loop can exit when a queue is empty. 130 // Currently, not used to gate any of CRED operations. 131 closed bool 132} 133 134var ( 135 _ = Queue(&FIFO{}) // FIFO is a Queue 136) 137 138// Close the queue. 139func (f *FIFO) Close() { 140 f.lock.Lock() 141 defer f.lock.Unlock() 142 f.closed = true 143 f.cond.Broadcast() 144} 145 146// HasSynced returns true if an Add/Update/Delete/AddIfNotPresent are called first, 147// or the first batch of items inserted by Replace() has been popped. 148func (f *FIFO) HasSynced() bool { 149 f.lock.Lock() 150 defer f.lock.Unlock() 151 return f.populated && f.initialPopulationCount == 0 152} 153 154// Add inserts an item, and puts it in the queue. The item is only enqueued 155// if it doesn't already exist in the set. 156func (f *FIFO) Add(obj interface{}) error { 157 id, err := f.keyFunc(obj) 158 if err != nil { 159 return KeyError{obj, err} 160 } 161 f.lock.Lock() 162 defer f.lock.Unlock() 163 f.populated = true 164 if _, exists := f.items[id]; !exists { 165 f.queue = append(f.queue, id) 166 } 167 f.items[id] = obj 168 f.cond.Broadcast() 169 return nil 170} 171 172// AddIfNotPresent inserts an item, and puts it in the queue. If the item is already 173// present in the set, it is neither enqueued nor added to the set. 174// 175// This is useful in a single producer/consumer scenario so that the consumer can 176// safely retry items without contending with the producer and potentially enqueueing 177// stale items. 178func (f *FIFO) AddIfNotPresent(obj interface{}) error { 179 id, err := f.keyFunc(obj) 180 if err != nil { 181 return KeyError{obj, err} 182 } 183 f.lock.Lock() 184 defer f.lock.Unlock() 185 f.addIfNotPresent(id, obj) 186 return nil 187} 188 189// addIfNotPresent assumes the fifo lock is already held and adds the provided 190// item to the queue under id if it does not already exist. 191func (f *FIFO) addIfNotPresent(id string, obj interface{}) { 192 f.populated = true 193 if _, exists := f.items[id]; exists { 194 return 195 } 196 197 f.queue = append(f.queue, id) 198 f.items[id] = obj 199 f.cond.Broadcast() 200} 201 202// Update is the same as Add in this implementation. 203func (f *FIFO) Update(obj interface{}) error { 204 return f.Add(obj) 205} 206 207// Delete removes an item. It doesn't add it to the queue, because 208// this implementation assumes the consumer only cares about the objects, 209// not the order in which they were created/added. 210func (f *FIFO) Delete(obj interface{}) error { 211 id, err := f.keyFunc(obj) 212 if err != nil { 213 return KeyError{obj, err} 214 } 215 f.lock.Lock() 216 defer f.lock.Unlock() 217 f.populated = true 218 delete(f.items, id) 219 return err 220} 221 222// List returns a list of all the items. 223func (f *FIFO) List() []interface{} { 224 f.lock.RLock() 225 defer f.lock.RUnlock() 226 list := make([]interface{}, 0, len(f.items)) 227 for _, item := range f.items { 228 list = append(list, item) 229 } 230 return list 231} 232 233// ListKeys returns a list of all the keys of the objects currently 234// in the FIFO. 235func (f *FIFO) ListKeys() []string { 236 f.lock.RLock() 237 defer f.lock.RUnlock() 238 list := make([]string, 0, len(f.items)) 239 for key := range f.items { 240 list = append(list, key) 241 } 242 return list 243} 244 245// Get returns the requested item, or sets exists=false. 246func (f *FIFO) Get(obj interface{}) (item interface{}, exists bool, err error) { 247 key, err := f.keyFunc(obj) 248 if err != nil { 249 return nil, false, KeyError{obj, err} 250 } 251 return f.GetByKey(key) 252} 253 254// GetByKey returns the requested item, or sets exists=false. 255func (f *FIFO) GetByKey(key string) (item interface{}, exists bool, err error) { 256 f.lock.RLock() 257 defer f.lock.RUnlock() 258 item, exists = f.items[key] 259 return item, exists, nil 260} 261 262// IsClosed checks if the queue is closed 263func (f *FIFO) IsClosed() bool { 264 f.lock.Lock() 265 defer f.lock.Unlock() 266 return f.closed 267} 268 269// Pop waits until an item is ready and processes it. If multiple items are 270// ready, they are returned in the order in which they were added/updated. 271// The item is removed from the queue (and the store) before it is processed, 272// so if you don't successfully process it, it should be added back with 273// AddIfNotPresent(). process function is called under lock, so it is safe 274// update data structures in it that need to be in sync with the queue. 275func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) { 276 f.lock.Lock() 277 defer f.lock.Unlock() 278 for { 279 for len(f.queue) == 0 { 280 // When the queue is empty, invocation of Pop() is blocked until new item is enqueued. 281 // When Close() is called, the f.closed is set and the condition is broadcasted. 282 // Which causes this loop to continue and return from the Pop(). 283 if f.closed { 284 return nil, ErrFIFOClosed 285 } 286 287 f.cond.Wait() 288 } 289 id := f.queue[0] 290 f.queue = f.queue[1:] 291 if f.initialPopulationCount > 0 { 292 f.initialPopulationCount-- 293 } 294 item, ok := f.items[id] 295 if !ok { 296 // Item may have been deleted subsequently. 297 continue 298 } 299 delete(f.items, id) 300 err := process(item) 301 if e, ok := err.(ErrRequeue); ok { 302 f.addIfNotPresent(id, item) 303 err = e.Err 304 } 305 return item, err 306 } 307} 308 309// Replace will delete the contents of 'f', using instead the given map. 310// 'f' takes ownership of the map, you should not reference the map again 311// after calling this function. f's queue is reset, too; upon return, it 312// will contain the items in the map, in no particular order. 313func (f *FIFO) Replace(list []interface{}, resourceVersion string) error { 314 items := make(map[string]interface{}, len(list)) 315 for _, item := range list { 316 key, err := f.keyFunc(item) 317 if err != nil { 318 return KeyError{item, err} 319 } 320 items[key] = item 321 } 322 323 f.lock.Lock() 324 defer f.lock.Unlock() 325 326 if !f.populated { 327 f.populated = true 328 f.initialPopulationCount = len(items) 329 } 330 331 f.items = items 332 f.queue = f.queue[:0] 333 for id := range items { 334 f.queue = append(f.queue, id) 335 } 336 if len(f.queue) > 0 { 337 f.cond.Broadcast() 338 } 339 return nil 340} 341 342// Resync will ensure that every object in the Store has its key in the queue. 343// This should be a no-op, because that property is maintained by all operations. 344func (f *FIFO) Resync() error { 345 f.lock.Lock() 346 defer f.lock.Unlock() 347 348 inQueue := sets.NewString() 349 for _, id := range f.queue { 350 inQueue.Insert(id) 351 } 352 for id := range f.items { 353 if !inQueue.Has(id) { 354 f.queue = append(f.queue, id) 355 } 356 } 357 if len(f.queue) > 0 { 358 f.cond.Broadcast() 359 } 360 return nil 361} 362 363// NewFIFO returns a Store which can be used to queue up items to 364// process. 365func NewFIFO(keyFunc KeyFunc) *FIFO { 366 f := &FIFO{ 367 items: map[string]interface{}{}, 368 queue: []string{}, 369 keyFunc: keyFunc, 370 } 371 f.cond.L = &f.lock 372 return f 373} 374