/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cache

import (
	"fmt"
	"sync"
	"time"

	"k8s.io/apimachinery/pkg/api/meta"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/util/clock"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/utils/buffer"

	"k8s.io/klog/v2"
)

// SharedInformer provides eventually consistent linkage of its
// clients to the authoritative state of a given collection of
// objects. An object is identified by its API group, kind/resource,
// namespace (if any), and name; the `ObjectMeta.UID` is not part of
// an object's ID as far as this contract is concerned. One
// SharedInformer provides linkage to objects of a particular API
// group and kind/resource. The linked object collection of a
// SharedInformer may be further restricted to one namespace (if
// applicable) and/or by label selector and/or field selector.
//
// The authoritative state of an object is what apiservers provide
// access to, and an object goes through a strict sequence of states.
// An object state is either (1) present with a ResourceVersion and
// other appropriate content or (2) "absent".
//
// A SharedInformer maintains a local cache --- exposed by GetStore(),
// by GetIndexer() in the case of an indexed informer, and possibly by
// machinery involved in creating and/or accessing the informer --- of
// the state of each relevant object. This cache is eventually
// consistent with the authoritative state. This means that, unless
// prevented by persistent communication problems, if ever a
// particular object ID X is authoritatively associated with a state S
// then for every SharedInformer I whose collection includes (X, S)
// eventually either (1) I's cache associates X with S or a later
// state of X, (2) I is stopped, or (3) the authoritative state
// service for X terminates. To be formally complete, we say that the
// absent state meets any restriction by label selector or field
// selector.
//
// For a given informer and relevant object ID X, the sequence of
// states that appears in the informer's cache is a subsequence of the
// states authoritatively associated with X. That is, some states
// might never appear in the cache but ordering among the appearing
// states is correct. Note, however, that there is no promise about
// ordering between states seen for different objects.
//
// The local cache starts out empty, and gets populated and updated
// during `Run()`.
//
// As a simple example, if a collection of objects is henceforth
// unchanging, a SharedInformer is created that links to that
// collection, and that SharedInformer is `Run()` then that
// SharedInformer's cache eventually holds an exact copy of that
// collection (unless it is stopped too soon, the authoritative state
// service ends, or communication problems between the two
// persistently thwart achievement).
//
// As another simple example, if the local cache ever holds a
// non-absent state for some object ID and the object is eventually
// removed from the authoritative state then eventually the object is
// removed from the local cache (unless the SharedInformer is stopped
// too soon, the authoritative state service ends, or communication
// problems persistently thwart the desired result).
//
// The keys in the Store are of the form namespace/name for namespaced
// objects, and are simply the name for non-namespaced objects.
// Clients can use `MetaNamespaceKeyFunc(obj)` to extract the key for
// a given object, and `SplitMetaNamespaceKey(key)` to split a key
// into its constituent parts.
//
// Every query against the local cache is answered entirely from one
// snapshot of the cache's state. Thus, the result of a `List` call
// will not contain two entries with the same namespace and name.
//
// A client is identified here by a ResourceEventHandler. For every
// update to the SharedInformer's local cache and for every client
// added before `Run()`, eventually either the SharedInformer is
// stopped or the client is notified of the update. A client added
// after `Run()` starts gets a startup batch of notifications of
// additions of the objects existing in the cache at the time that
// client was added; also, for every update to the SharedInformer's
// local cache after that client was added, eventually either the
// SharedInformer is stopped or that client is notified of that
// update. Client notifications happen after the corresponding cache
// update and, in the case of a SharedIndexInformer, after the
// corresponding index updates. It is possible that additional cache
// and index updates happen before such a prescribed notification.
// For a given SharedInformer and client, the notifications are
// delivered sequentially. For a given SharedInformer, client, and
// object ID, the notifications are delivered in order. Because
// `ObjectMeta.UID` has no role in identifying objects, it is possible
// that when (1) object O1 with ID (e.g. namespace and name) X and
// `ObjectMeta.UID` U1 in the SharedInformer's local cache is deleted
// and later (2) another object O2 with ID X and `ObjectMeta.UID` U2 is
// created the informer's clients are not notified of (1) and (2) but
// rather are notified only of an update from O1 to O2. Clients that
// need to detect such cases might do so by comparing the `ObjectMeta.UID`
// field of the old and the new object in the code that handles update
// notifications (i.e. `OnUpdate` method of ResourceEventHandler).
//
// A client must process each notification promptly; a SharedInformer
// is not engineered to deal well with a large backlog of
// notifications to deliver. Lengthy processing should be passed off
// to something else, for example through a
// `client-go/util/workqueue`.
//
// A delete notification exposes the last locally known non-absent
// state, except that its ResourceVersion is replaced with a
// ResourceVersion in which the object is actually absent.
type SharedInformer interface {
	// AddEventHandler adds an event handler to the shared informer using the shared informer's resync
	// period. Events to a single handler are delivered sequentially, but there is no coordination
	// between different handlers.
	AddEventHandler(handler ResourceEventHandler)
	// AddEventHandlerWithResyncPeriod adds an event handler to the
	// shared informer with the requested resync period; zero means
	// this handler does not care about resyncs. The resync operation
	// consists of delivering to the handler an update notification
	// for every object in the informer's local cache; it does not add
	// any interactions with the authoritative storage. Some
	// informers do no resyncs at all, not even for handlers added
	// with a non-zero resyncPeriod. For an informer that does
	// resyncs, and for each handler that requests resyncs, that
	// informer develops a nominal resync period that is no shorter
	// than the requested period but may be longer. The actual time
	// between any two resyncs may be longer than the nominal period
	// because the implementation takes time to do work and there may
	// be competing load and scheduling noise.
	AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
	// GetStore returns the informer's local cache as a Store.
	GetStore() Store
	// GetController is deprecated, it does nothing useful
	GetController() Controller
	// Run starts and runs the shared informer, returning after it stops.
	// The informer will be stopped when stopCh is closed.
	Run(stopCh <-chan struct{})
	// HasSynced returns true if the shared informer's store has been
	// informed by at least one full LIST of the authoritative state
	// of the informer's object collection. This is unrelated to "resync".
	HasSynced() bool
	// LastSyncResourceVersion is the resource version observed when last synced with the underlying
	// store. The value returned is not synchronized with access to the underlying store and is not
	// thread-safe.
	LastSyncResourceVersion() string

	// The WatchErrorHandler is called whenever ListAndWatch drops the
	// connection with an error. After calling this handler, the informer
	// will backoff and retry.
	//
	// The default implementation looks at the error type and tries to log
	// the error message at an appropriate level.
	//
	// There's only one handler, so if you call this multiple times, last one
	// wins; calling after the informer has been started returns an error.
	//
	// The handler is intended for visibility, not to e.g. pause the consumers.
	// The handler should return quickly - any expensive processing should be
	// offloaded.
	SetWatchErrorHandler(handler WatchErrorHandler) error
}

