/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cache

import (
	"fmt"
	"sync"
	"time"

	"k8s.io/apimachinery/pkg/api/meta"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/util/clock"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/utils/buffer"

	"k8s.io/klog/v2"
)

// SharedInformer provides eventually consistent linkage of its
// clients to the authoritative state of a given collection of
// objects.  An object is identified by its API group, kind/resource,
// namespace (if any), and name; the `ObjectMeta.UID` is not part of
// an object's ID as far as this contract is concerned.  One
// SharedInformer provides linkage to objects of a particular API
// group and kind/resource.  The linked object collection of a
// SharedInformer may be further restricted to one namespace (if
// applicable) and/or by label selector and/or field selector.
//
// The authoritative state of an object is what apiservers provide
// access to, and an object goes through a strict sequence of states.
// An object state is either (1) present with a ResourceVersion and
// other appropriate content or (2) "absent".
//
// A SharedInformer maintains a local cache --- exposed by GetStore(),
// by GetIndexer() in the case of an indexed informer, and possibly by
// machinery involved in creating and/or accessing the informer --- of
// the state of each relevant object.  This cache is eventually
// consistent with the authoritative state.  This means that, unless
// prevented by persistent communication problems, if ever a
// particular object ID X is authoritatively associated with a state S
// then for every SharedInformer I whose collection includes (X, S)
// eventually either (1) I's cache associates X with S or a later
// state of X, (2) I is stopped, or (3) the authoritative state
// service for X terminates.  To be formally complete, we say that the
// absent state meets any restriction by label selector or field
// selector.
//
// For a given informer and relevant object ID X, the sequence of
// states that appears in the informer's cache is a subsequence of the
// states authoritatively associated with X.  That is, some states
// might never appear in the cache but ordering among the appearing
// states is correct.  Note, however, that there is no promise about
// ordering between states seen for different objects.
//
// The local cache starts out empty, and gets populated and updated
// during `Run()`.
//
// As a simple example, if a collection of objects is henceforth
// unchanging, a SharedInformer is created that links to that
// collection, and that SharedInformer is `Run()` then that
// SharedInformer's cache eventually holds an exact copy of that
// collection (unless it is stopped too soon, the authoritative state
// service ends, or communication problems between the two
// persistently thwart achievement).
//
// As another simple example, if the local cache ever holds a
// non-absent state for some object ID and the object is eventually
// removed from the authoritative state then eventually the object is
// removed from the local cache (unless the SharedInformer is stopped
// too soon, the authoritative state service ends, or communication
// problems persistently thwart the desired result).
//
// The keys in the Store are of the form namespace/name for namespaced
// objects, and are simply the name for non-namespaced objects.
// Clients can use `MetaNamespaceKeyFunc(obj)` to extract the key for
// a given object, and `SplitMetaNamespaceKey(key)` to split a key
// into its constituent parts.
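//
// For example, given some object `obj` (the variable and its
// namespace/name here are illustrative):
//
//	key, err := MetaNamespaceKeyFunc(obj) // e.g. "default/my-pod"
//	if err == nil {
//		namespace, name, _ := SplitMetaNamespaceKey(key) // "default", "my-pod"
//		_, _ = namespace, name
//	}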
//
// Every query against the local cache is answered entirely from one
// snapshot of the cache's state.  Thus, the result of a `List` call
// will not contain two entries with the same namespace and name.
//
// A client is identified here by a ResourceEventHandler.  For every
// update to the SharedInformer's local cache and for every client
// added before `Run()`, eventually either the SharedInformer is
// stopped or the client is notified of the update.  A client added
// after `Run()` starts gets a startup batch of notifications of
// additions of the objects existing in the cache at the time that
// client was added; also, for every update to the SharedInformer's
// local cache after that client was added, eventually either the
// SharedInformer is stopped or that client is notified of that
// update.  Client notifications happen after the corresponding cache
// update and, in the case of a SharedIndexInformer, after the
// corresponding index updates.  It is possible that additional cache
// and index updates happen before such a prescribed notification.
// For a given SharedInformer and client, the notifications are
// delivered sequentially.  For a given SharedInformer, client, and
// object ID, the notifications are delivered in order.  Because
// `ObjectMeta.UID` has no role in identifying objects, it is possible
// that when (1) object O1 with ID (e.g. namespace and name) X and
// `ObjectMeta.UID` U1 in the SharedInformer's local cache is deleted
// and later (2) another object O2 with ID X and ObjectMeta.UID U2 is
// created the informer's clients are not notified of (1) and (2) but
// rather are notified only of an update from O1 to O2. Clients that
// need to detect such cases might do so by comparing the `ObjectMeta.UID`
// field of the old and the new object in the code that handles update
// notifications (i.e. `OnUpdate` method of ResourceEventHandler).
//
// A client must process each notification promptly; a SharedInformer
// is not engineered to deal well with a large backlog of
// notifications to deliver.  Lengthy processing should be passed off
// to something else, for example through a
// `client-go/util/workqueue`.
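//
// For example, a handler might do nothing but enqueue keys, leaving the
// lengthy processing to worker goroutines (a sketch; `queue` stands for
// an illustrative client-go/util/workqueue instance and is not part of
// this API):
//
//	AddFunc: func(obj interface{}) {
//		if key, err := MetaNamespaceKeyFunc(obj); err == nil {
//			queue.Add(key)
//		}
//	},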
//
// A delete notification exposes the last locally known non-absent
// state, except that its ResourceVersion is replaced with a
// ResourceVersion in which the object is actually absent.
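//
// A minimal end-to-end sketch (the ListerWatcher `lw`, the example
// object, the stop channel, and the handler body are illustrative):
//
//	informer := NewSharedInformer(lw, &v1.Pod{}, 0)
//	informer.AddEventHandler(ResourceEventHandlerFuncs{
//		AddFunc: func(obj interface{}) { /* enqueue a work item */ },
//	})
//	go informer.Run(stopCh)
//	if !WaitForCacheSync(stopCh, informer.HasSynced) {
//		return
//	}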
type SharedInformer interface {
	// AddEventHandler adds an event handler to the shared informer using the shared informer's resync
	// period.  Events to a single handler are delivered sequentially, but there is no coordination
	// between different handlers.
	AddEventHandler(handler ResourceEventHandler)
	// AddEventHandlerWithResyncPeriod adds an event handler to the
	// shared informer with the requested resync period; zero means
	// this handler does not care about resyncs.  The resync operation
	// consists of delivering to the handler an update notification
	// for every object in the informer's local cache; it does not add
	// any interactions with the authoritative storage.  Some
	// informers do no resyncs at all, not even for handlers added
	// with a non-zero resyncPeriod.  For an informer that does
	// resyncs, and for each handler that requests resyncs, that
	// informer develops a nominal resync period that is no shorter
	// than the requested period but may be longer.  The actual time
	// between any two resyncs may be longer than the nominal period
	// because the implementation takes time to do work and there may
	// be competing load and scheduling noise.
	AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
	// GetStore returns the informer's local cache as a Store.
	GetStore() Store
	// GetController is deprecated; it does nothing useful.
	GetController() Controller
	// Run starts and runs the shared informer, returning after it stops.
	// The informer will be stopped when stopCh is closed.
	Run(stopCh <-chan struct{})
	// HasSynced returns true if the shared informer's store has been
	// informed by at least one full LIST of the authoritative state
	// of the informer's object collection.  This is unrelated to "resync".
	HasSynced() bool
	// LastSyncResourceVersion is the resource version observed when last synced with the underlying
	// store. The value returned is not synchronized with access to the underlying store and is not
	// thread-safe.
	LastSyncResourceVersion() string

	// The WatchErrorHandler is called whenever ListAndWatch drops the
	// connection with an error. After calling this handler, the informer
	// will back off and retry.
	//
	// The default implementation looks at the error type and tries to log
	// the error message at an appropriate level.
	//
	// There's only one handler, so if you call this multiple times, the last
	// one wins; calling after the informer has been started returns an error.
	//
	// The handler is intended for visibility, not to e.g. pause the consumers.
	// The handler should return quickly; any expensive processing should be
	// offloaded.
	SetWatchErrorHandler(handler WatchErrorHandler) error
}

// SharedIndexInformer provides the ability to add and get Indexers
// on top of SharedInformer.
type SharedIndexInformer interface {
	SharedInformer
	// AddIndexers adds indexers to the informer before it starts.
	AddIndexers(indexers Indexers) error
	GetIndexer() Indexer
}

// NewSharedInformer creates a new instance of SharedInformer for the given
// ListerWatcher.
func NewSharedInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration) SharedInformer {
	return NewSharedIndexInformer(lw, exampleObject, defaultEventHandlerResyncPeriod, Indexers{})
}

// NewSharedIndexInformer creates a new instance of SharedIndexInformer for
// the given ListerWatcher and Indexers.  The created informer will not do
// resyncs if the given defaultEventHandlerResyncPeriod is zero.  Otherwise:
// for each handler with a non-zero requested resync period, whether added
// before or after the informer starts, the nominal resync period is the
// requested resync period rounded up to a multiple of the informer's
// resync checking period.  Such an informer's resync checking period is
// established when the informer starts running, and is the maximum of (a)
// the minimum of the resync periods requested before the informer starts
// and the defaultEventHandlerResyncPeriod given here and (b) the constant
// `minimumResyncPeriod` defined in this file.
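//
// For example, if the resync checking period is established as 10 seconds
// and a handler requests a 12-second resync period, that handler's nominal
// resync period is 20 seconds (12s rounded up to a multiple of 10s).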
func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
	realClock := &clock.RealClock{}
	sharedIndexInformer := &sharedIndexInformer{
		processor:                       &sharedProcessor{clock: realClock},
		indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
		listerWatcher:                   lw,
		objectType:                      exampleObject,
		resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
		defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
		cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
		clock:                           realClock,
	}
	return sharedIndexInformer
}

// InformerSynced is a function that can be used to determine if an informer has synced.  This is useful for determining if caches have synced.
type InformerSynced func() bool

const (
	// syncedPollPeriod controls how often you look at the status of your sync funcs
	syncedPollPeriod = 100 * time.Millisecond

	// initialBufferSize is the initial number of event notifications that can be buffered.
	initialBufferSize = 1024
)

// WaitForNamedCacheSync is a wrapper around WaitForCacheSync that generates log messages
// indicating that the caller identified by name is waiting for syncs, followed by
// either a successful or failed sync.
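//
// A typical call from a controller's Run method (the controller name,
// stop channel, and informer variable are illustrative):
//
//	if !cache.WaitForNamedCacheSync("my-controller", stopCh, informer.HasSynced) {
//		return
//	}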
func WaitForNamedCacheSync(controllerName string, stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
	klog.Infof("Waiting for caches to sync for %s", controllerName)

	if !WaitForCacheSync(stopCh, cacheSyncs...) {
		utilruntime.HandleError(fmt.Errorf("unable to sync caches for %s", controllerName))
		return false
	}

	klog.Infof("Caches are synced for %s", controllerName)
	return true
}

// WaitForCacheSync waits for caches to populate.  It returns true if it was
// successful, false if the controller should shut down.
// Callers should prefer WaitForNamedCacheSync().
func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
	err := wait.PollImmediateUntil(syncedPollPeriod,
		func() (bool, error) {
			for _, syncFunc := range cacheSyncs {
				if !syncFunc() {
					return false, nil
				}
			}
			return true, nil
		},
		stopCh)
	if err != nil {
		klog.V(2).Infof("stop requested")
		return false
	}

	klog.V(4).Infof("caches populated")
	return true
}

// `*sharedIndexInformer` implements SharedIndexInformer and has three
// main components.  One is an indexed local cache, `indexer Indexer`.
// The second main component is a Controller that pulls
// objects/notifications using the ListerWatcher and pushes them into
// a DeltaFIFO --- whose knownObjects is the informer's local cache
// --- while concurrently Popping Deltas values from that fifo and
// processing them with `sharedIndexInformer::HandleDeltas`.  Each
// invocation of HandleDeltas, which is done with the fifo's lock
// held, processes each Delta in turn.  For each Delta this both
// updates the local cache and stuffs the relevant notification into
// the sharedProcessor.  The third main component is that
// sharedProcessor, which is responsible for relaying those
// notifications to each of the informer's clients.
type sharedIndexInformer struct {
	indexer    Indexer
	controller Controller

	processor             *sharedProcessor
	cacheMutationDetector MutationDetector

	listerWatcher ListerWatcher

	// objectType is an example object of the type this informer is
	// expected to handle.  Only the type needs to be right, except
	// that when that is `unstructured.Unstructured` the object's
	// `"apiVersion"` and `"kind"` must also be right.
	objectType runtime.Object

	// resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
	// shouldResync to check if any of our listeners need a resync.
	resyncCheckPeriod time.Duration
	// defaultEventHandlerResyncPeriod is the default resync period for any handlers added via
	// AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default
	// value).
	defaultEventHandlerResyncPeriod time.Duration
	// clock allows for testability
	clock clock.Clock

	started, stopped bool
	startedLock      sync.Mutex

	// blockDeltas gives a way to stop all event distribution so that a late event handler
	// can safely join the shared informer.
	blockDeltas sync.Mutex

	// Called whenever the ListAndWatch drops the connection with an error.
	watchErrorHandler WatchErrorHandler
}

// dummyController hides the fact that a SharedInformer is different from a dedicated one
// where a caller can `Run`.  The run method is disconnected in this case, because higher
// level logic will decide when to start the SharedInformer and related controller.
// Because returning information back is always asynchronous, the legacy callers shouldn't
// notice any change in behavior.
type dummyController struct {
	informer *sharedIndexInformer
}

func (v *dummyController) Run(stopCh <-chan struct{}) {
}

func (v *dummyController) HasSynced() bool {
	return v.informer.HasSynced()
}

func (v *dummyController) LastSyncResourceVersion() string {
	return ""
}

type updateNotification struct {
	oldObj interface{}
	newObj interface{}
}

type addNotification struct {
	newObj interface{}
}

type deleteNotification struct {
	oldObj interface{}
}

func (s *sharedIndexInformer) SetWatchErrorHandler(handler WatchErrorHandler) error {
	s.startedLock.Lock()
	defer s.startedLock.Unlock()

	if s.started {
		return fmt.Errorf("informer has already started")
	}

	s.watchErrorHandler = handler
	return nil
}

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
		KnownObjects:          s.indexer,
		EmitDeltaTypeReplaced: true,
	})

	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    s.listerWatcher,
		ObjectType:       s.objectType,
		FullResyncPeriod: s.resyncCheckPeriod,
		RetryOnError:     false,
		ShouldResync:     s.processor.shouldResync,

		Process:           s.HandleDeltas,
		WatchErrorHandler: s.watchErrorHandler,
	}

	func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()

		s.controller = New(cfg)
		s.controller.(*controller).clock = s.clock
		s.started = true
	}()

	// Separate stop channel because Processor should be stopped strictly after controller
	processorStopCh := make(chan struct{})
	var wg wait.Group
	defer wg.Wait()              // Wait for Processor to stop
	defer close(processorStopCh) // Tell Processor to stop
	wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
	wg.StartWithChannel(processorStopCh, s.processor.run)

	defer func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()
		s.stopped = true // Don't want any new listeners
	}()
	s.controller.Run(stopCh)
}

func (s *sharedIndexInformer) HasSynced() bool {
	s.startedLock.Lock()
	defer s.startedLock.Unlock()

	if s.controller == nil {
		return false
	}
	return s.controller.HasSynced()
}

func (s *sharedIndexInformer) LastSyncResourceVersion() string {
	s.startedLock.Lock()
	defer s.startedLock.Unlock()

	if s.controller == nil {
		return ""
	}
	return s.controller.LastSyncResourceVersion()
}

func (s *sharedIndexInformer) GetStore() Store {
	return s.indexer
}

func (s *sharedIndexInformer) GetIndexer() Indexer {
	return s.indexer
}

func (s *sharedIndexInformer) AddIndexers(indexers Indexers) error {
	s.startedLock.Lock()
	defer s.startedLock.Unlock()

	if s.started {
		return fmt.Errorf("informer has already started")
	}

	return s.indexer.AddIndexers(indexers)
}

func (s *sharedIndexInformer) GetController() Controller {
	return &dummyController{informer: s}
}

func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) {
	s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
}

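// determineResyncPeriod reconciles a handler's desired resync period with the
// informer's resync checking period: a zero desire stays zero, a non-zero
// desire becomes zero when the informer never resyncs (check == 0), and is
// otherwise raised to at least the checking period.  For example:
//
//	determineResyncPeriod(0, 10*time.Second)              // 0: no resync requested
//	determineResyncPeriod(12*time.Second, 0)              // 0: informer never resyncs
//	determineResyncPeriod(5*time.Second, 10*time.Second)  // 10s: raised to check period
//	determineResyncPeriod(30*time.Second, 10*time.Second) // 30s: desired period kept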
func determineResyncPeriod(desired, check time.Duration) time.Duration {
	if desired == 0 {
		return desired
	}
	if check == 0 {
		klog.Warningf("The specified resyncPeriod %v is invalid because this shared informer doesn't support resyncing", desired)
		return 0
	}
	if desired < check {
		klog.Warningf("The specified resyncPeriod %v is being increased to the minimum resyncCheckPeriod %v", desired, check)
		return check
	}
	return desired
}

const minimumResyncPeriod = 1 * time.Second

func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
	s.startedLock.Lock()
	defer s.startedLock.Unlock()

	if s.stopped {
		klog.V(2).Infof("Handler %v was not added to shared informer because it has stopped already", handler)
		return
	}

	if resyncPeriod > 0 {
		if resyncPeriod < minimumResyncPeriod {
			klog.Warningf("resyncPeriod %v is too small. Changing it to the minimum allowed value of %v", resyncPeriod, minimumResyncPeriod)
			resyncPeriod = minimumResyncPeriod
		}

		if resyncPeriod < s.resyncCheckPeriod {
			if s.started {
				klog.Warningf("resyncPeriod %v is smaller than resyncCheckPeriod %v and the informer has already started. Changing it to %v", resyncPeriod, s.resyncCheckPeriod, s.resyncCheckPeriod)
				resyncPeriod = s.resyncCheckPeriod
			} else {
				// if the event handler's resyncPeriod is smaller than the current resyncCheckPeriod, update
				// resyncCheckPeriod to match resyncPeriod and adjust the resync periods of all the listeners
				// accordingly
				s.resyncCheckPeriod = resyncPeriod
				s.processor.resyncCheckPeriodChanged(resyncPeriod)
			}
		}
	}

	listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)

	if !s.started {
		s.processor.addListener(listener)
		return
	}

	// in order to safely join, we have to
	// 1. stop sending add/update/delete notifications
	// 2. do a list against the store
	// 3. send synthetic "Add" events to the new handler
	// 4. unblock
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	s.processor.addListener(listener)
	for _, item := range s.indexer.List() {
		listener.add(addNotification{newObj: item})
	}
}

func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	// from oldest to newest
	for _, d := range obj.(Deltas) {
		switch d.Type {
		case Sync, Replaced, Added, Updated:
			s.cacheMutationDetector.AddObject(d.Object)
			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
				if err := s.indexer.Update(d.Object); err != nil {
					return err
				}

				isSync := false
				switch {
				case d.Type == Sync:
					// Sync events are only propagated to listeners that requested resync
					isSync = true
				case d.Type == Replaced:
					if accessor, err := meta.Accessor(d.Object); err == nil {
						if oldAccessor, err := meta.Accessor(old); err == nil {
							// Replaced events that didn't change resourceVersion are treated as resync events
							// and only propagated to listeners that requested resync
							isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
						}
					}
				}
				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
			} else {
				if err := s.indexer.Add(d.Object); err != nil {
					return err
				}
				s.processor.distribute(addNotification{newObj: d.Object}, false)
			}
		case Deleted:
			if err := s.indexer.Delete(d.Object); err != nil {
				return err
			}
			s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
		}
	}
	return nil
}

// sharedProcessor has a collection of processorListener and can
// distribute a notification object to its listeners.  There are two
// kinds of distribute operations.  The sync distributions go to a
// subset of the listeners that (a) is recomputed in the occasional
// calls to shouldResync and (b) every listener is initially put in.
// The non-sync distributions go to every listener.
type sharedProcessor struct {
	listenersStarted bool
	listenersLock    sync.RWMutex
	listeners        []*processorListener
	syncingListeners []*processorListener
	clock            clock.Clock
	wg               wait.Group
}

func (p *sharedProcessor) addListener(listener *processorListener) {
	p.listenersLock.Lock()
	defer p.listenersLock.Unlock()

	p.addListenerLocked(listener)
	if p.listenersStarted {
		p.wg.Start(listener.run)
		p.wg.Start(listener.pop)
	}
}

func (p *sharedProcessor) addListenerLocked(listener *processorListener) {
	p.listeners = append(p.listeners, listener)
	p.syncingListeners = append(p.syncingListeners, listener)
}

func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()

	if sync {
		for _, listener := range p.syncingListeners {
			listener.add(obj)
		}
	} else {
		for _, listener := range p.listeners {
			listener.add(obj)
		}
	}
}

func (p *sharedProcessor) run(stopCh <-chan struct{}) {
	func() {
		p.listenersLock.RLock()
		defer p.listenersLock.RUnlock()
		for _, listener := range p.listeners {
			p.wg.Start(listener.run)
			p.wg.Start(listener.pop)
		}
		p.listenersStarted = true
	}()
	<-stopCh
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()
	for _, listener := range p.listeners {
		close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
	}
	p.wg.Wait() // Wait for all .pop() and .run() to stop
}

// shouldResync queries every listener to determine if any of them need a resync, based on each
// listener's resyncPeriod.
func (p *sharedProcessor) shouldResync() bool {
	p.listenersLock.Lock()
	defer p.listenersLock.Unlock()

	p.syncingListeners = []*processorListener{}

	resyncNeeded := false
	now := p.clock.Now()
	for _, listener := range p.listeners {
		// need to loop through all the listeners to see if they need to resync so we can prepare any
		// listeners that are going to be resyncing.
		if listener.shouldResync(now) {
			resyncNeeded = true
			p.syncingListeners = append(p.syncingListeners, listener)
			listener.determineNextResync(now)
		}
	}
	return resyncNeeded
}

func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Duration) {
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()

	for _, listener := range p.listeners {
		resyncPeriod := determineResyncPeriod(listener.requestedResyncPeriod, resyncCheckPeriod)
		listener.setResyncPeriod(resyncPeriod)
	}
}

// processorListener relays notifications from a sharedProcessor to
// one ResourceEventHandler --- using two goroutines, two unbuffered
// channels, and an unbounded ring buffer.  The `add(notification)`
// function sends the given notification to `addCh`.  One goroutine
// runs `pop()`, which pumps notifications from `addCh` to `nextCh`
// using storage in the ring buffer while `nextCh` is not keeping up.
// Another goroutine runs `run()`, which receives notifications from
// `nextCh` and synchronously invokes the appropriate handler method.
//
// processorListener also keeps track of the adjusted requested resync
// period of the listener.
type processorListener struct {
	nextCh chan interface{}
	addCh  chan interface{}

	handler ResourceEventHandler

	// pendingNotifications is an unbounded ring buffer that holds all notifications not yet distributed.
	// There is one per listener, but a failing/stalled listener will have infinite pendingNotifications
	// added until we OOM.
	// TODO: This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but
	// we should try to do something better.
	pendingNotifications buffer.RingGrowing

	// requestedResyncPeriod is how frequently the listener wants a
	// full resync from the shared informer, but modified by two
	// adjustments.  One is imposing a lower bound,
	// `minimumResyncPeriod`.  The other is another lower bound, the
	// sharedProcessor's `resyncCheckPeriod`, that is imposed (a) only
	// in AddEventHandlerWithResyncPeriod invocations made after the
	// sharedProcessor starts and (b) only if the informer does
	// resyncs at all.
	requestedResyncPeriod time.Duration
	// resyncPeriod is the threshold that will be used in the logic
	// for this listener.  This value differs from
	// requestedResyncPeriod only when the sharedIndexInformer does
	// not do resyncs, in which case the value here is zero.  The
	// actual time between resyncs depends on when the
	// sharedProcessor's `shouldResync` function is invoked and when
	// the sharedIndexInformer processes `Sync` type Delta objects.
	resyncPeriod time.Duration
	// nextResync is the earliest time the listener should get a full resync
	nextResync time.Time
	// resyncLock guards access to resyncPeriod and nextResync
	resyncLock sync.Mutex
}

func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener {
	ret := &processorListener{
		nextCh:                make(chan interface{}),
		addCh:                 make(chan interface{}),
		handler:               handler,
		pendingNotifications:  *buffer.NewRingGrowing(bufferSize),
		requestedResyncPeriod: requestedResyncPeriod,
		resyncPeriod:          resyncPeriod,
	}

	ret.determineNextResync(now)

	return ret
}

func (p *processorListener) add(notification interface{}) {
	p.addCh <- notification
}

func (p *processorListener) pop() {
	defer utilruntime.HandleCrash()
	defer close(p.nextCh) // Tell .run() to stop

	var nextCh chan<- interface{}
	var notification interface{}
	for {
		select {
		case nextCh <- notification:
			// Notification dispatched
			var ok bool
			notification, ok = p.pendingNotifications.ReadOne()
			if !ok { // Nothing to pop
				nextCh = nil // Disable this select case
			}
		case notificationToAdd, ok := <-p.addCh:
			if !ok {
				return
			}
			if notification == nil { // No notification to pop (and pendingNotifications is empty)
				// Optimize the case - skip adding to pendingNotifications
				notification = notificationToAdd
				nextCh = p.nextCh
			} else { // There is already a notification waiting to be dispatched
				p.pendingNotifications.WriteOne(notificationToAdd)
			}
		}
	}
}

func (p *processorListener) run() {
	// this call blocks until the channel is closed.  When a panic happens during the notification
	// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
	// the next notification will be attempted.  This is usually better than the alternative of never
	// delivering again.
	stopCh := make(chan struct{})
	wait.Until(func() {
		for next := range p.nextCh {
			switch notification := next.(type) {
			case updateNotification:
				p.handler.OnUpdate(notification.oldObj, notification.newObj)
			case addNotification:
				p.handler.OnAdd(notification.newObj)
			case deleteNotification:
				p.handler.OnDelete(notification.oldObj)
			default:
				utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
			}
		}
		// the only way to get here is if the p.nextCh is empty and closed
		close(stopCh)
	}, 1*time.Second, stopCh)
}

// shouldResync determines if the listener needs a resync.  If the listener's resyncPeriod is 0,
// this always returns false.
func (p *processorListener) shouldResync(now time.Time) bool {
	p.resyncLock.Lock()
	defer p.resyncLock.Unlock()

	if p.resyncPeriod == 0 {
		return false
	}

	return now.After(p.nextResync) || now.Equal(p.nextResync)
}

func (p *processorListener) determineNextResync(now time.Time) {
	p.resyncLock.Lock()
	defer p.resyncLock.Unlock()

	p.nextResync = now.Add(p.resyncPeriod)
}

func (p *processorListener) setResyncPeriod(resyncPeriod time.Duration) {
	p.resyncLock.Lock()
	defer p.resyncLock.Unlock()

	p.resyncPeriod = resyncPeriod
}
