1/* 2Copyright 2014 The Kubernetes Authors. 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package cache 18 19import ( 20 "errors" 21 "fmt" 22 "sync" 23 24 "k8s.io/apimachinery/pkg/util/sets" 25 26 "k8s.io/klog" 27) 28 29// NewDeltaFIFO returns a Store which can be used process changes to items. 30// 31// keyFunc is used to figure out what key an object should have. (It's 32// exposed in the returned DeltaFIFO's KeyOf() method, with bonus features.) 33// 34// 'keyLister' is expected to return a list of keys that the consumer of 35// this queue "knows about". It is used to decide which items are missing 36// when Replace() is called; 'Deleted' deltas are produced for these items. 37// It may be nil if you don't need to detect all deletions. 38// TODO: consider merging keyLister with this object, tracking a list of 39// "known" keys when Pop() is called. Have to think about how that 40// affects error retrying. 41// NOTE: It is possible to misuse this and cause a race when using an 42// external known object source. 43// Whether there is a potential race depends on how the comsumer 44// modifies knownObjects. In Pop(), process function is called under 45// lock, so it is safe to update data structures in it that need to be 46// in sync with the queue (e.g. knownObjects). 47// 48// Example: 49// In case of sharedIndexInformer being a consumer 50// (https://github.com/kubernetes/kubernetes/blob/0cdd940f/staging/ 51// src/k8s.io/client-go/tools/cache/shared_informer.go#L192), 52// there is no race as knownObjects (s.indexer) is modified safely 53// under DeltaFIFO's lock. The only exceptions are GetStore() and 54// GetIndexer() methods, which expose ways to modify the underlying 55// storage. Currently these two methods are used for creating Lister 56// and internal tests. 57// 58// Also see the comment on DeltaFIFO. 59func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO { 60 f := &DeltaFIFO{ 61 items: map[string]Deltas{}, 62 queue: []string{}, 63 keyFunc: keyFunc, 64 knownObjects: knownObjects, 65 } 66 f.cond.L = &f.lock 67 return f 68} 69 70// DeltaFIFO is like FIFO, but allows you to process deletes. 71// 72// DeltaFIFO is a producer-consumer queue, where a Reflector is 73// intended to be the producer, and the consumer is whatever calls 74// the Pop() method. 75// 76// DeltaFIFO solves this use case: 77// * You want to process every object change (delta) at most once. 78// * When you process an object, you want to see everything 79// that's happened to it since you last processed it. 80// * You want to process the deletion of objects. 81// * You might want to periodically reprocess objects. 82// 83// DeltaFIFO's Pop(), Get(), and GetByKey() methods return 84// interface{} to satisfy the Store/Queue interfaces, but it 85// will always return an object of type Deltas. 86// 87// A note on threading: If you call Pop() in parallel from multiple 88// threads, you could end up with multiple threads processing slightly 89// different versions of the same object. 90// 91// A note on the KeyLister used by the DeltaFIFO: It's main purpose is 92// to list keys that are "known", for the purpose of figuring out which 93// items have been deleted when Replace() or Delete() are called. The deleted 94// object will be included in the DeleteFinalStateUnknown markers. These objects 95// could be stale. 96type DeltaFIFO struct { 97 // lock/cond protects access to 'items' and 'queue'. 98 lock sync.RWMutex 99 cond sync.Cond 100 101 // We depend on the property that items in the set are in 102 // the queue and vice versa, and that all Deltas in this 103 // map have at least one Delta. 104 items map[string]Deltas 105 queue []string 106 107 // populated is true if the first batch of items inserted by Replace() has been populated 108 // or Delete/Add/Update was called first. 109 populated bool 110 // initialPopulationCount is the number of items inserted by the first call of Replace() 111 initialPopulationCount int 112 113 // keyFunc is used to make the key used for queued item 114 // insertion and retrieval, and should be deterministic. 115 keyFunc KeyFunc 116 117 // knownObjects list keys that are "known", for the 118 // purpose of figuring out which items have been deleted 119 // when Replace() or Delete() is called. 120 knownObjects KeyListerGetter 121 122 // Indication the queue is closed. 123 // Used to indicate a queue is closed so a control loop can exit when a queue is empty. 124 // Currently, not used to gate any of CRED operations. 125 closed bool 126 closedLock sync.Mutex 127} 128 129var ( 130 _ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue 131) 132 133var ( 134 // ErrZeroLengthDeltasObject is returned in a KeyError if a Deltas 135 // object with zero length is encountered (should be impossible, 136 // but included for completeness). 137 ErrZeroLengthDeltasObject = errors.New("0 length Deltas object; can't get key") 138) 139 140// Close the queue. 141func (f *DeltaFIFO) Close() { 142 f.closedLock.Lock() 143 defer f.closedLock.Unlock() 144 f.closed = true 145 f.cond.Broadcast() 146} 147 148// KeyOf exposes f's keyFunc, but also detects the key of a Deltas object or 149// DeletedFinalStateUnknown objects. 150func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) { 151 if d, ok := obj.(Deltas); ok { 152 if len(d) == 0 { 153 return "", KeyError{obj, ErrZeroLengthDeltasObject} 154 } 155 obj = d.Newest().Object 156 } 157 if d, ok := obj.(DeletedFinalStateUnknown); ok { 158 return d.Key, nil 159 } 160 return f.keyFunc(obj) 161} 162 163// Return true if an Add/Update/Delete/AddIfNotPresent are called first, 164// or an Update called first but the first batch of items inserted by Replace() has been popped 165func (f *DeltaFIFO) HasSynced() bool { 166 f.lock.Lock() 167 defer f.lock.Unlock() 168 return f.populated && f.initialPopulationCount == 0 169} 170 171// Add inserts an item, and puts it in the queue. The item is only enqueued 172// if it doesn't already exist in the set. 173func (f *DeltaFIFO) Add(obj interface{}) error { 174 f.lock.Lock() 175 defer f.lock.Unlock() 176 f.populated = true 177 return f.queueActionLocked(Added, obj) 178} 179 180// Update is just like Add, but makes an Updated Delta. 181func (f *DeltaFIFO) Update(obj interface{}) error { 182 f.lock.Lock() 183 defer f.lock.Unlock() 184 f.populated = true 185 return f.queueActionLocked(Updated, obj) 186} 187 188// Delete is just like Add, but makes an Deleted Delta. If the item does not 189// already exist, it will be ignored. (It may have already been deleted by a 190// Replace (re-list), for example. 191func (f *DeltaFIFO) Delete(obj interface{}) error { 192 id, err := f.KeyOf(obj) 193 if err != nil { 194 return KeyError{obj, err} 195 } 196 f.lock.Lock() 197 defer f.lock.Unlock() 198 f.populated = true 199 if f.knownObjects == nil { 200 if _, exists := f.items[id]; !exists { 201 // Presumably, this was deleted when a relist happened. 202 // Don't provide a second report of the same deletion. 203 return nil 204 } 205 } else { 206 // We only want to skip the "deletion" action if the object doesn't 207 // exist in knownObjects and it doesn't have corresponding item in items. 208 // Note that even if there is a "deletion" action in items, we can ignore it, 209 // because it will be deduped automatically in "queueActionLocked" 210 _, exists, err := f.knownObjects.GetByKey(id) 211 _, itemsExist := f.items[id] 212 if err == nil && !exists && !itemsExist { 213 // Presumably, this was deleted when a relist happened. 214 // Don't provide a second report of the same deletion. 215 return nil 216 } 217 } 218 219 return f.queueActionLocked(Deleted, obj) 220} 221 222// AddIfNotPresent inserts an item, and puts it in the queue. If the item is already 223// present in the set, it is neither enqueued nor added to the set. 224// 225// This is useful in a single producer/consumer scenario so that the consumer can 226// safely retry items without contending with the producer and potentially enqueueing 227// stale items. 228// 229// Important: obj must be a Deltas (the output of the Pop() function). Yes, this is 230// different from the Add/Update/Delete functions. 231func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error { 232 deltas, ok := obj.(Deltas) 233 if !ok { 234 return fmt.Errorf("object must be of type deltas, but got: %#v", obj) 235 } 236 id, err := f.KeyOf(deltas.Newest().Object) 237 if err != nil { 238 return KeyError{obj, err} 239 } 240 f.lock.Lock() 241 defer f.lock.Unlock() 242 f.addIfNotPresent(id, deltas) 243 return nil 244} 245 246// addIfNotPresent inserts deltas under id if it does not exist, and assumes the caller 247// already holds the fifo lock. 248func (f *DeltaFIFO) addIfNotPresent(id string, deltas Deltas) { 249 f.populated = true 250 if _, exists := f.items[id]; exists { 251 return 252 } 253 254 f.queue = append(f.queue, id) 255 f.items[id] = deltas 256 f.cond.Broadcast() 257} 258 259// re-listing and watching can deliver the same update multiple times in any 260// order. This will combine the most recent two deltas if they are the same. 261func dedupDeltas(deltas Deltas) Deltas { 262 n := len(deltas) 263 if n < 2 { 264 return deltas 265 } 266 a := &deltas[n-1] 267 b := &deltas[n-2] 268 if out := isDup(a, b); out != nil { 269 d := append(Deltas{}, deltas[:n-2]...) 270 return append(d, *out) 271 } 272 return deltas 273} 274 275// If a & b represent the same event, returns the delta that ought to be kept. 276// Otherwise, returns nil. 277// TODO: is there anything other than deletions that need deduping? 278func isDup(a, b *Delta) *Delta { 279 if out := isDeletionDup(a, b); out != nil { 280 return out 281 } 282 // TODO: Detect other duplicate situations? Are there any? 283 return nil 284} 285 286// keep the one with the most information if both are deletions. 287func isDeletionDup(a, b *Delta) *Delta { 288 if b.Type != Deleted || a.Type != Deleted { 289 return nil 290 } 291 // Do more sophisticated checks, or is this sufficient? 292 if _, ok := b.Object.(DeletedFinalStateUnknown); ok { 293 return a 294 } 295 return b 296} 297 298// willObjectBeDeletedLocked returns true only if the last delta for the 299// given object is Delete. Caller must lock first. 300func (f *DeltaFIFO) willObjectBeDeletedLocked(id string) bool { 301 deltas := f.items[id] 302 return len(deltas) > 0 && deltas[len(deltas)-1].Type == Deleted 303} 304 305// queueActionLocked appends to the delta list for the object. 306// Caller must lock first. 307func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error { 308 id, err := f.KeyOf(obj) 309 if err != nil { 310 return KeyError{obj, err} 311 } 312 313 // If object is supposed to be deleted (last event is Deleted), 314 // then we should ignore Sync events, because it would result in 315 // recreation of this object. 316 if actionType == Sync && f.willObjectBeDeletedLocked(id) { 317 return nil 318 } 319 320 newDeltas := append(f.items[id], Delta{actionType, obj}) 321 newDeltas = dedupDeltas(newDeltas) 322 323 if len(newDeltas) > 0 { 324 if _, exists := f.items[id]; !exists { 325 f.queue = append(f.queue, id) 326 } 327 f.items[id] = newDeltas 328 f.cond.Broadcast() 329 } else { 330 // We need to remove this from our map (extra items in the queue are 331 // ignored if they are not in the map). 332 delete(f.items, id) 333 } 334 return nil 335} 336 337// List returns a list of all the items; it returns the object 338// from the most recent Delta. 339// You should treat the items returned inside the deltas as immutable. 340func (f *DeltaFIFO) List() []interface{} { 341 f.lock.RLock() 342 defer f.lock.RUnlock() 343 return f.listLocked() 344} 345 346func (f *DeltaFIFO) listLocked() []interface{} { 347 list := make([]interface{}, 0, len(f.items)) 348 for _, item := range f.items { 349 list = append(list, item.Newest().Object) 350 } 351 return list 352} 353 354// ListKeys returns a list of all the keys of the objects currently 355// in the FIFO. 356func (f *DeltaFIFO) ListKeys() []string { 357 f.lock.RLock() 358 defer f.lock.RUnlock() 359 list := make([]string, 0, len(f.items)) 360 for key := range f.items { 361 list = append(list, key) 362 } 363 return list 364} 365 366// Get returns the complete list of deltas for the requested item, 367// or sets exists=false. 368// You should treat the items returned inside the deltas as immutable. 369func (f *DeltaFIFO) Get(obj interface{}) (item interface{}, exists bool, err error) { 370 key, err := f.KeyOf(obj) 371 if err != nil { 372 return nil, false, KeyError{obj, err} 373 } 374 return f.GetByKey(key) 375} 376 377// GetByKey returns the complete list of deltas for the requested item, 378// setting exists=false if that list is empty. 379// You should treat the items returned inside the deltas as immutable. 380func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err error) { 381 f.lock.RLock() 382 defer f.lock.RUnlock() 383 d, exists := f.items[key] 384 if exists { 385 // Copy item's slice so operations on this slice 386 // won't interfere with the object we return. 387 d = copyDeltas(d) 388 } 389 return d, exists, nil 390} 391 392// Checks if the queue is closed 393func (f *DeltaFIFO) IsClosed() bool { 394 f.closedLock.Lock() 395 defer f.closedLock.Unlock() 396 return f.closed 397} 398 399// Pop blocks until an item is added to the queue, and then returns it. If 400// multiple items are ready, they are returned in the order in which they were 401// added/updated. The item is removed from the queue (and the store) before it 402// is returned, so if you don't successfully process it, you need to add it back 403// with AddIfNotPresent(). 404// process function is called under lock, so it is safe update data structures 405// in it that need to be in sync with the queue (e.g. knownKeys). The PopProcessFunc 406// may return an instance of ErrRequeue with a nested error to indicate the current 407// item should be requeued (equivalent to calling AddIfNotPresent under the lock). 408// 409// Pop returns a 'Deltas', which has a complete list of all the things 410// that happened to the object (deltas) while it was sitting in the queue. 411func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { 412 f.lock.Lock() 413 defer f.lock.Unlock() 414 for { 415 for len(f.queue) == 0 { 416 // When the queue is empty, invocation of Pop() is blocked until new item is enqueued. 417 // When Close() is called, the f.closed is set and the condition is broadcasted. 418 // Which causes this loop to continue and return from the Pop(). 419 if f.IsClosed() { 420 return nil, FIFOClosedError 421 } 422 423 f.cond.Wait() 424 } 425 id := f.queue[0] 426 f.queue = f.queue[1:] 427 if f.initialPopulationCount > 0 { 428 f.initialPopulationCount-- 429 } 430 item, ok := f.items[id] 431 if !ok { 432 // Item may have been deleted subsequently. 433 continue 434 } 435 delete(f.items, id) 436 err := process(item) 437 if e, ok := err.(ErrRequeue); ok { 438 f.addIfNotPresent(id, item) 439 err = e.Err 440 } 441 // Don't need to copyDeltas here, because we're transferring 442 // ownership to the caller. 443 return item, err 444 } 445} 446 447// Replace will delete the contents of 'f', using instead the given map. 448// 'f' takes ownership of the map, you should not reference the map again 449// after calling this function. f's queue is reset, too; upon return, it 450// will contain the items in the map, in no particular order. 451func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error { 452 f.lock.Lock() 453 defer f.lock.Unlock() 454 keys := make(sets.String, len(list)) 455 456 for _, item := range list { 457 key, err := f.KeyOf(item) 458 if err != nil { 459 return KeyError{item, err} 460 } 461 keys.Insert(key) 462 if err := f.queueActionLocked(Sync, item); err != nil { 463 return fmt.Errorf("couldn't enqueue object: %v", err) 464 } 465 } 466 467 if f.knownObjects == nil { 468 // Do deletion detection against our own list. 469 queuedDeletions := 0 470 for k, oldItem := range f.items { 471 if keys.Has(k) { 472 continue 473 } 474 var deletedObj interface{} 475 if n := oldItem.Newest(); n != nil { 476 deletedObj = n.Object 477 } 478 queuedDeletions++ 479 if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil { 480 return err 481 } 482 } 483 484 if !f.populated { 485 f.populated = true 486 // While there shouldn't be any queued deletions in the initial 487 // population of the queue, it's better to be on the safe side. 488 f.initialPopulationCount = len(list) + queuedDeletions 489 } 490 491 return nil 492 } 493 494 // Detect deletions not already in the queue. 495 knownKeys := f.knownObjects.ListKeys() 496 queuedDeletions := 0 497 for _, k := range knownKeys { 498 if keys.Has(k) { 499 continue 500 } 501 502 deletedObj, exists, err := f.knownObjects.GetByKey(k) 503 if err != nil { 504 deletedObj = nil 505 klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k) 506 } else if !exists { 507 deletedObj = nil 508 klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k) 509 } 510 queuedDeletions++ 511 if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil { 512 return err 513 } 514 } 515 516 if !f.populated { 517 f.populated = true 518 f.initialPopulationCount = len(list) + queuedDeletions 519 } 520 521 return nil 522} 523 524// Resync will send a sync event for each item 525func (f *DeltaFIFO) Resync() error { 526 f.lock.Lock() 527 defer f.lock.Unlock() 528 529 if f.knownObjects == nil { 530 return nil 531 } 532 533 keys := f.knownObjects.ListKeys() 534 for _, k := range keys { 535 if err := f.syncKeyLocked(k); err != nil { 536 return err 537 } 538 } 539 return nil 540} 541 542func (f *DeltaFIFO) syncKey(key string) error { 543 f.lock.Lock() 544 defer f.lock.Unlock() 545 546 return f.syncKeyLocked(key) 547} 548 549func (f *DeltaFIFO) syncKeyLocked(key string) error { 550 obj, exists, err := f.knownObjects.GetByKey(key) 551 if err != nil { 552 klog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key) 553 return nil 554 } else if !exists { 555 klog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key) 556 return nil 557 } 558 559 // If we are doing Resync() and there is already an event queued for that object, 560 // we ignore the Resync for it. This is to avoid the race, in which the resync 561 // comes with the previous value of object (since queueing an event for the object 562 // doesn't trigger changing the underlying store <knownObjects>. 563 id, err := f.KeyOf(obj) 564 if err != nil { 565 return KeyError{obj, err} 566 } 567 if len(f.items[id]) > 0 { 568 return nil 569 } 570 571 if err := f.queueActionLocked(Sync, obj); err != nil { 572 return fmt.Errorf("couldn't queue object: %v", err) 573 } 574 return nil 575} 576 577// A KeyListerGetter is anything that knows how to list its keys and look up by key. 578type KeyListerGetter interface { 579 KeyLister 580 KeyGetter 581} 582 583// A KeyLister is anything that knows how to list its keys. 584type KeyLister interface { 585 ListKeys() []string 586} 587 588// A KeyGetter is anything that knows how to get the value stored under a given key. 589type KeyGetter interface { 590 GetByKey(key string) (interface{}, bool, error) 591} 592 593// DeltaType is the type of a change (addition, deletion, etc) 594type DeltaType string 595 596const ( 597 Added DeltaType = "Added" 598 Updated DeltaType = "Updated" 599 Deleted DeltaType = "Deleted" 600 // The other types are obvious. You'll get Sync deltas when: 601 // * A watch expires/errors out and a new list/watch cycle is started. 602 // * You've turned on periodic syncs. 603 // (Anything that trigger's DeltaFIFO's Replace() method.) 604 Sync DeltaType = "Sync" 605) 606 607// Delta is the type stored by a DeltaFIFO. It tells you what change 608// happened, and the object's state after* that change. 609// 610// [*] Unless the change is a deletion, and then you'll get the final 611// state of the object before it was deleted. 612type Delta struct { 613 Type DeltaType 614 Object interface{} 615} 616 617// Deltas is a list of one or more 'Delta's to an individual object. 618// The oldest delta is at index 0, the newest delta is the last one. 619type Deltas []Delta 620 621// Oldest is a convenience function that returns the oldest delta, or 622// nil if there are no deltas. 623func (d Deltas) Oldest() *Delta { 624 if len(d) > 0 { 625 return &d[0] 626 } 627 return nil 628} 629 630// Newest is a convenience function that returns the newest delta, or 631// nil if there are no deltas. 632func (d Deltas) Newest() *Delta { 633 if n := len(d); n > 0 { 634 return &d[n-1] 635 } 636 return nil 637} 638 639// copyDeltas returns a shallow copy of d; that is, it copies the slice but not 640// the objects in the slice. This allows Get/List to return an object that we 641// know won't be clobbered by a subsequent modifications. 642func copyDeltas(d Deltas) Deltas { 643 d2 := make(Deltas, len(d)) 644 copy(d2, d) 645 return d2 646} 647 648// DeletedFinalStateUnknown is placed into a DeltaFIFO in the case where 649// an object was deleted but the watch deletion event was missed. In this 650// case we don't know the final "resting" state of the object, so there's 651// a chance the included `Obj` is stale. 652type DeletedFinalStateUnknown struct { 653 Key string 654 Obj interface{} 655} 656