1/* 2Copyright 2015 The Kubernetes Authors. 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package cache 18 19import ( 20 "fmt" 21 "sync" 22 "time" 23 24 "k8s.io/apimachinery/pkg/api/meta" 25 "k8s.io/apimachinery/pkg/runtime" 26 "k8s.io/apimachinery/pkg/util/clock" 27 utilruntime "k8s.io/apimachinery/pkg/util/runtime" 28 "k8s.io/apimachinery/pkg/util/wait" 29 "k8s.io/utils/buffer" 30 31 "k8s.io/klog" 32) 33 34// SharedInformer provides eventually consistent linkage of its 35// clients to the authoritative state of a given collection of 36// objects. An object is identified by its API group, kind/resource, 37// namespace, and name; the `ObjectMeta.UID` is not part of an 38// object's ID as far as this contract is concerned. One 39// SharedInformer provides linkage to objects of a particular API 40// group and kind/resource. The linked object collection of a 41// SharedInformer may be further restricted to one namespace and/or by 42// label selector and/or field selector. 43// 44// The authoritative state of an object is what apiservers provide 45// access to, and an object goes through a strict sequence of states. 46// An object state is either "absent" or present with a 47// ResourceVersion and other appropriate content. 48// 49// A SharedInformer maintains a local cache, exposed by GetStore() and 50// by GetIndexer() in the case of an indexed informer, of the state of 51// each relevant object. This cache is eventually consistent with the 52// authoritative state. 
// This means that, unless prevented by
// persistent communication problems, if ever a particular object ID X
// is authoritatively associated with a state S then for every
// SharedInformer I whose collection includes (X, S) eventually either
// (1) I's cache associates X with S or a later state of X, (2) I is
// stopped, or (3) the authoritative state service for X terminates.
// To be formally complete, we say that the absent state meets any
// restriction by label selector or field selector.
//
// For a given informer and relevant object ID X, the sequence of
// states that appears in the informer's cache is a subsequence of the
// states authoritatively associated with X. That is, some states
// might never appear in the cache but ordering among the appearing
// states is correct. Note, however, that there is no promise about
// ordering between states seen for different objects.
//
// The local cache starts out empty, and gets populated and updated
// during `Run()`.
//
// As a simple example, if a collection of objects is henceforth
// unchanging, a SharedInformer is created that links to that
// collection, and that SharedInformer is `Run()` then that
// SharedInformer's cache eventually holds an exact copy of that
// collection (unless it is stopped too soon, the authoritative state
// service ends, or communication problems between the two
// persistently thwart achievement).
//
// As another simple example, if the local cache ever holds a
// non-absent state for some object ID and the object is eventually
// removed from the authoritative state then eventually the object is
// removed from the local cache (unless the SharedInformer is stopped
// too soon, the authoritative state service ends, or communication
// problems persistently thwart the desired result).
//
// The keys in the Store are of the form namespace/name for namespaced
// objects, and are simply the name for non-namespaced objects.
// Clients can use `MetaNamespaceKeyFunc(obj)` to extract the key for
// a given object, and `SplitMetaNamespaceKey(key)` to split a key
// into its constituent parts.
//
// Every query against the local cache is answered entirely from one
// snapshot of the cache's state. Thus, the result of a `List` call
// will not contain two entries with the same namespace and name.
//
// A client is identified here by a ResourceEventHandler. For every
// update to the SharedInformer's local cache and for every client
// added before `Run()`, eventually either the SharedInformer is
// stopped or the client is notified of the update. A client added
// after `Run()` starts gets a startup batch of notifications of
// additions of the objects existing in the cache at the time that
// client was added; also, for every update to the SharedInformer's
// local cache after that client was added, eventually either the
// SharedInformer is stopped or that client is notified of that
// update. Client notifications happen after the corresponding cache
// update and, in the case of a SharedIndexInformer, after the
// corresponding index updates. It is possible that additional cache
// and index updates happen before such a prescribed notification.
// For a given SharedInformer and client, the notifications are
// delivered sequentially. For a given SharedInformer, client, and
// object ID, the notifications are delivered in order. Because
// `ObjectMeta.UID` has no role in identifying objects, it is possible
// that when (1) object O1 with ID (e.g.
namespace and name) X and 114// `ObjectMeta.UID` U1 in the SharedInformer's local cache is deleted 115// and later (2) another object O2 with ID X and ObjectMeta.UID U2 is 116// created the informer's clients are not notified of (1) and (2) but 117// rather are notified only of an update from O1 to O2. Clients that 118// need to detect such cases might do so by comparing the `ObjectMeta.UID` 119// field of the old and the new object in the code that handles update 120// notifications (i.e. `OnUpdate` method of ResourceEventHandler). 121// 122// A client must process each notification promptly; a SharedInformer 123// is not engineered to deal well with a large backlog of 124// notifications to deliver. Lengthy processing should be passed off 125// to something else, for example through a 126// `client-go/util/workqueue`. 127// 128// A delete notification exposes the last locally known non-absent 129// state, except that its ResourceVersion is replaced with a 130// ResourceVersion in which the object is actually absent. 131type SharedInformer interface { 132 // AddEventHandler adds an event handler to the shared informer using the shared informer's resync 133 // period. Events to a single handler are delivered sequentially, but there is no coordination 134 // between different handlers. 135 AddEventHandler(handler ResourceEventHandler) 136 // AddEventHandlerWithResyncPeriod adds an event handler to the 137 // shared informer with the requested resync period; zero means 138 // this handler does not care about resyncs. The resync operation 139 // consists of delivering to the handler an update notification 140 // for every object in the informer's local cache; it does not add 141 // any interactions with the authoritative storage. Some 142 // informers do no resyncs at all, not even for handlers added 143 // with a non-zero resyncPeriod. 
For an informer that does 144 // resyncs, and for each handler that requests resyncs, that 145 // informer develops a nominal resync period that is no shorter 146 // than the requested period but may be longer. The actual time 147 // between any two resyncs may be longer than the nominal period 148 // because the implementation takes time to do work and there may 149 // be competing load and scheduling noise. 150 AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) 151 // GetStore returns the informer's local cache as a Store. 152 GetStore() Store 153 // GetController is deprecated, it does nothing useful 154 GetController() Controller 155 // Run starts and runs the shared informer, returning after it stops. 156 // The informer will be stopped when stopCh is closed. 157 Run(stopCh <-chan struct{}) 158 // HasSynced returns true if the shared informer's store has been 159 // informed by at least one full LIST of the authoritative state 160 // of the informer's object collection. This is unrelated to "resync". 161 HasSynced() bool 162 // LastSyncResourceVersion is the resource version observed when last synced with the underlying 163 // store. The value returned is not synchronized with access to the underlying store and is not 164 // thread-safe. 165 LastSyncResourceVersion() string 166} 167 168// SharedIndexInformer provides add and get Indexers ability based on SharedInformer. 169type SharedIndexInformer interface { 170 SharedInformer 171 // AddIndexers add indexers to the informer before it starts. 172 AddIndexers(indexers Indexers) error 173 GetIndexer() Indexer 174} 175 176// NewSharedInformer creates a new instance for the listwatcher. 
177func NewSharedInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration) SharedInformer { 178 return NewSharedIndexInformer(lw, exampleObject, defaultEventHandlerResyncPeriod, Indexers{}) 179} 180 181// NewSharedIndexInformer creates a new instance for the listwatcher. 182// The created informer will not do resyncs if the given 183// defaultEventHandlerResyncPeriod is zero. Otherwise: for each 184// handler that with a non-zero requested resync period, whether added 185// before or after the informer starts, the nominal resync period is 186// the requested resync period rounded up to a multiple of the 187// informer's resync checking period. Such an informer's resync 188// checking period is established when the informer starts running, 189// and is the maximum of (a) the minimum of the resync periods 190// requested before the informer starts and the 191// defaultEventHandlerResyncPeriod given here and (b) the constant 192// `minimumResyncPeriod` defined in this file. 193func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer { 194 realClock := &clock.RealClock{} 195 sharedIndexInformer := &sharedIndexInformer{ 196 processor: &sharedProcessor{clock: realClock}, 197 indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers), 198 listerWatcher: lw, 199 objectType: exampleObject, 200 resyncCheckPeriod: defaultEventHandlerResyncPeriod, 201 defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod, 202 cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)), 203 clock: realClock, 204 } 205 return sharedIndexInformer 206} 207 208// InformerSynced is a function that can be used to determine if an informer has synced. This is useful for determining if caches have synced. 
209type InformerSynced func() bool 210 211const ( 212 // syncedPollPeriod controls how often you look at the status of your sync funcs 213 syncedPollPeriod = 100 * time.Millisecond 214 215 // initialBufferSize is the initial number of event notifications that can be buffered. 216 initialBufferSize = 1024 217) 218 219// WaitForNamedCacheSync is a wrapper around WaitForCacheSync that generates log messages 220// indicating that the caller identified by name is waiting for syncs, followed by 221// either a successful or failed sync. 222func WaitForNamedCacheSync(controllerName string, stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool { 223 klog.Infof("Waiting for caches to sync for %s", controllerName) 224 225 if !WaitForCacheSync(stopCh, cacheSyncs...) { 226 utilruntime.HandleError(fmt.Errorf("unable to sync caches for %s", controllerName)) 227 return false 228 } 229 230 klog.Infof("Caches are synced for %s ", controllerName) 231 return true 232} 233 234// WaitForCacheSync waits for caches to populate. It returns true if it was successful, false 235// if the controller should shutdown 236// callers should prefer WaitForNamedCacheSync() 237func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool { 238 err := wait.PollImmediateUntil(syncedPollPeriod, 239 func() (bool, error) { 240 for _, syncFunc := range cacheSyncs { 241 if !syncFunc() { 242 return false, nil 243 } 244 } 245 return true, nil 246 }, 247 stopCh) 248 if err != nil { 249 klog.V(2).Infof("stop requested") 250 return false 251 } 252 253 klog.V(4).Infof("caches populated") 254 return true 255} 256 257// `*sharedIndexInformer` implements SharedIndexInformer and has three 258// main components. One is an indexed local cache, `indexer Indexer`. 
259// The second main component is a Controller that pulls 260// objects/notifications using the ListerWatcher and pushes them into 261// a DeltaFIFO --- whose knownObjects is the informer's local cache 262// --- while concurrently Popping Deltas values from that fifo and 263// processing them with `sharedIndexInformer::HandleDeltas`. Each 264// invocation of HandleDeltas, which is done with the fifo's lock 265// held, processes each Delta in turn. For each Delta this both 266// updates the local cache and stuffs the relevant notification into 267// the sharedProcessor. The third main component is that 268// sharedProcessor, which is responsible for relaying those 269// notifications to each of the informer's clients. 270type sharedIndexInformer struct { 271 indexer Indexer 272 controller Controller 273 274 processor *sharedProcessor 275 cacheMutationDetector MutationDetector 276 277 listerWatcher ListerWatcher 278 279 // objectType is an example object of the type this informer is 280 // expected to handle. Only the type needs to be right, except 281 // that when that is `unstructured.Unstructured` the object's 282 // `"apiVersion"` and `"kind"` must also be right. 283 objectType runtime.Object 284 285 // resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call 286 // shouldResync to check if any of our listeners need a resync. 287 resyncCheckPeriod time.Duration 288 // defaultEventHandlerResyncPeriod is the default resync period for any handlers added via 289 // AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default 290 // value). 291 defaultEventHandlerResyncPeriod time.Duration 292 // clock allows for testability 293 clock clock.Clock 294 295 started, stopped bool 296 startedLock sync.Mutex 297 298 // blockDeltas gives a way to stop all event distribution so that a late event handler 299 // can safely join the shared informer. 
300 blockDeltas sync.Mutex 301} 302 303// dummyController hides the fact that a SharedInformer is different from a dedicated one 304// where a caller can `Run`. The run method is disconnected in this case, because higher 305// level logic will decide when to start the SharedInformer and related controller. 306// Because returning information back is always asynchronous, the legacy callers shouldn't 307// notice any change in behavior. 308type dummyController struct { 309 informer *sharedIndexInformer 310} 311 312func (v *dummyController) Run(stopCh <-chan struct{}) { 313} 314 315func (v *dummyController) HasSynced() bool { 316 return v.informer.HasSynced() 317} 318 319func (v *dummyController) LastSyncResourceVersion() string { 320 return "" 321} 322 323type updateNotification struct { 324 oldObj interface{} 325 newObj interface{} 326} 327 328type addNotification struct { 329 newObj interface{} 330} 331 332type deleteNotification struct { 333 oldObj interface{} 334} 335 336func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) { 337 defer utilruntime.HandleCrash() 338 339 fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ 340 KnownObjects: s.indexer, 341 EmitDeltaTypeReplaced: true, 342 }) 343 344 cfg := &Config{ 345 Queue: fifo, 346 ListerWatcher: s.listerWatcher, 347 ObjectType: s.objectType, 348 FullResyncPeriod: s.resyncCheckPeriod, 349 RetryOnError: false, 350 ShouldResync: s.processor.shouldResync, 351 352 Process: s.HandleDeltas, 353 } 354 355 func() { 356 s.startedLock.Lock() 357 defer s.startedLock.Unlock() 358 359 s.controller = New(cfg) 360 s.controller.(*controller).clock = s.clock 361 s.started = true 362 }() 363 364 // Separate stop channel because Processor should be stopped strictly after controller 365 processorStopCh := make(chan struct{}) 366 var wg wait.Group 367 defer wg.Wait() // Wait for Processor to stop 368 defer close(processorStopCh) // Tell Processor to stop 369 wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run) 370 
wg.StartWithChannel(processorStopCh, s.processor.run) 371 372 defer func() { 373 s.startedLock.Lock() 374 defer s.startedLock.Unlock() 375 s.stopped = true // Don't want any new listeners 376 }() 377 s.controller.Run(stopCh) 378} 379 380func (s *sharedIndexInformer) HasSynced() bool { 381 s.startedLock.Lock() 382 defer s.startedLock.Unlock() 383 384 if s.controller == nil { 385 return false 386 } 387 return s.controller.HasSynced() 388} 389 390func (s *sharedIndexInformer) LastSyncResourceVersion() string { 391 s.startedLock.Lock() 392 defer s.startedLock.Unlock() 393 394 if s.controller == nil { 395 return "" 396 } 397 return s.controller.LastSyncResourceVersion() 398} 399 400func (s *sharedIndexInformer) GetStore() Store { 401 return s.indexer 402} 403 404func (s *sharedIndexInformer) GetIndexer() Indexer { 405 return s.indexer 406} 407 408func (s *sharedIndexInformer) AddIndexers(indexers Indexers) error { 409 s.startedLock.Lock() 410 defer s.startedLock.Unlock() 411 412 if s.started { 413 return fmt.Errorf("informer has already started") 414 } 415 416 return s.indexer.AddIndexers(indexers) 417} 418 419func (s *sharedIndexInformer) GetController() Controller { 420 return &dummyController{informer: s} 421} 422 423func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) { 424 s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod) 425} 426 427func determineResyncPeriod(desired, check time.Duration) time.Duration { 428 if desired == 0 { 429 return desired 430 } 431 if check == 0 { 432 klog.Warningf("The specified resyncPeriod %v is invalid because this shared informer doesn't support resyncing", desired) 433 return 0 434 } 435 if desired < check { 436 klog.Warningf("The specified resyncPeriod %v is being increased to the minimum resyncCheckPeriod %v", desired, check) 437 return check 438 } 439 return desired 440} 441 442const minimumResyncPeriod = 1 * time.Second 443 444func (s *sharedIndexInformer) 
AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) { 445 s.startedLock.Lock() 446 defer s.startedLock.Unlock() 447 448 if s.stopped { 449 klog.V(2).Infof("Handler %v was not added to shared informer because it has stopped already", handler) 450 return 451 } 452 453 if resyncPeriod > 0 { 454 if resyncPeriod < minimumResyncPeriod { 455 klog.Warningf("resyncPeriod %d is too small. Changing it to the minimum allowed value of %d", resyncPeriod, minimumResyncPeriod) 456 resyncPeriod = minimumResyncPeriod 457 } 458 459 if resyncPeriod < s.resyncCheckPeriod { 460 if s.started { 461 klog.Warningf("resyncPeriod %d is smaller than resyncCheckPeriod %d and the informer has already started. Changing it to %d", resyncPeriod, s.resyncCheckPeriod, s.resyncCheckPeriod) 462 resyncPeriod = s.resyncCheckPeriod 463 } else { 464 // if the event handler's resyncPeriod is smaller than the current resyncCheckPeriod, update 465 // resyncCheckPeriod to match resyncPeriod and adjust the resync periods of all the listeners 466 // accordingly 467 s.resyncCheckPeriod = resyncPeriod 468 s.processor.resyncCheckPeriodChanged(resyncPeriod) 469 } 470 } 471 } 472 473 listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize) 474 475 if !s.started { 476 s.processor.addListener(listener) 477 return 478 } 479 480 // in order to safely join, we have to 481 // 1. stop sending add/update/delete notifications 482 // 2. do a list against the store 483 // 3. send synthetic "Add" events to the new handler 484 // 4. 
unblock 485 s.blockDeltas.Lock() 486 defer s.blockDeltas.Unlock() 487 488 s.processor.addListener(listener) 489 for _, item := range s.indexer.List() { 490 listener.add(addNotification{newObj: item}) 491 } 492} 493 494func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { 495 s.blockDeltas.Lock() 496 defer s.blockDeltas.Unlock() 497 498 // from oldest to newest 499 for _, d := range obj.(Deltas) { 500 switch d.Type { 501 case Sync, Replaced, Added, Updated: 502 s.cacheMutationDetector.AddObject(d.Object) 503 if old, exists, err := s.indexer.Get(d.Object); err == nil && exists { 504 if err := s.indexer.Update(d.Object); err != nil { 505 return err 506 } 507 508 isSync := false 509 switch { 510 case d.Type == Sync: 511 // Sync events are only propagated to listeners that requested resync 512 isSync = true 513 case d.Type == Replaced: 514 if accessor, err := meta.Accessor(d.Object); err == nil { 515 if oldAccessor, err := meta.Accessor(old); err == nil { 516 // Replaced events that didn't change resourceVersion are treated as resync events 517 // and only propagated to listeners that requested resync 518 isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion() 519 } 520 } 521 } 522 s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync) 523 } else { 524 if err := s.indexer.Add(d.Object); err != nil { 525 return err 526 } 527 s.processor.distribute(addNotification{newObj: d.Object}, false) 528 } 529 case Deleted: 530 if err := s.indexer.Delete(d.Object); err != nil { 531 return err 532 } 533 s.processor.distribute(deleteNotification{oldObj: d.Object}, false) 534 } 535 } 536 return nil 537} 538 539// sharedProcessor has a collection of processorListener and can 540// distribute a notification object to its listeners. There are two 541// kinds of distribute operations. 
The sync distributions go to a 542// subset of the listeners that (a) is recomputed in the occasional 543// calls to shouldResync and (b) every listener is initially put in. 544// The non-sync distributions go to every listener. 545type sharedProcessor struct { 546 listenersStarted bool 547 listenersLock sync.RWMutex 548 listeners []*processorListener 549 syncingListeners []*processorListener 550 clock clock.Clock 551 wg wait.Group 552} 553 554func (p *sharedProcessor) addListener(listener *processorListener) { 555 p.listenersLock.Lock() 556 defer p.listenersLock.Unlock() 557 558 p.addListenerLocked(listener) 559 if p.listenersStarted { 560 p.wg.Start(listener.run) 561 p.wg.Start(listener.pop) 562 } 563} 564 565func (p *sharedProcessor) addListenerLocked(listener *processorListener) { 566 p.listeners = append(p.listeners, listener) 567 p.syncingListeners = append(p.syncingListeners, listener) 568} 569 570func (p *sharedProcessor) distribute(obj interface{}, sync bool) { 571 p.listenersLock.RLock() 572 defer p.listenersLock.RUnlock() 573 574 if sync { 575 for _, listener := range p.syncingListeners { 576 listener.add(obj) 577 } 578 } else { 579 for _, listener := range p.listeners { 580 listener.add(obj) 581 } 582 } 583} 584 585func (p *sharedProcessor) run(stopCh <-chan struct{}) { 586 func() { 587 p.listenersLock.RLock() 588 defer p.listenersLock.RUnlock() 589 for _, listener := range p.listeners { 590 p.wg.Start(listener.run) 591 p.wg.Start(listener.pop) 592 } 593 p.listenersStarted = true 594 }() 595 <-stopCh 596 p.listenersLock.RLock() 597 defer p.listenersLock.RUnlock() 598 for _, listener := range p.listeners { 599 close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop 600 } 601 p.wg.Wait() // Wait for all .pop() and .run() to stop 602} 603 604// shouldResync queries every listener to determine if any of them need a resync, based on each 605// listener's resyncPeriod. 
606func (p *sharedProcessor) shouldResync() bool { 607 p.listenersLock.Lock() 608 defer p.listenersLock.Unlock() 609 610 p.syncingListeners = []*processorListener{} 611 612 resyncNeeded := false 613 now := p.clock.Now() 614 for _, listener := range p.listeners { 615 // need to loop through all the listeners to see if they need to resync so we can prepare any 616 // listeners that are going to be resyncing. 617 if listener.shouldResync(now) { 618 resyncNeeded = true 619 p.syncingListeners = append(p.syncingListeners, listener) 620 listener.determineNextResync(now) 621 } 622 } 623 return resyncNeeded 624} 625 626func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Duration) { 627 p.listenersLock.RLock() 628 defer p.listenersLock.RUnlock() 629 630 for _, listener := range p.listeners { 631 resyncPeriod := determineResyncPeriod(listener.requestedResyncPeriod, resyncCheckPeriod) 632 listener.setResyncPeriod(resyncPeriod) 633 } 634} 635 636// processorListener relays notifications from a sharedProcessor to 637// one ResourceEventHandler --- using two goroutines, two unbuffered 638// channels, and an unbounded ring buffer. The `add(notification)` 639// function sends the given notification to `addCh`. One goroutine 640// runs `pop()`, which pumps notifications from `addCh` to `nextCh` 641// using storage in the ring buffer while `nextCh` is not keeping up. 642// Another goroutine runs `run()`, which receives notifications from 643// `nextCh` and synchronously invokes the appropriate handler method. 644// 645// processorListener also keeps track of the adjusted requested resync 646// period of the listener. 647type processorListener struct { 648 nextCh chan interface{} 649 addCh chan interface{} 650 651 handler ResourceEventHandler 652 653 // pendingNotifications is an unbounded ring buffer that holds all notifications not yet distributed. 
654 // There is one per listener, but a failing/stalled listener will have infinite pendingNotifications 655 // added until we OOM. 656 // TODO: This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but 657 // we should try to do something better. 658 pendingNotifications buffer.RingGrowing 659 660 // requestedResyncPeriod is how frequently the listener wants a 661 // full resync from the shared informer, but modified by two 662 // adjustments. One is imposing a lower bound, 663 // `minimumResyncPeriod`. The other is another lower bound, the 664 // sharedProcessor's `resyncCheckPeriod`, that is imposed (a) only 665 // in AddEventHandlerWithResyncPeriod invocations made after the 666 // sharedProcessor starts and (b) only if the informer does 667 // resyncs at all. 668 requestedResyncPeriod time.Duration 669 // resyncPeriod is the threshold that will be used in the logic 670 // for this listener. This value differs from 671 // requestedResyncPeriod only when the sharedIndexInformer does 672 // not do resyncs, in which case the value here is zero. The 673 // actual time between resyncs depends on when the 674 // sharedProcessor's `shouldResync` function is invoked and when 675 // the sharedIndexInformer processes `Sync` type Delta objects. 
676 resyncPeriod time.Duration 677 // nextResync is the earliest time the listener should get a full resync 678 nextResync time.Time 679 // resyncLock guards access to resyncPeriod and nextResync 680 resyncLock sync.Mutex 681} 682 683func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener { 684 ret := &processorListener{ 685 nextCh: make(chan interface{}), 686 addCh: make(chan interface{}), 687 handler: handler, 688 pendingNotifications: *buffer.NewRingGrowing(bufferSize), 689 requestedResyncPeriod: requestedResyncPeriod, 690 resyncPeriod: resyncPeriod, 691 } 692 693 ret.determineNextResync(now) 694 695 return ret 696} 697 698func (p *processorListener) add(notification interface{}) { 699 p.addCh <- notification 700} 701 702func (p *processorListener) pop() { 703 defer utilruntime.HandleCrash() 704 defer close(p.nextCh) // Tell .run() to stop 705 706 var nextCh chan<- interface{} 707 var notification interface{} 708 for { 709 select { 710 case nextCh <- notification: 711 // Notification dispatched 712 var ok bool 713 notification, ok = p.pendingNotifications.ReadOne() 714 if !ok { // Nothing to pop 715 nextCh = nil // Disable this select case 716 } 717 case notificationToAdd, ok := <-p.addCh: 718 if !ok { 719 return 720 } 721 if notification == nil { // No notification to pop (and pendingNotifications is empty) 722 // Optimize the case - skip adding to pendingNotifications 723 notification = notificationToAdd 724 nextCh = p.nextCh 725 } else { // There is already a notification waiting to be dispatched 726 p.pendingNotifications.WriteOne(notificationToAdd) 727 } 728 } 729 } 730} 731 732func (p *processorListener) run() { 733 // this call blocks until the channel is closed. When a panic happens during the notification 734 // we will catch it, **the offending item will be skipped!**, and after a short delay (one second) 735 // the next notification will be attempted. 
This is usually better than the alternative of never 736 // delivering again. 737 stopCh := make(chan struct{}) 738 wait.Until(func() { 739 for next := range p.nextCh { 740 switch notification := next.(type) { 741 case updateNotification: 742 p.handler.OnUpdate(notification.oldObj, notification.newObj) 743 case addNotification: 744 p.handler.OnAdd(notification.newObj) 745 case deleteNotification: 746 p.handler.OnDelete(notification.oldObj) 747 default: 748 utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next)) 749 } 750 } 751 // the only way to get here is if the p.nextCh is empty and closed 752 close(stopCh) 753 }, 1*time.Second, stopCh) 754} 755 756// shouldResync deterimines if the listener needs a resync. If the listener's resyncPeriod is 0, 757// this always returns false. 758func (p *processorListener) shouldResync(now time.Time) bool { 759 p.resyncLock.Lock() 760 defer p.resyncLock.Unlock() 761 762 if p.resyncPeriod == 0 { 763 return false 764 } 765 766 return now.After(p.nextResync) || now.Equal(p.nextResync) 767} 768 769func (p *processorListener) determineNextResync(now time.Time) { 770 p.resyncLock.Lock() 771 defer p.resyncLock.Unlock() 772 773 p.nextResync = now.Add(p.resyncPeriod) 774} 775 776func (p *processorListener) setResyncPeriod(resyncPeriod time.Duration) { 777 p.resyncLock.Lock() 778 defer p.resyncLock.Unlock() 779 780 p.resyncPeriod = resyncPeriod 781} 782