// SharedIndexInformer provides add and get Indexers ability based on SharedInformer.
type SharedIndexInformer interface {
	SharedInformer
	// AddIndexers adds indexers to the informer before it starts.
	AddIndexers(indexers Indexers) error
	// GetIndexer returns the informer's local cache as an Indexer.
	GetIndexer() Indexer
}

// NewSharedInformer creates a new instance for the listwatcher. It
// delegates to NewSharedIndexInformer with an empty set of Indexers.
func NewSharedInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration) SharedInformer {
	return NewSharedIndexInformer(lw, exampleObject, defaultEventHandlerResyncPeriod, Indexers{})
}

// NewSharedIndexInformer creates a new instance for the listwatcher.
// The created informer will not do resyncs if the given
// defaultEventHandlerResyncPeriod is zero. Otherwise: for each
// handler with a non-zero requested resync period, whether added
// before or after the informer starts, the nominal resync period is
// the requested resync period rounded up to a multiple of the
// informer's resync checking period. Such an informer's resync
// checking period is established when the informer starts running,
// and is the maximum of (a) the minimum of the resync periods
// requested before the informer starts and the
// defaultEventHandlerResyncPeriod given here and (b) the constant
// `minimumResyncPeriod` defined in this file.
210func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer { 211 realClock := &clock.RealClock{} 212 sharedIndexInformer := &sharedIndexInformer{ 213 processor: &sharedProcessor{clock: realClock}, 214 indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers), 215 listerWatcher: lw, 216 objectType: exampleObject, 217 resyncCheckPeriod: defaultEventHandlerResyncPeriod, 218 defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod, 219 cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)), 220 clock: realClock, 221 } 222 return sharedIndexInformer 223} 224 225// InformerSynced is a function that can be used to determine if an informer has synced. This is useful for determining if caches have synced. 226type InformerSynced func() bool 227 228const ( 229 // syncedPollPeriod controls how often you look at the status of your sync funcs 230 syncedPollPeriod = 100 * time.Millisecond 231 232 // initialBufferSize is the initial number of event notifications that can be buffered. 233 initialBufferSize = 1024 234) 235 236// WaitForNamedCacheSync is a wrapper around WaitForCacheSync that generates log messages 237// indicating that the caller identified by name is waiting for syncs, followed by 238// either a successful or failed sync. 239func WaitForNamedCacheSync(controllerName string, stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool { 240 klog.Infof("Waiting for caches to sync for %s", controllerName) 241 242 if !WaitForCacheSync(stopCh, cacheSyncs...) { 243 utilruntime.HandleError(fmt.Errorf("unable to sync caches for %s", controllerName)) 244 return false 245 } 246 247 klog.Infof("Caches are synced for %s ", controllerName) 248 return true 249} 250 251// WaitForCacheSync waits for caches to populate. 
It returns true if it was successful, false 252// if the controller should shutdown 253// callers should prefer WaitForNamedCacheSync() 254func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool { 255 err := wait.PollImmediateUntil(syncedPollPeriod, 256 func() (bool, error) { 257 for _, syncFunc := range cacheSyncs { 258 if !syncFunc() { 259 return false, nil 260 } 261 } 262 return true, nil 263 }, 264 stopCh) 265 if err != nil { 266 klog.V(2).Infof("stop requested") 267 return false 268 } 269 270 klog.V(4).Infof("caches populated") 271 return true 272} 273 274// `*sharedIndexInformer` implements SharedIndexInformer and has three 275// main components. One is an indexed local cache, `indexer Indexer`. 276// The second main component is a Controller that pulls 277// objects/notifications using the ListerWatcher and pushes them into 278// a DeltaFIFO --- whose knownObjects is the informer's local cache 279// --- while concurrently Popping Deltas values from that fifo and 280// processing them with `sharedIndexInformer::HandleDeltas`. Each 281// invocation of HandleDeltas, which is done with the fifo's lock 282// held, processes each Delta in turn. For each Delta this both 283// updates the local cache and stuffs the relevant notification into 284// the sharedProcessor. The third main component is that 285// sharedProcessor, which is responsible for relaying those 286// notifications to each of the informer's clients. 287type sharedIndexInformer struct { 288 indexer Indexer 289 controller Controller 290 291 processor *sharedProcessor 292 cacheMutationDetector MutationDetector 293 294 listerWatcher ListerWatcher 295 296 // objectType is an example object of the type this informer is 297 // expected to handle. Only the type needs to be right, except 298 // that when that is `unstructured.Unstructured` the object's 299 // `"apiVersion"` and `"kind"` must also be right. 
300 objectType runtime.Object 301 302 // resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call 303 // shouldResync to check if any of our listeners need a resync. 304 resyncCheckPeriod time.Duration 305 // defaultEventHandlerResyncPeriod is the default resync period for any handlers added via 306 // AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default 307 // value). 308 defaultEventHandlerResyncPeriod time.Duration 309 // clock allows for testability 310 clock clock.Clock 311 312 started, stopped bool 313 startedLock sync.Mutex 314 315 // blockDeltas gives a way to stop all event distribution so that a late event handler 316 // can safely join the shared informer. 317 blockDeltas sync.Mutex 318 319 // Called whenever the ListAndWatch drops the connection with an error. 320 watchErrorHandler WatchErrorHandler 321} 322 323// dummyController hides the fact that a SharedInformer is different from a dedicated one 324// where a caller can `Run`. The run method is disconnected in this case, because higher 325// level logic will decide when to start the SharedInformer and related controller. 326// Because returning information back is always asynchronous, the legacy callers shouldn't 327// notice any change in behavior. 
328type dummyController struct { 329 informer *sharedIndexInformer 330} 331 332func (v *dummyController) Run(stopCh <-chan struct{}) { 333} 334 335func (v *dummyController) HasSynced() bool { 336 return v.informer.HasSynced() 337} 338 339func (v *dummyController) LastSyncResourceVersion() string { 340 return "" 341} 342 343type updateNotification struct { 344 oldObj interface{} 345 newObj interface{} 346} 347 348type addNotification struct { 349 newObj interface{} 350} 351 352type deleteNotification struct { 353 oldObj interface{} 354} 355 356func (s *sharedIndexInformer) SetWatchErrorHandler(handler WatchErrorHandler) error { 357 s.startedLock.Lock() 358 defer s.startedLock.Unlock() 359 360 if s.started { 361 return fmt.Errorf("informer has already started") 362 } 363 364 s.watchErrorHandler = handler 365 return nil 366} 367 368func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) { 369 defer utilruntime.HandleCrash() 370 371 fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ 372 KnownObjects: s.indexer, 373 EmitDeltaTypeReplaced: true, 374 }) 375 376 cfg := &Config{ 377 Queue: fifo, 378 ListerWatcher: s.listerWatcher, 379 ObjectType: s.objectType, 380 FullResyncPeriod: s.resyncCheckPeriod, 381 RetryOnError: false, 382 ShouldResync: s.processor.shouldResync, 383 384 Process: s.HandleDeltas, 385 WatchErrorHandler: s.watchErrorHandler, 386 } 387 388 func() { 389 s.startedLock.Lock() 390 defer s.startedLock.Unlock() 391 392 s.controller = New(cfg) 393 s.controller.(*controller).clock = s.clock 394 s.started = true 395 }() 396 397 // Separate stop channel because Processor should be stopped strictly after controller 398 processorStopCh := make(chan struct{}) 399 var wg wait.Group 400 defer wg.Wait() // Wait for Processor to stop 401 defer close(processorStopCh) // Tell Processor to stop 402 wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run) 403 wg.StartWithChannel(processorStopCh, s.processor.run) 404 405 defer func() { 406 s.startedLock.Lock() 407 
defer s.startedLock.Unlock() 408 s.stopped = true // Don't want any new listeners 409 }() 410 s.controller.Run(stopCh) 411} 412 413func (s *sharedIndexInformer) HasSynced() bool { 414 s.startedLock.Lock() 415 defer s.startedLock.Unlock() 416 417 if s.controller == nil { 418 return false 419 } 420 return s.controller.HasSynced() 421} 422 423func (s *sharedIndexInformer) LastSyncResourceVersion() string { 424 s.startedLock.Lock() 425 defer s.startedLock.Unlock() 426 427 if s.controller == nil { 428 return "" 429 } 430 return s.controller.LastSyncResourceVersion() 431} 432 433func (s *sharedIndexInformer) GetStore() Store { 434 return s.indexer 435} 436 437func (s *sharedIndexInformer) GetIndexer() Indexer { 438 return s.indexer 439} 440 441func (s *sharedIndexInformer) AddIndexers(indexers Indexers) error { 442 s.startedLock.Lock() 443 defer s.startedLock.Unlock() 444 445 if s.started { 446 return fmt.Errorf("informer has already started") 447 } 448 449 return s.indexer.AddIndexers(indexers) 450} 451 452func (s *sharedIndexInformer) GetController() Controller { 453 return &dummyController{informer: s} 454} 455 456func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) { 457 s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod) 458} 459 460func determineResyncPeriod(desired, check time.Duration) time.Duration { 461 if desired == 0 { 462 return desired 463 } 464 if check == 0 { 465 klog.Warningf("The specified resyncPeriod %v is invalid because this shared informer doesn't support resyncing", desired) 466 return 0 467 } 468 if desired < check { 469 klog.Warningf("The specified resyncPeriod %v is being increased to the minimum resyncCheckPeriod %v", desired, check) 470 return check 471 } 472 return desired 473} 474 475const minimumResyncPeriod = 1 * time.Second 476 477func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) { 478 s.startedLock.Lock() 479 defer 
s.startedLock.Unlock() 480 481 if s.stopped { 482 klog.V(2).Infof("Handler %v was not added to shared informer because it has stopped already", handler) 483 return 484 } 485 486 if resyncPeriod > 0 { 487 if resyncPeriod < minimumResyncPeriod { 488 klog.Warningf("resyncPeriod %v is too small. Changing it to the minimum allowed value of %v", resyncPeriod, minimumResyncPeriod) 489 resyncPeriod = minimumResyncPeriod 490 } 491 492 if resyncPeriod < s.resyncCheckPeriod { 493 if s.started { 494 klog.Warningf("resyncPeriod %v is smaller than resyncCheckPeriod %v and the informer has already started. Changing it to %v", resyncPeriod, s.resyncCheckPeriod, s.resyncCheckPeriod) 495 resyncPeriod = s.resyncCheckPeriod 496 } else { 497 // if the event handler's resyncPeriod is smaller than the current resyncCheckPeriod, update 498 // resyncCheckPeriod to match resyncPeriod and adjust the resync periods of all the listeners 499 // accordingly 500 s.resyncCheckPeriod = resyncPeriod 501 s.processor.resyncCheckPeriodChanged(resyncPeriod) 502 } 503 } 504 } 505 506 listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize) 507 508 if !s.started { 509 s.processor.addListener(listener) 510 return 511 } 512 513 // in order to safely join, we have to 514 // 1. stop sending add/update/delete notifications 515 // 2. do a list against the store 516 // 3. send synthetic "Add" events to the new handler 517 // 4. 
unblock 518 s.blockDeltas.Lock() 519 defer s.blockDeltas.Unlock() 520 521 s.processor.addListener(listener) 522 for _, item := range s.indexer.List() { 523 listener.add(addNotification{newObj: item}) 524 } 525} 526 527func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { 528 s.blockDeltas.Lock() 529 defer s.blockDeltas.Unlock() 530 531 // from oldest to newest 532 for _, d := range obj.(Deltas) { 533 switch d.Type { 534 case Sync, Replaced, Added, Updated: 535 s.cacheMutationDetector.AddObject(d.Object) 536 if old, exists, err := s.indexer.Get(d.Object); err == nil && exists { 537 if err := s.indexer.Update(d.Object); err != nil { 538 return err 539 } 540 541 isSync := false 542 switch { 543 case d.Type == Sync: 544 // Sync events are only propagated to listeners that requested resync 545 isSync = true 546 case d.Type == Replaced: 547 if accessor, err := meta.Accessor(d.Object); err == nil { 548 if oldAccessor, err := meta.Accessor(old); err == nil { 549 // Replaced events that didn't change resourceVersion are treated as resync events 550 // and only propagated to listeners that requested resync 551 isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion() 552 } 553 } 554 } 555 s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync) 556 } else { 557 if err := s.indexer.Add(d.Object); err != nil { 558 return err 559 } 560 s.processor.distribute(addNotification{newObj: d.Object}, false) 561 } 562 case Deleted: 563 if err := s.indexer.Delete(d.Object); err != nil { 564 return err 565 } 566 s.processor.distribute(deleteNotification{oldObj: d.Object}, false) 567 } 568 } 569 return nil 570} 571 572// sharedProcessor has a collection of processorListener and can 573// distribute a notification object to its listeners. There are two 574// kinds of distribute operations. 
The sync distributions go to a 575// subset of the listeners that (a) is recomputed in the occasional 576// calls to shouldResync and (b) every listener is initially put in. 577// The non-sync distributions go to every listener. 578type sharedProcessor struct { 579 listenersStarted bool 580 listenersLock sync.RWMutex 581 listeners []*processorListener 582 syncingListeners []*processorListener 583 clock clock.Clock 584 wg wait.Group 585} 586 587func (p *sharedProcessor) addListener(listener *processorListener) { 588 p.listenersLock.Lock() 589 defer p.listenersLock.Unlock() 590 591 p.addListenerLocked(listener) 592 if p.listenersStarted { 593 p.wg.Start(listener.run) 594 p.wg.Start(listener.pop) 595 } 596} 597 598func (p *sharedProcessor) addListenerLocked(listener *processorListener) { 599 p.listeners = append(p.listeners, listener) 600 p.syncingListeners = append(p.syncingListeners, listener) 601} 602 603func (p *sharedProcessor) distribute(obj interface{}, sync bool) { 604 p.listenersLock.RLock() 605 defer p.listenersLock.RUnlock() 606 607 if sync { 608 for _, listener := range p.syncingListeners { 609 listener.add(obj) 610 } 611 } else { 612 for _, listener := range p.listeners { 613 listener.add(obj) 614 } 615 } 616} 617 618func (p *sharedProcessor) run(stopCh <-chan struct{}) { 619 func() { 620 p.listenersLock.RLock() 621 defer p.listenersLock.RUnlock() 622 for _, listener := range p.listeners { 623 p.wg.Start(listener.run) 624 p.wg.Start(listener.pop) 625 } 626 p.listenersStarted = true 627 }() 628 <-stopCh 629 p.listenersLock.RLock() 630 defer p.listenersLock.RUnlock() 631 for _, listener := range p.listeners { 632 close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop 633 } 634 p.wg.Wait() // Wait for all .pop() and .run() to stop 635} 636 637// shouldResync queries every listener to determine if any of them need a resync, based on each 638// listener's resyncPeriod. 
func (p *sharedProcessor) shouldResync() bool {
	p.listenersLock.Lock()
	defer p.listenersLock.Unlock()

	// The set of syncing listeners is rebuilt from scratch on every call.
	p.syncingListeners = []*processorListener{}

	resyncNeeded := false
	now := p.clock.Now()
	for _, listener := range p.listeners {
		// need to loop through all the listeners to see if they need to resync so we can prepare any
		// listeners that are going to be resyncing.
		if listener.shouldResync(now) {
			resyncNeeded = true
			p.syncingListeners = append(p.syncingListeners, listener)
			listener.determineNextResync(now)
		}
	}
	return resyncNeeded
}

// resyncCheckPeriodChanged recomputes every listener's resync period
// from its requestedResyncPeriod and the new resyncCheckPeriod.
func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Duration) {
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()

	for _, listener := range p.listeners {
		resyncPeriod := determineResyncPeriod(listener.requestedResyncPeriod, resyncCheckPeriod)
		listener.setResyncPeriod(resyncPeriod)
	}
}

// processorListener relays notifications from a sharedProcessor to
// one ResourceEventHandler --- using two goroutines, two unbuffered
// channels, and an unbounded ring buffer. The `add(notification)`
// function sends the given notification to `addCh`. One goroutine
// runs `pop()`, which pumps notifications from `addCh` to `nextCh`
// using storage in the ring buffer while `nextCh` is not keeping up.
// Another goroutine runs `run()`, which receives notifications from
// `nextCh` and synchronously invokes the appropriate handler method.
//
// processorListener also keeps track of the adjusted requested resync
// period of the listener.
type processorListener struct {
	nextCh chan interface{}
	addCh  chan interface{}

	handler ResourceEventHandler

	// pendingNotifications is an unbounded ring buffer that holds all notifications not yet distributed.
	// There is one per listener, but a failing/stalled listener will have infinite pendingNotifications
	// added until we OOM.
	// TODO: This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but
	// we should try to do something better.
	pendingNotifications buffer.RingGrowing

	// requestedResyncPeriod is how frequently the listener wants a
	// full resync from the shared informer, but modified by two
	// adjustments. One is imposing a lower bound,
	// `minimumResyncPeriod`. The other is another lower bound, the
	// sharedProcessor's `resyncCheckPeriod`, that is imposed (a) only
	// in AddEventHandlerWithResyncPeriod invocations made after the
	// sharedProcessor starts and (b) only if the informer does
	// resyncs at all.
	requestedResyncPeriod time.Duration
	// resyncPeriod is the threshold that will be used in the logic
	// for this listener. This value differs from
	// requestedResyncPeriod only when the sharedIndexInformer does
	// not do resyncs, in which case the value here is zero. The
	// actual time between resyncs depends on when the
	// sharedProcessor's `shouldResync` function is invoked and when
	// the sharedIndexInformer processes `Sync` type Delta objects.
	resyncPeriod time.Duration
	// nextResync is the earliest time the listener should get a full resync
	nextResync time.Time
	// resyncLock guards access to resyncPeriod and nextResync
	resyncLock sync.Mutex
}

// newProcessListener builds a listener for handler with unbuffered
// addCh and nextCh channels and a ring buffer of initial size
// bufferSize, and schedules its first resync relative to now.
func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener {
	ret := &processorListener{
		nextCh:                make(chan interface{}),
		addCh:                 make(chan interface{}),
		handler:               handler,
		pendingNotifications:  *buffer.NewRingGrowing(bufferSize),
		requestedResyncPeriod: requestedResyncPeriod,
		resyncPeriod:          resyncPeriod,
	}

	ret.determineNextResync(now)

	return ret
}

// add sends the notification on addCh. addCh is unbuffered, so this
// blocks until pop() receives it.
func (p *processorListener) add(notification interface{}) {
	p.addCh <- notification
}

// pop moves notifications from addCh to nextCh, parking them in
// pendingNotifications while a previous notification is still waiting
// to be sent. It returns when addCh is closed, closing nextCh on the
// way out.
func (p *processorListener) pop() {
	defer utilruntime.HandleCrash()
	defer close(p.nextCh) // Tell .run() to stop

	// nextCh stays nil (so its select case can never fire) whenever
	// there is no notification waiting to be dispatched.
	var nextCh chan<- interface{}
	var notification interface{}
	for {
		select {
		case nextCh <- notification:
			// Notification dispatched
			var ok bool
			notification, ok = p.pendingNotifications.ReadOne()
			if !ok { // Nothing to pop
				nextCh = nil // Disable this select case
			}
		case notificationToAdd, ok := <-p.addCh:
			if !ok {
				return
			}
			if notification == nil { // No notification to pop (and pendingNotifications is empty)
				// Optimize the case - skip adding to pendingNotifications
				notification = notificationToAdd
				nextCh = p.nextCh
			} else { // There is already a notification waiting to be dispatched
				p.pendingNotifications.WriteOne(notificationToAdd)
			}
		}
	}
}

// run receives notifications from nextCh and invokes the matching
// handler method (OnUpdate, OnAdd or OnDelete) for each; a notification
// of any other type is reported through utilruntime.HandleError.
func (p *processorListener) run() {
	// this call blocks until the channel is closed. When a panic happens during the notification
	// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
	// the next notification will be attempted. This is usually better than the alternative of never
	// delivering again.
	stopCh := make(chan struct{})
	wait.Until(func() {
		for next := range p.nextCh {
			switch notification := next.(type) {
			case updateNotification:
				p.handler.OnUpdate(notification.oldObj, notification.newObj)
			case addNotification:
				p.handler.OnAdd(notification.newObj)
			case deleteNotification:
				p.handler.OnDelete(notification.oldObj)
			default:
				utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
			}
		}
		// the only way to get here is if the p.nextCh is empty and closed
		close(stopCh)
	}, 1*time.Second, stopCh)
}

// shouldResync determines if the listener needs a resync. If the listener's resyncPeriod is 0,
// this always returns false. Otherwise it returns true when now is at
// or after nextResync.
func (p *processorListener) shouldResync(now time.Time) bool {
	p.resyncLock.Lock()
	defer p.resyncLock.Unlock()

	if p.resyncPeriod == 0 {
		return false
	}

	return now.After(p.nextResync) || now.Equal(p.nextResync)
}

// determineNextResync sets nextResync to now plus the listener's
// resyncPeriod.
func (p *processorListener) determineNextResync(now time.Time) {
	p.resyncLock.Lock()
	defer p.resyncLock.Unlock()

	p.nextResync = now.Add(p.resyncPeriod)
}

// setResyncPeriod replaces the listener's resyncPeriod; nextResync is
// left untouched.
func (p *processorListener) setResyncPeriod(resyncPeriod time.Duration) {
	p.resyncLock.Lock()
	defer p.resyncLock.Unlock()

	p.resyncPeriod = resyncPeriod
}