1/* 2Copyright 2014 The Kubernetes Authors. 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package cache 18 19import ( 20 "errors" 21 "fmt" 22 "sync" 23 24 "k8s.io/apimachinery/pkg/util/sets" 25 26 "k8s.io/klog/v2" 27) 28 29// DeltaFIFOOptions is the configuration parameters for DeltaFIFO. All are 30// optional. 31type DeltaFIFOOptions struct { 32 33 // KeyFunction is used to figure out what key an object should have. (It's 34 // exposed in the returned DeltaFIFO's KeyOf() method, with additional 35 // handling around deleted objects and queue state). 36 // Optional, the default is MetaNamespaceKeyFunc. 37 KeyFunction KeyFunc 38 39 // KnownObjects is expected to return a list of keys that the consumer of 40 // this queue "knows about". It is used to decide which items are missing 41 // when Replace() is called; 'Deleted' deltas are produced for the missing items. 42 // KnownObjects may be nil if you can tolerate missing deletions on Replace(). 43 KnownObjects KeyListerGetter 44 45 // EmitDeltaTypeReplaced indicates that the queue consumer 46 // understands the Replaced DeltaType. Before the `Replaced` event type was 47 // added, calls to Replace() were handled the same as Sync(). For 48 // backwards-compatibility purposes, this is false by default. 49 // When true, `Replaced` events will be sent for items passed to a Replace() call. 50 // When false, `Sync` events will be sent instead. 51 EmitDeltaTypeReplaced bool 52} 53 54// DeltaFIFO is like FIFO, but differs in two ways. One is that the 55// accumulator associated with a given object's key is not that object 56// but rather a Deltas, which is a slice of Delta values for that 57// object. Applying an object to a Deltas means to append a Delta 58// except when the potentially appended Delta is a Deleted and the 59// Deltas already ends with a Deleted. In that case the Deltas does 60// not grow, although the terminal Deleted will be replaced by the new 61// Deleted if the older Deleted's object is a 62// DeletedFinalStateUnknown. 63// 64// The other difference is that DeltaFIFO has two additional ways that 65// an object can be applied to an accumulator: Replaced and Sync. 66// If EmitDeltaTypeReplaced is not set to true, Sync will be used in 67// replace events for backwards compatibility. Sync is used for periodic 68// resync events. 69// 70// DeltaFIFO is a producer-consumer queue, where a Reflector is 71// intended to be the producer, and the consumer is whatever calls 72// the Pop() method. 73// 74// DeltaFIFO solves this use case: 75// * You want to process every object change (delta) at most once. 76// * When you process an object, you want to see everything 77// that's happened to it since you last processed it. 78// * You want to process the deletion of some of the objects. 79// * You might want to periodically reprocess objects. 80// 81// DeltaFIFO's Pop(), Get(), and GetByKey() methods return 82// interface{} to satisfy the Store/Queue interfaces, but they 83// will always return an object of type Deltas. List() returns 84// the newest object from each accumulator in the FIFO. 85// 86// A DeltaFIFO's knownObjects KeyListerGetter provides the abilities 87// to list Store keys and to get objects by Store key. The objects in 88// question are called "known objects" and this set of objects 89// modifies the behavior of the Delete, Replace, and Resync methods 90// (each in a different way). 91// 92// A note on threading: If you call Pop() in parallel from multiple 93// threads, you could end up with multiple threads processing slightly 94// different versions of the same object. 95type DeltaFIFO struct { 96 // lock/cond protects access to 'items' and 'queue'. 97 lock sync.RWMutex 98 cond sync.Cond 99 100 // `items` maps a key to a Deltas. 101 // Each such Deltas has at least one Delta. 102 items map[string]Deltas 103 104 // `queue` maintains FIFO order of keys for consumption in Pop(). 105 // There are no duplicates in `queue`. 106 // A key is in `queue` if and only if it is in `items`. 107 queue []string 108 109 // populated is true if the first batch of items inserted by Replace() has been populated 110 // or Delete/Add/Update/AddIfNotPresent was called first. 111 populated bool 112 // initialPopulationCount is the number of items inserted by the first call of Replace() 113 initialPopulationCount int 114 115 // keyFunc is used to make the key used for queued item 116 // insertion and retrieval, and should be deterministic. 117 keyFunc KeyFunc 118 119 // knownObjects list keys that are "known" --- affecting Delete(), 120 // Replace(), and Resync() 121 knownObjects KeyListerGetter 122 123 // Used to indicate a queue is closed so a control loop can exit when a queue is empty. 124 // Currently, not used to gate any of CRED operations. 125 closed bool 126 127 // emitDeltaTypeReplaced is whether to emit the Replaced or Sync 128 // DeltaType when Replace() is called (to preserve backwards compat). 129 emitDeltaTypeReplaced bool 130} 131 132// DeltaType is the type of a change (addition, deletion, etc) 133type DeltaType string 134 135// Change type definition 136const ( 137 Added DeltaType = "Added" 138 Updated DeltaType = "Updated" 139 Deleted DeltaType = "Deleted" 140 // Replaced is emitted when we encountered watch errors and had to do a 141 // relist. We don't know if the replaced object has changed. 142 // 143 // NOTE: Previous versions of DeltaFIFO would use Sync for Replace events 144 // as well. Hence, Replaced is only emitted when the option 145 // EmitDeltaTypeReplaced is true. 146 Replaced DeltaType = "Replaced" 147 // Sync is for synthetic events during a periodic resync. 148 Sync DeltaType = "Sync" 149) 150 151// Delta is a member of Deltas (a list of Delta objects) which 152// in its turn is the type stored by a DeltaFIFO. It tells you what 153// change happened, and the object's state after* that change. 154// 155// [*] Unless the change is a deletion, and then you'll get the final 156// state of the object before it was deleted. 157type Delta struct { 158 Type DeltaType 159 Object interface{} 160} 161 162// Deltas is a list of one or more 'Delta's to an individual object. 163// The oldest delta is at index 0, the newest delta is the last one. 164type Deltas []Delta 165 166// NewDeltaFIFO returns a Queue which can be used to process changes to items. 167// 168// keyFunc is used to figure out what key an object should have. (It is 169// exposed in the returned DeltaFIFO's KeyOf() method, with additional handling 170// around deleted objects and queue state). 171// 172// 'knownObjects' may be supplied to modify the behavior of Delete, 173// Replace, and Resync. It may be nil if you do not need those 174// modifications. 175// 176// TODO: consider merging keyLister with this object, tracking a list of 177// "known" keys when Pop() is called. Have to think about how that 178// affects error retrying. 179// 180// NOTE: It is possible to misuse this and cause a race when using an 181// external known object source. 182// Whether there is a potential race depends on how the consumer 183// modifies knownObjects. In Pop(), process function is called under 184// lock, so it is safe to update data structures in it that need to be 185// in sync with the queue (e.g. knownObjects). 186// 187// Example: 188// In case of sharedIndexInformer being a consumer 189// (https://github.com/kubernetes/kubernetes/blob/0cdd940f/staging/src/k8s.io/client-go/tools/cache/shared_informer.go#L192), 190// there is no race as knownObjects (s.indexer) is modified safely 191// under DeltaFIFO's lock. The only exceptions are GetStore() and 192// GetIndexer() methods, which expose ways to modify the underlying 193// storage. Currently these two methods are used for creating Lister 194// and internal tests. 195// 196// Also see the comment on DeltaFIFO. 197// 198// Warning: This constructs a DeltaFIFO that does not differentiate between 199// events caused by a call to Replace (e.g., from a relist, which may 200// contain object updates), and synthetic events caused by a periodic resync 201// (which just emit the existing object). See https://issue.k8s.io/86015 for details. 202// 203// Use `NewDeltaFIFOWithOptions(DeltaFIFOOptions{..., EmitDeltaTypeReplaced: true})` 204// instead to receive a `Replaced` event depending on the type. 205// 206// Deprecated: Equivalent to NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: keyFunc, KnownObjects: knownObjects}) 207func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO { 208 return NewDeltaFIFOWithOptions(DeltaFIFOOptions{ 209 KeyFunction: keyFunc, 210 KnownObjects: knownObjects, 211 }) 212} 213 214// NewDeltaFIFOWithOptions returns a Queue which can be used to process changes to 215// items. See also the comment on DeltaFIFO. 216func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO { 217 if opts.KeyFunction == nil { 218 opts.KeyFunction = MetaNamespaceKeyFunc 219 } 220 221 f := &DeltaFIFO{ 222 items: map[string]Deltas{}, 223 queue: []string{}, 224 keyFunc: opts.KeyFunction, 225 knownObjects: opts.KnownObjects, 226 227 emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced, 228 } 229 f.cond.L = &f.lock 230 return f 231} 232 233var ( 234 _ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue 235) 236 237var ( 238 // ErrZeroLengthDeltasObject is returned in a KeyError if a Deltas 239 // object with zero length is encountered (should be impossible, 240 // but included for completeness). 241 ErrZeroLengthDeltasObject = errors.New("0 length Deltas object; can't get key") 242) 243 244// Close the queue. 245func (f *DeltaFIFO) Close() { 246 f.lock.Lock() 247 defer f.lock.Unlock() 248 f.closed = true 249 f.cond.Broadcast() 250} 251 252// KeyOf exposes f's keyFunc, but also detects the key of a Deltas object or 253// DeletedFinalStateUnknown objects. 254func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) { 255 if d, ok := obj.(Deltas); ok { 256 if len(d) == 0 { 257 return "", KeyError{obj, ErrZeroLengthDeltasObject} 258 } 259 obj = d.Newest().Object 260 } 261 if d, ok := obj.(DeletedFinalStateUnknown); ok { 262 return d.Key, nil 263 } 264 return f.keyFunc(obj) 265} 266 267// HasSynced returns true if an Add/Update/Delete/AddIfNotPresent are called first, 268// or the first batch of items inserted by Replace() has been popped. 269func (f *DeltaFIFO) HasSynced() bool { 270 f.lock.Lock() 271 defer f.lock.Unlock() 272 return f.populated && f.initialPopulationCount == 0 273} 274 275// Add inserts an item, and puts it in the queue. The item is only enqueued 276// if it doesn't already exist in the set. 277func (f *DeltaFIFO) Add(obj interface{}) error { 278 f.lock.Lock() 279 defer f.lock.Unlock() 280 f.populated = true 281 return f.queueActionLocked(Added, obj) 282} 283 284// Update is just like Add, but makes an Updated Delta. 285func (f *DeltaFIFO) Update(obj interface{}) error { 286 f.lock.Lock() 287 defer f.lock.Unlock() 288 f.populated = true 289 return f.queueActionLocked(Updated, obj) 290} 291 292// Delete is just like Add, but makes a Deleted Delta. If the given 293// object does not already exist, it will be ignored. (It may have 294// already been deleted by a Replace (re-list), for example.) In this 295// method `f.knownObjects`, if not nil, provides (via GetByKey) 296// _additional_ objects that are considered to already exist. 297func (f *DeltaFIFO) Delete(obj interface{}) error { 298 id, err := f.KeyOf(obj) 299 if err != nil { 300 return KeyError{obj, err} 301 } 302 f.lock.Lock() 303 defer f.lock.Unlock() 304 f.populated = true 305 if f.knownObjects == nil { 306 if _, exists := f.items[id]; !exists { 307 // Presumably, this was deleted when a relist happened. 308 // Don't provide a second report of the same deletion. 309 return nil 310 } 311 } else { 312 // We only want to skip the "deletion" action if the object doesn't 313 // exist in knownObjects and it doesn't have corresponding item in items. 314 // Note that even if there is a "deletion" action in items, we can ignore it, 315 // because it will be deduped automatically in "queueActionLocked" 316 _, exists, err := f.knownObjects.GetByKey(id) 317 _, itemsExist := f.items[id] 318 if err == nil && !exists && !itemsExist { 319 // Presumably, this was deleted when a relist happened. 320 // Don't provide a second report of the same deletion. 321 return nil 322 } 323 } 324 325 // exist in items and/or KnownObjects 326 return f.queueActionLocked(Deleted, obj) 327} 328 329// AddIfNotPresent inserts an item, and puts it in the queue. If the item is already 330// present in the set, it is neither enqueued nor added to the set. 331// 332// This is useful in a single producer/consumer scenario so that the consumer can 333// safely retry items without contending with the producer and potentially enqueueing 334// stale items. 335// 336// Important: obj must be a Deltas (the output of the Pop() function). Yes, this is 337// different from the Add/Update/Delete functions. 338func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error { 339 deltas, ok := obj.(Deltas) 340 if !ok { 341 return fmt.Errorf("object must be of type deltas, but got: %#v", obj) 342 } 343 id, err := f.KeyOf(deltas) 344 if err != nil { 345 return KeyError{obj, err} 346 } 347 f.lock.Lock() 348 defer f.lock.Unlock() 349 f.addIfNotPresent(id, deltas) 350 return nil 351} 352 353// addIfNotPresent inserts deltas under id if it does not exist, and assumes the caller 354// already holds the fifo lock. 355func (f *DeltaFIFO) addIfNotPresent(id string, deltas Deltas) { 356 f.populated = true 357 if _, exists := f.items[id]; exists { 358 return 359 } 360 361 f.queue = append(f.queue, id) 362 f.items[id] = deltas 363 f.cond.Broadcast() 364} 365 366// re-listing and watching can deliver the same update multiple times in any 367// order. This will combine the most recent two deltas if they are the same. 368func dedupDeltas(deltas Deltas) Deltas { 369 n := len(deltas) 370 if n < 2 { 371 return deltas 372 } 373 a := &deltas[n-1] 374 b := &deltas[n-2] 375 if out := isDup(a, b); out != nil { 376 deltas[n-2] = *out 377 return deltas[:n-1] 378 } 379 return deltas 380} 381 382// If a & b represent the same event, returns the delta that ought to be kept. 383// Otherwise, returns nil. 384// TODO: is there anything other than deletions that need deduping? 385func isDup(a, b *Delta) *Delta { 386 if out := isDeletionDup(a, b); out != nil { 387 return out 388 } 389 // TODO: Detect other duplicate situations? Are there any? 390 return nil 391} 392 393// keep the one with the most information if both are deletions. 394func isDeletionDup(a, b *Delta) *Delta { 395 if b.Type != Deleted || a.Type != Deleted { 396 return nil 397 } 398 // Do more sophisticated checks, or is this sufficient? 399 if _, ok := b.Object.(DeletedFinalStateUnknown); ok { 400 return a 401 } 402 return b 403} 404 405// queueActionLocked appends to the delta list for the object. 406// Caller must lock first. 407func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error { 408 id, err := f.KeyOf(obj) 409 if err != nil { 410 return KeyError{obj, err} 411 } 412 oldDeltas := f.items[id] 413 newDeltas := append(oldDeltas, Delta{actionType, obj}) 414 newDeltas = dedupDeltas(newDeltas) 415 416 if len(newDeltas) > 0 { 417 if _, exists := f.items[id]; !exists { 418 f.queue = append(f.queue, id) 419 } 420 f.items[id] = newDeltas 421 f.cond.Broadcast() 422 } else { 423 // This never happens, because dedupDeltas never returns an empty list 424 // when given a non-empty list (as it is here). 425 // If somehow it happens anyway, deal with it but complain. 426 if oldDeltas == nil { 427 klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; ignoring", id, oldDeltas, obj) 428 return nil 429 } 430 klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; breaking invariant by storing empty Deltas", id, oldDeltas, obj) 431 f.items[id] = newDeltas 432 return fmt.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; broke DeltaFIFO invariant by storing empty Deltas", id, oldDeltas, obj) 433 } 434 return nil 435} 436 437// List returns a list of all the items; it returns the object 438// from the most recent Delta. 439// You should treat the items returned inside the deltas as immutable. 440func (f *DeltaFIFO) List() []interface{} { 441 f.lock.RLock() 442 defer f.lock.RUnlock() 443 return f.listLocked() 444} 445 446func (f *DeltaFIFO) listLocked() []interface{} { 447 list := make([]interface{}, 0, len(f.items)) 448 for _, item := range f.items { 449 list = append(list, item.Newest().Object) 450 } 451 return list 452} 453 454// ListKeys returns a list of all the keys of the objects currently 455// in the FIFO. 456func (f *DeltaFIFO) ListKeys() []string { 457 f.lock.RLock() 458 defer f.lock.RUnlock() 459 list := make([]string, 0, len(f.items)) 460 for key := range f.items { 461 list = append(list, key) 462 } 463 return list 464} 465 466// Get returns the complete list of deltas for the requested item, 467// or sets exists=false. 468// You should treat the items returned inside the deltas as immutable. 469func (f *DeltaFIFO) Get(obj interface{}) (item interface{}, exists bool, err error) { 470 key, err := f.KeyOf(obj) 471 if err != nil { 472 return nil, false, KeyError{obj, err} 473 } 474 return f.GetByKey(key) 475} 476 477// GetByKey returns the complete list of deltas for the requested item, 478// setting exists=false if that list is empty. 479// You should treat the items returned inside the deltas as immutable. 480func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err error) { 481 f.lock.RLock() 482 defer f.lock.RUnlock() 483 d, exists := f.items[key] 484 if exists { 485 // Copy item's slice so operations on this slice 486 // won't interfere with the object we return. 487 d = copyDeltas(d) 488 } 489 return d, exists, nil 490} 491 492// IsClosed checks if the queue is closed 493func (f *DeltaFIFO) IsClosed() bool { 494 f.lock.Lock() 495 defer f.lock.Unlock() 496 return f.closed 497} 498 499// Pop blocks until the queue has some items, and then returns one. If 500// multiple items are ready, they are returned in the order in which they were 501// added/updated. The item is removed from the queue (and the store) before it 502// is returned, so if you don't successfully process it, you need to add it back 503// with AddIfNotPresent(). 504// process function is called under lock, so it is safe to update data structures 505// in it that need to be in sync with the queue (e.g. knownKeys). The PopProcessFunc 506// may return an instance of ErrRequeue with a nested error to indicate the current 507// item should be requeued (equivalent to calling AddIfNotPresent under the lock). 508// process should avoid expensive I/O operation so that other queue operations, i.e. 509// Add() and Get(), won't be blocked for too long. 510// 511// Pop returns a 'Deltas', which has a complete list of all the things 512// that happened to the object (deltas) while it was sitting in the queue. 513func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { 514 f.lock.Lock() 515 defer f.lock.Unlock() 516 for { 517 for len(f.queue) == 0 { 518 // When the queue is empty, invocation of Pop() is blocked until new item is enqueued. 519 // When Close() is called, the f.closed is set and the condition is broadcasted. 520 // Which causes this loop to continue and return from the Pop(). 521 if f.closed { 522 return nil, ErrFIFOClosed 523 } 524 525 f.cond.Wait() 526 } 527 id := f.queue[0] 528 f.queue = f.queue[1:] 529 if f.initialPopulationCount > 0 { 530 f.initialPopulationCount-- 531 } 532 item, ok := f.items[id] 533 if !ok { 534 // This should never happen 535 klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id) 536 continue 537 } 538 delete(f.items, id) 539 err := process(item) 540 if e, ok := err.(ErrRequeue); ok { 541 f.addIfNotPresent(id, item) 542 err = e.Err 543 } 544 // Don't need to copyDeltas here, because we're transferring 545 // ownership to the caller. 546 return item, err 547 } 548} 549 550// Replace atomically does two things: (1) it adds the given objects 551// using the Sync or Replace DeltaType and then (2) it does some deletions. 552// In particular: for every pre-existing key K that is not the key of 553// an object in `list` there is the effect of 554// `Delete(DeletedFinalStateUnknown{K, O})` where O is current object 555// of K. If `f.knownObjects == nil` then the pre-existing keys are 556// those in `f.items` and the current object of K is the `.Newest()` 557// of the Deltas associated with K. Otherwise the pre-existing keys 558// are those listed by `f.knownObjects` and the current object of K is 559// what `f.knownObjects.GetByKey(K)` returns. 560func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error { 561 f.lock.Lock() 562 defer f.lock.Unlock() 563 keys := make(sets.String, len(list)) 564 565 // keep backwards compat for old clients 566 action := Sync 567 if f.emitDeltaTypeReplaced { 568 action = Replaced 569 } 570 571 // Add Sync/Replaced action for each new item. 572 for _, item := range list { 573 key, err := f.KeyOf(item) 574 if err != nil { 575 return KeyError{item, err} 576 } 577 keys.Insert(key) 578 if err := f.queueActionLocked(action, item); err != nil { 579 return fmt.Errorf("couldn't enqueue object: %v", err) 580 } 581 } 582 583 if f.knownObjects == nil { 584 // Do deletion detection against our own list. 585 queuedDeletions := 0 586 for k, oldItem := range f.items { 587 if keys.Has(k) { 588 continue 589 } 590 // Delete pre-existing items not in the new list. 591 // This could happen if watch deletion event was missed while 592 // disconnected from apiserver. 593 var deletedObj interface{} 594 if n := oldItem.Newest(); n != nil { 595 deletedObj = n.Object 596 } 597 queuedDeletions++ 598 if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil { 599 return err 600 } 601 } 602 603 if !f.populated { 604 f.populated = true 605 // While there shouldn't be any queued deletions in the initial 606 // population of the queue, it's better to be on the safe side. 607 f.initialPopulationCount = keys.Len() + queuedDeletions 608 } 609 610 return nil 611 } 612 613 // Detect deletions not already in the queue. 614 knownKeys := f.knownObjects.ListKeys() 615 queuedDeletions := 0 616 for _, k := range knownKeys { 617 if keys.Has(k) { 618 continue 619 } 620 621 deletedObj, exists, err := f.knownObjects.GetByKey(k) 622 if err != nil { 623 deletedObj = nil 624 klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k) 625 } else if !exists { 626 deletedObj = nil 627 klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k) 628 } 629 queuedDeletions++ 630 if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil { 631 return err 632 } 633 } 634 635 if !f.populated { 636 f.populated = true 637 f.initialPopulationCount = keys.Len() + queuedDeletions 638 } 639 640 return nil 641} 642 643// Resync adds, with a Sync type of Delta, every object listed by 644// `f.knownObjects` whose key is not already queued for processing. 645// If `f.knownObjects` is `nil` then Resync does nothing. 646func (f *DeltaFIFO) Resync() error { 647 f.lock.Lock() 648 defer f.lock.Unlock() 649 650 if f.knownObjects == nil { 651 return nil 652 } 653 654 keys := f.knownObjects.ListKeys() 655 for _, k := range keys { 656 if err := f.syncKeyLocked(k); err != nil { 657 return err 658 } 659 } 660 return nil 661} 662 663func (f *DeltaFIFO) syncKeyLocked(key string) error { 664 obj, exists, err := f.knownObjects.GetByKey(key) 665 if err != nil { 666 klog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key) 667 return nil 668 } else if !exists { 669 klog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key) 670 return nil 671 } 672 673 // If we are doing Resync() and there is already an event queued for that object, 674 // we ignore the Resync for it. This is to avoid the race, in which the resync 675 // comes with the previous value of object (since queueing an event for the object 676 // doesn't trigger changing the underlying store <knownObjects>. 677 id, err := f.KeyOf(obj) 678 if err != nil { 679 return KeyError{obj, err} 680 } 681 if len(f.items[id]) > 0 { 682 return nil 683 } 684 685 if err := f.queueActionLocked(Sync, obj); err != nil { 686 return fmt.Errorf("couldn't queue object: %v", err) 687 } 688 return nil 689} 690 691// A KeyListerGetter is anything that knows how to list its keys and look up by key. 692type KeyListerGetter interface { 693 KeyLister 694 KeyGetter 695} 696 697// A KeyLister is anything that knows how to list its keys. 698type KeyLister interface { 699 ListKeys() []string 700} 701 702// A KeyGetter is anything that knows how to get the value stored under a given key. 703type KeyGetter interface { 704 // GetByKey returns the value associated with the key, or sets exists=false. 705 GetByKey(key string) (value interface{}, exists bool, err error) 706} 707 708// Oldest is a convenience function that returns the oldest delta, or 709// nil if there are no deltas. 710func (d Deltas) Oldest() *Delta { 711 if len(d) > 0 { 712 return &d[0] 713 } 714 return nil 715} 716 717// Newest is a convenience function that returns the newest delta, or 718// nil if there are no deltas. 719func (d Deltas) Newest() *Delta { 720 if n := len(d); n > 0 { 721 return &d[n-1] 722 } 723 return nil 724} 725 726// copyDeltas returns a shallow copy of d; that is, it copies the slice but not 727// the objects in the slice. This allows Get/List to return an object that we 728// know won't be clobbered by a subsequent modifications. 729func copyDeltas(d Deltas) Deltas { 730 d2 := make(Deltas, len(d)) 731 copy(d2, d) 732 return d2 733} 734 735// DeletedFinalStateUnknown is placed into a DeltaFIFO in the case where an object 736// was deleted but the watch deletion event was missed while disconnected from 737// apiserver. In this case we don't know the final "resting" state of the object, so 738// there's a chance the included `Obj` is stale. 739type DeletedFinalStateUnknown struct { 740 Key string 741 Obj interface{} 742} 743