/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cache

import (
	"fmt"
	"sync"
	"time"

	"k8s.io/apimachinery/pkg/api/meta"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/util/clock"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/utils/buffer"

	"k8s.io/klog"
)

// SharedInformer provides eventually consistent linkage of its
// clients to the authoritative state of a given collection of
// objects.  An object is identified by its API group, kind/resource,
// namespace, and name; the `ObjectMeta.UID` is not part of an
// object's ID as far as this contract is concerned.  One
// SharedInformer provides linkage to objects of a particular API
// group and kind/resource.  The linked object collection of a
// SharedInformer may be further restricted to one namespace and/or by
// label selector and/or field selector.
//
// The authoritative state of an object is what apiservers provide
// access to, and an object goes through a strict sequence of states.
// An object state is either "absent" or present with a
// ResourceVersion and other appropriate content.
//
// A SharedInformer maintains a local cache, exposed by GetStore() and
// by GetIndexer() in the case of an indexed informer, of the state of
// each relevant object.  This cache is eventually consistent with the
// authoritative state.  This means that, unless prevented by
// persistent communication problems, if ever a particular object ID X
// is authoritatively associated with a state S then for every
// SharedInformer I whose collection includes (X, S) eventually either
// (1) I's cache associates X with S or a later state of X, (2) I is
// stopped, or (3) the authoritative state service for X terminates.
// To be formally complete, we say that the absent state meets any
// restriction by label selector or field selector.
//
// For a given informer and relevant object ID X, the sequence of
// states that appears in the informer's cache is a subsequence of the
// states authoritatively associated with X.  That is, some states
// might never appear in the cache but ordering among the appearing
// states is correct.  Note, however, that there is no promise about
// ordering between states seen for different objects.
//
// The local cache starts out empty, and gets populated and updated
// during `Run()`.
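//
// For example, here is a minimal sketch of wiring up and running an
// informer from a client's perspective (`lw` is an assumed
// ListerWatcher for v1.Pod objects; the handler body is elided):
//
//	informer := NewSharedInformer(lw, &v1.Pod{}, 0)
//	informer.AddEventHandler(ResourceEventHandlerFuncs{
//		AddFunc: func(obj interface{}) { /* react to adds */ },
//	})
//	stopCh := make(chan struct{})
//	defer close(stopCh)
//	go informer.Run(stopCh)
//	if !WaitForCacheSync(stopCh, informer.HasSynced) {
//		return // stopCh was closed before the first full LIST completed
//	}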
//
// As a simple example, if a collection of objects is henceforth
// unchanging, a SharedInformer is created that links to that
// collection, and that SharedInformer is `Run()`, then that
// SharedInformer's cache eventually holds an exact copy of that
// collection (unless it is stopped too soon, the authoritative state
// service ends, or communication problems between the two
// persistently thwart achievement).
//
// As another simple example, if the local cache ever holds a
// non-absent state for some object ID and the object is eventually
// removed from the authoritative state then eventually the object is
// removed from the local cache (unless the SharedInformer is stopped
// too soon, the authoritative state service ends, or communication
// problems persistently thwart the desired result).
//
// The keys in the Store are of the form namespace/name for namespaced
// objects, and are simply the name for non-namespaced objects.
// Clients can use `MetaNamespaceKeyFunc(obj)` to extract the key for
// a given object, and `SplitMetaNamespaceKey(key)` to split a key
// into its constituent parts.
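//
// For example (a sketch; `pod` is assumed to be a *v1.Pod named
// "nginx" in the "default" namespace):
//
//	key, err := MetaNamespaceKeyFunc(pod) // "default/nginx"
//	if err == nil {
//		namespace, name, _ := SplitMetaNamespaceKey(key)
//		_, _ = namespace, name // "default", "nginx"
//	}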
//
// Every query against the local cache is answered entirely from one
// snapshot of the cache's state.  Thus, the result of a `List` call
// will not contain two entries with the same namespace and name.
//
// A client is identified here by a ResourceEventHandler.  For every
// update to the SharedInformer's local cache and for every client
// added before `Run()`, eventually either the SharedInformer is
// stopped or the client is notified of the update.  A client added
// after `Run()` starts gets a startup batch of notifications of
// additions of the objects existing in the cache at the time that
// client was added; also, for every update to the SharedInformer's
// local cache after that client was added, eventually either the
// SharedInformer is stopped or that client is notified of that
// update.  Client notifications happen after the corresponding cache
// update and, in the case of a SharedIndexInformer, after the
// corresponding index updates.  It is possible that additional cache
// and index updates happen before such a prescribed notification.
// For a given SharedInformer and client, the notifications are
// delivered sequentially.  For a given SharedInformer, client, and
// object ID, the notifications are delivered in order.  Because
// `ObjectMeta.UID` has no role in identifying objects, it is possible
// that when (1) object O1 with ID (e.g. namespace and name) X and
// `ObjectMeta.UID` U1 in the SharedInformer's local cache is deleted
// and later (2) another object O2 with ID X and ObjectMeta.UID U2 is
// created the informer's clients are not notified of (1) and (2) but
// rather are notified only of an update from O1 to O2. Clients that
// need to detect such cases might do so by comparing the `ObjectMeta.UID`
// field of the old and the new object in the code that handles update
// notifications (i.e. `OnUpdate` method of ResourceEventHandler).
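//
// For example, a sketch of such a check, using meta.Accessor from
// k8s.io/apimachinery/pkg/api/meta:
//
//	informer.AddEventHandler(ResourceEventHandlerFuncs{
//		UpdateFunc: func(oldObj, newObj interface{}) {
//			oldMeta, err1 := meta.Accessor(oldObj)
//			newMeta, err2 := meta.Accessor(newObj)
//			if err1 == nil && err2 == nil && oldMeta.GetUID() != newMeta.GetUID() {
//				// oldObj was deleted and newObj created with the same ID
//			}
//		},
//	})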
//
// A client must process each notification promptly; a SharedInformer
// is not engineered to deal well with a large backlog of
// notifications to deliver.  Lengthy processing should be passed off
// to something else, for example through a
// `client-go/util/workqueue`.
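//
// For example, a handler might only enqueue keys for later processing
// (a sketch; `queue` is an assumed workqueue from
// `client-go/util/workqueue`):
//
//	informer.AddEventHandler(ResourceEventHandlerFuncs{
//		AddFunc: func(obj interface{}) {
//			if key, err := MetaNamespaceKeyFunc(obj); err == nil {
//				queue.Add(key)
//			}
//		},
//	})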
//
// A delete notification exposes the last locally known non-absent
// state, except that its ResourceVersion is replaced with a
// ResourceVersion in which the object is actually absent.
type SharedInformer interface {
	// AddEventHandler adds an event handler to the shared informer using the shared informer's resync
	// period.  Events to a single handler are delivered sequentially, but there is no coordination
	// between different handlers.
	AddEventHandler(handler ResourceEventHandler)
	// AddEventHandlerWithResyncPeriod adds an event handler to the
	// shared informer with the requested resync period; zero means
	// this handler does not care about resyncs.  The resync operation
	// consists of delivering to the handler an update notification
	// for every object in the informer's local cache; it does not add
	// any interactions with the authoritative storage.  Some
	// informers do no resyncs at all, not even for handlers added
	// with a non-zero resyncPeriod.  For an informer that does
	// resyncs, and for each handler that requests resyncs, that
	// informer develops a nominal resync period that is no shorter
	// than the requested period but may be longer.  The actual time
	// between any two resyncs may be longer than the nominal period
	// because the implementation takes time to do work and there may
	// be competing load and scheduling noise.
	AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
	// GetStore returns the informer's local cache as a Store.
	GetStore() Store
	// GetController is deprecated; it does nothing useful.
	GetController() Controller
	// Run starts and runs the shared informer, returning after it stops.
	// The informer will be stopped when stopCh is closed.
	Run(stopCh <-chan struct{})
	// HasSynced returns true if the shared informer's store has been
	// informed by at least one full LIST of the authoritative state
	// of the informer's object collection.  This is unrelated to "resync".
	HasSynced() bool
	// LastSyncResourceVersion is the resource version observed when last synced with the underlying
	// store. The value returned is not synchronized with access to the underlying store and is not
	// thread-safe.
	LastSyncResourceVersion() string
}

// SharedIndexInformer provides the ability to add and get Indexers,
// in addition to everything a SharedInformer provides.
type SharedIndexInformer interface {
	SharedInformer
	// AddIndexers adds indexers to the informer before it starts.
	AddIndexers(indexers Indexers) error
	GetIndexer() Indexer
}
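
// A minimal sketch of using the indexing surface (`lw` is an assumed
// ListerWatcher for v1.Pod; NamespaceIndex and MetaNamespaceIndexFunc
// are the namespace-index helpers defined elsewhere in this package):
//
//	informer := NewSharedIndexInformer(lw, &v1.Pod{}, 0, Indexers{})
//	if err := informer.AddIndexers(Indexers{NamespaceIndex: MetaNamespaceIndexFunc}); err != nil {
//		// AddIndexers fails if the informer has already started
//	}
//	objs, _ := informer.GetIndexer().ByIndex(NamespaceIndex, "kube-system")
//	_ = objs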

// NewSharedInformer creates a new instance for the listwatcher.
func NewSharedInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration) SharedInformer {
	return NewSharedIndexInformer(lw, exampleObject, defaultEventHandlerResyncPeriod, Indexers{})
}

// NewSharedIndexInformer creates a new instance for the listwatcher.
// The created informer will not do resyncs if the given
// defaultEventHandlerResyncPeriod is zero.  Otherwise: for each
// handler with a non-zero requested resync period, whether added
// before or after the informer starts, the nominal resync period is
// the requested resync period rounded up to a multiple of the
// informer's resync checking period.  Such an informer's resync
// checking period is established when the informer starts running,
// and is the maximum of (a) the minimum of the resync periods
// requested before the informer starts and the
// defaultEventHandlerResyncPeriod given here and (b) the constant
// `minimumResyncPeriod` defined in this file.
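//
// For example (a worked sketch of the rule above): with a
// defaultEventHandlerResyncPeriod of 30s and one handler added before
// start requesting 12s, the resync checking period is
// max(min(12s, 30s), minimumResyncPeriod) = 12s; a handler later
// requesting 30s then gets a nominal resync period of 36s (30s rounded
// up to a multiple of 12s).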
func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
	realClock := &clock.RealClock{}
	sharedIndexInformer := &sharedIndexInformer{
		processor:                       &sharedProcessor{clock: realClock},
		indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
		listerWatcher:                   lw,
		objectType:                      exampleObject,
		resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
		defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
		cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
		clock:                           realClock,
	}
	return sharedIndexInformer
}

// InformerSynced is a function that can be used to determine if an informer has synced.  This is useful for determining if caches have synced.
type InformerSynced func() bool

const (
	// syncedPollPeriod controls how often you look at the status of your sync funcs
	syncedPollPeriod = 100 * time.Millisecond

	// initialBufferSize is the initial number of event notifications that can be buffered.
	initialBufferSize = 1024
)

// WaitForNamedCacheSync is a wrapper around WaitForCacheSync that generates log messages
// indicating that the caller identified by name is waiting for syncs, followed by
// either a successful or failed sync.
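//
// A typical call at the top of a controller's Run loop (a sketch;
// `podsSynced` is an assumed InformerSynced, e.g. an informer's
// HasSynced method):
//
//	if !WaitForNamedCacheSync("my-controller", stopCh, podsSynced) {
//		return
//	}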
func WaitForNamedCacheSync(controllerName string, stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
	klog.Infof("Waiting for caches to sync for %s", controllerName)

	if !WaitForCacheSync(stopCh, cacheSyncs...) {
		utilruntime.HandleError(fmt.Errorf("unable to sync caches for %s", controllerName))
		return false
	}

	klog.Infof("Caches are synced for %s", controllerName)
	return true
}

// WaitForCacheSync waits for caches to populate.  It returns true if it was successful,
// false if the controller should shut down.
// Callers should prefer WaitForNamedCacheSync().
func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
	err := wait.PollImmediateUntil(syncedPollPeriod,
		func() (bool, error) {
			for _, syncFunc := range cacheSyncs {
				if !syncFunc() {
					return false, nil
				}
			}
			return true, nil
		},
		stopCh)
	if err != nil {
		klog.V(2).Infof("stop requested")
		return false
	}

	klog.V(4).Infof("caches populated")
	return true
}

// `*sharedIndexInformer` implements SharedIndexInformer and has three
// main components.  One is an indexed local cache, `indexer Indexer`.
// The second main component is a Controller that pulls
// objects/notifications using the ListerWatcher and pushes them into
// a DeltaFIFO --- whose knownObjects is the informer's local cache
// --- while concurrently Popping Deltas values from that fifo and
// processing them with `sharedIndexInformer::HandleDeltas`.  Each
// invocation of HandleDeltas, which is done with the fifo's lock
// held, processes each Delta in turn.  For each Delta this both
// updates the local cache and stuffs the relevant notification into
// the sharedProcessor.  The third main component is that
// sharedProcessor, which is responsible for relaying those
// notifications to each of the informer's clients.
type sharedIndexInformer struct {
	indexer    Indexer
	controller Controller

	processor             *sharedProcessor
	cacheMutationDetector MutationDetector

	listerWatcher ListerWatcher

	// objectType is an example object of the type this informer is
	// expected to handle.  Only the type needs to be right, except
	// that when that is `unstructured.Unstructured` the object's
	// `"apiVersion"` and `"kind"` must also be right.
	objectType runtime.Object

	// resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
	// shouldResync to check if any of our listeners need a resync.
	resyncCheckPeriod time.Duration
	// defaultEventHandlerResyncPeriod is the default resync period for any handlers added via
	// AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default
	// value).
	defaultEventHandlerResyncPeriod time.Duration
	// clock allows for testability
	clock clock.Clock

	started, stopped bool
	startedLock      sync.Mutex

	// blockDeltas gives a way to stop all event distribution so that a late event handler
	// can safely join the shared informer.
	blockDeltas sync.Mutex
}

// dummyController hides the fact that a SharedInformer is different from a dedicated one
// where a caller can `Run`.  The run method is disconnected in this case, because higher
// level logic will decide when to start the SharedInformer and related controller.
// Because returning information back is always asynchronous, the legacy callers shouldn't
// notice any change in behavior.
type dummyController struct {
	informer *sharedIndexInformer
}

func (v *dummyController) Run(stopCh <-chan struct{}) {
}

func (v *dummyController) HasSynced() bool {
	return v.informer.HasSynced()
}

func (v *dummyController) LastSyncResourceVersion() string {
	return ""
}

type updateNotification struct {
	oldObj interface{}
	newObj interface{}
}

type addNotification struct {
	newObj interface{}
}

type deleteNotification struct {
	oldObj interface{}
}

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
		KnownObjects:          s.indexer,
		EmitDeltaTypeReplaced: true,
	})

	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    s.listerWatcher,
		ObjectType:       s.objectType,
		FullResyncPeriod: s.resyncCheckPeriod,
		RetryOnError:     false,
		ShouldResync:     s.processor.shouldResync,

		Process: s.HandleDeltas,
	}

	func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()

		s.controller = New(cfg)
		s.controller.(*controller).clock = s.clock
		s.started = true
	}()

	// Separate stop channel because Processor should be stopped strictly after controller
	processorStopCh := make(chan struct{})
	var wg wait.Group
	defer wg.Wait()              // Wait for Processor to stop
	defer close(processorStopCh) // Tell Processor to stop
	wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
	wg.StartWithChannel(processorStopCh, s.processor.run)

	defer func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()
		s.stopped = true // Don't want any new listeners
	}()
	s.controller.Run(stopCh)
}

func (s *sharedIndexInformer) HasSynced() bool {
	s.startedLock.Lock()
	defer s.startedLock.Unlock()

	if s.controller == nil {
		return false
	}
	return s.controller.HasSynced()
}

func (s *sharedIndexInformer) LastSyncResourceVersion() string {
	s.startedLock.Lock()
	defer s.startedLock.Unlock()

	if s.controller == nil {
		return ""
	}
	return s.controller.LastSyncResourceVersion()
}

func (s *sharedIndexInformer) GetStore() Store {
	return s.indexer
}

func (s *sharedIndexInformer) GetIndexer() Indexer {
	return s.indexer
}

func (s *sharedIndexInformer) AddIndexers(indexers Indexers) error {
	s.startedLock.Lock()
	defer s.startedLock.Unlock()

	if s.started {
		return fmt.Errorf("informer has already started")
	}

	return s.indexer.AddIndexers(indexers)
}

func (s *sharedIndexInformer) GetController() Controller {
	return &dummyController{informer: s}
}

func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) {
	s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
}

func determineResyncPeriod(desired, check time.Duration) time.Duration {
	if desired == 0 {
		return desired
	}
	if check == 0 {
		klog.Warningf("The specified resyncPeriod %v is invalid because this shared informer doesn't support resyncing", desired)
		return 0
	}
	if desired < check {
		klog.Warningf("The specified resyncPeriod %v is being increased to the minimum resyncCheckPeriod %v", desired, check)
		return check
	}
	return desired
}

const minimumResyncPeriod = 1 * time.Second

func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
	s.startedLock.Lock()
	defer s.startedLock.Unlock()

	if s.stopped {
		klog.V(2).Infof("Handler %v was not added to shared informer because the informer has stopped already", handler)
		return
	}

	if resyncPeriod > 0 {
		if resyncPeriod < minimumResyncPeriod {
			klog.Warningf("resyncPeriod %v is too small. Changing it to the minimum allowed value of %v", resyncPeriod, minimumResyncPeriod)
			resyncPeriod = minimumResyncPeriod
		}

		if resyncPeriod < s.resyncCheckPeriod {
			if s.started {
				klog.Warningf("resyncPeriod %v is smaller than resyncCheckPeriod %v and the informer has already started. Changing it to %v", resyncPeriod, s.resyncCheckPeriod, s.resyncCheckPeriod)
				resyncPeriod = s.resyncCheckPeriod
			} else {
				// if the event handler's resyncPeriod is smaller than the current resyncCheckPeriod, update
				// resyncCheckPeriod to match resyncPeriod and adjust the resync periods of all the listeners
				// accordingly
				s.resyncCheckPeriod = resyncPeriod
				s.processor.resyncCheckPeriodChanged(resyncPeriod)
			}
		}
	}

	listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)

	if !s.started {
		s.processor.addListener(listener)
		return
	}

	// in order to safely join, we have to
	// 1. stop sending add/update/delete notifications
	// 2. do a list against the store
	// 3. send synthetic "Add" events to the new handler
	// 4. unblock
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	s.processor.addListener(listener)
	for _, item := range s.indexer.List() {
		listener.add(addNotification{newObj: item})
	}
}

func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	// from oldest to newest
	for _, d := range obj.(Deltas) {
		switch d.Type {
		case Sync, Replaced, Added, Updated:
			s.cacheMutationDetector.AddObject(d.Object)
			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
				if err := s.indexer.Update(d.Object); err != nil {
					return err
				}

				isSync := false
				switch {
				case d.Type == Sync:
					// Sync events are only propagated to listeners that requested resync
					isSync = true
				case d.Type == Replaced:
					if accessor, err := meta.Accessor(d.Object); err == nil {
						if oldAccessor, err := meta.Accessor(old); err == nil {
							// Replaced events that didn't change resourceVersion are treated as resync events
							// and only propagated to listeners that requested resync
							isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
						}
					}
				}
				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
			} else {
				if err := s.indexer.Add(d.Object); err != nil {
					return err
				}
				s.processor.distribute(addNotification{newObj: d.Object}, false)
			}
		case Deleted:
			if err := s.indexer.Delete(d.Object); err != nil {
				return err
			}
			s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
		}
	}
	return nil
}

// sharedProcessor has a collection of processorListener and can
// distribute a notification object to its listeners.  There are two
// kinds of distribute operations.  The sync distributions go to a
// subset of the listeners that (a) is recomputed in the occasional
// calls to shouldResync and (b) initially contains every listener.
// The non-sync distributions go to every listener.
type sharedProcessor struct {
	listenersStarted bool
	listenersLock    sync.RWMutex
	listeners        []*processorListener
	syncingListeners []*processorListener
	clock            clock.Clock
	wg               wait.Group
}

func (p *sharedProcessor) addListener(listener *processorListener) {
	p.listenersLock.Lock()
	defer p.listenersLock.Unlock()

	p.addListenerLocked(listener)
	if p.listenersStarted {
		p.wg.Start(listener.run)
		p.wg.Start(listener.pop)
	}
}

func (p *sharedProcessor) addListenerLocked(listener *processorListener) {
	p.listeners = append(p.listeners, listener)
	p.syncingListeners = append(p.syncingListeners, listener)
}

func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()

	if sync {
		for _, listener := range p.syncingListeners {
			listener.add(obj)
		}
	} else {
		for _, listener := range p.listeners {
			listener.add(obj)
		}
	}
}

func (p *sharedProcessor) run(stopCh <-chan struct{}) {
	func() {
		p.listenersLock.RLock()
		defer p.listenersLock.RUnlock()
		for _, listener := range p.listeners {
			p.wg.Start(listener.run)
			p.wg.Start(listener.pop)
		}
		p.listenersStarted = true
	}()
	<-stopCh
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()
	for _, listener := range p.listeners {
		close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
	}
	p.wg.Wait() // Wait for all .pop() and .run() to stop
}

// shouldResync queries every listener to determine if any of them need a resync, based on each
// listener's resyncPeriod.
func (p *sharedProcessor) shouldResync() bool {
	p.listenersLock.Lock()
	defer p.listenersLock.Unlock()

	p.syncingListeners = []*processorListener{}

	resyncNeeded := false
	now := p.clock.Now()
	for _, listener := range p.listeners {
		// need to loop through all the listeners to see if they need to resync so we can prepare any
		// listeners that are going to be resyncing.
		if listener.shouldResync(now) {
			resyncNeeded = true
			p.syncingListeners = append(p.syncingListeners, listener)
			listener.determineNextResync(now)
		}
	}
	return resyncNeeded
}

func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Duration) {
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()

	for _, listener := range p.listeners {
		resyncPeriod := determineResyncPeriod(listener.requestedResyncPeriod, resyncCheckPeriod)
		listener.setResyncPeriod(resyncPeriod)
	}
}

// processorListener relays notifications from a sharedProcessor to
// one ResourceEventHandler --- using two goroutines, two unbuffered
// channels, and an unbounded ring buffer.  The `add(notification)`
// function sends the given notification to `addCh`.  One goroutine
// runs `pop()`, which pumps notifications from `addCh` to `nextCh`
// using storage in the ring buffer while `nextCh` is not keeping up.
// Another goroutine runs `run()`, which receives notifications from
// `nextCh` and synchronously invokes the appropriate handler method.
//
// processorListener also keeps track of the adjusted requested resync
// period of the listener.
type processorListener struct {
	nextCh chan interface{}
	addCh  chan interface{}

	handler ResourceEventHandler

	// pendingNotifications is an unbounded ring buffer that holds all notifications not yet distributed.
	// There is one per listener, but a failing/stalled listener will have infinite pendingNotifications
	// added until we OOM.
	// TODO: This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but
	// we should try to do something better.
	pendingNotifications buffer.RingGrowing

	// requestedResyncPeriod is how frequently the listener wants a
	// full resync from the shared informer, but modified by two
	// adjustments.  One is imposing a lower bound,
	// `minimumResyncPeriod`.  The other is another lower bound, the
	// sharedProcessor's `resyncCheckPeriod`, that is imposed (a) only
	// in AddEventHandlerWithResyncPeriod invocations made after the
	// sharedProcessor starts and (b) only if the informer does
	// resyncs at all.
	requestedResyncPeriod time.Duration
	// resyncPeriod is the threshold that will be used in the logic
	// for this listener.  This value differs from
	// requestedResyncPeriod only when the sharedIndexInformer does
	// not do resyncs, in which case the value here is zero.  The
	// actual time between resyncs depends on when the
	// sharedProcessor's `shouldResync` function is invoked and when
	// the sharedIndexInformer processes `Sync` type Delta objects.
	resyncPeriod time.Duration
	// nextResync is the earliest time the listener should get a full resync
	nextResync time.Time
	// resyncLock guards access to resyncPeriod and nextResync
	resyncLock sync.Mutex
}

func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener {
	ret := &processorListener{
		nextCh:                make(chan interface{}),
		addCh:                 make(chan interface{}),
		handler:               handler,
		pendingNotifications:  *buffer.NewRingGrowing(bufferSize),
		requestedResyncPeriod: requestedResyncPeriod,
		resyncPeriod:          resyncPeriod,
	}

	ret.determineNextResync(now)

	return ret
}

func (p *processorListener) add(notification interface{}) {
	p.addCh <- notification
}

func (p *processorListener) pop() {
	defer utilruntime.HandleCrash()
	defer close(p.nextCh) // Tell .run() to stop

	var nextCh chan<- interface{}
	var notification interface{}
	for {
		select {
		case nextCh <- notification:
			// Notification dispatched
			var ok bool
			notification, ok = p.pendingNotifications.ReadOne()
			if !ok { // Nothing to pop
				nextCh = nil // Disable this select case
			}
		case notificationToAdd, ok := <-p.addCh:
			if !ok {
				return
			}
			if notification == nil { // No notification to pop (and pendingNotifications is empty)
				// Optimize the case - skip adding to pendingNotifications
				notification = notificationToAdd
				nextCh = p.nextCh
			} else { // There is already a notification waiting to be dispatched
				p.pendingNotifications.WriteOne(notificationToAdd)
			}
		}
	}
}

func (p *processorListener) run() {
	// this call blocks until the channel is closed.  When a panic happens during the notification
	// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
	// the next notification will be attempted.  This is usually better than the alternative of never
	// delivering again.
	stopCh := make(chan struct{})
	wait.Until(func() {
		for next := range p.nextCh {
			switch notification := next.(type) {
			case updateNotification:
				p.handler.OnUpdate(notification.oldObj, notification.newObj)
			case addNotification:
				p.handler.OnAdd(notification.newObj)
			case deleteNotification:
				p.handler.OnDelete(notification.oldObj)
			default:
				utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
			}
		}
		// the only way to get here is if the p.nextCh is empty and closed
		close(stopCh)
	}, 1*time.Second, stopCh)
}

// shouldResync determines if the listener needs a resync.  If the listener's resyncPeriod is 0,
// this always returns false.
func (p *processorListener) shouldResync(now time.Time) bool {
	p.resyncLock.Lock()
	defer p.resyncLock.Unlock()

	if p.resyncPeriod == 0 {
		return false
	}

	return now.After(p.nextResync) || now.Equal(p.nextResync)
}

func (p *processorListener) determineNextResync(now time.Time) {
	p.resyncLock.Lock()
	defer p.resyncLock.Unlock()

	p.nextResync = now.Add(p.resyncPeriod)
}

func (p *processorListener) setResyncPeriod(resyncPeriod time.Duration) {
	p.resyncLock.Lock()
	defer p.resyncLock.Unlock()

	p.resyncPeriod = resyncPeriod
}