/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cache

import (
	"fmt"
	"sync"
	"time"

	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/util/clock"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/util/retry"
	"k8s.io/utils/buffer"

	"k8s.io/klog"
)

// SharedInformer provides eventually consistent linkage of its
// clients to the authoritative state of a given collection of
// objects.  An object is identified by its API group, kind/resource,
// namespace, and name; the `ObjectMeta.UID` is not part of an
// object's ID as far as this contract is concerned.  One
// SharedInformer provides linkage to objects of a particular API
// group and kind/resource.  The linked object collection of a
// SharedInformer may be further restricted to one namespace and/or by
// label selector and/or field selector.
//
// The authoritative state of an object is what apiservers provide
// access to, and an object goes through a strict sequence of states.
// An object state is either "absent" or present with a
// ResourceVersion and other appropriate content.
//
// A SharedInformer gets object states from apiservers using a
// sequence of LIST and WATCH operations.  Through this sequence the
// apiservers provide a sequence of "collection states" to the
// informer, where each collection state defines the state of every
// object of the collection.  No promise --- beyond what is implied by
// other remarks here --- is made about how one informer's sequence of
// collection states relates to a different informer's sequence of
// collection states.
//
// A SharedInformer maintains a local cache, exposed by GetStore() and
// by GetIndexer() in the case of an indexed informer, of the state of
// each relevant object.  This cache is eventually consistent with the
// authoritative state.  This means that, unless prevented by
// persistent communication problems, if ever a particular object ID X
// is authoritatively associated with a state S then for every
// SharedInformer I whose collection includes (X, S) eventually either
// (1) I's cache associates X with S or a later state of X, (2) I is
// stopped, or (3) the authoritative state service for X terminates.
// To be formally complete, we say that the absent state meets any
// restriction by label selector or field selector.
//
// The local cache starts out empty, and gets populated and updated
// during `Run()`.
//
// As a simple example, if a collection of objects is henceforth
// unchanging, a SharedInformer is created that links to that
// collection, and that SharedInformer is `Run()` then that
// SharedInformer's cache eventually holds an exact copy of that
// collection (unless it is stopped too soon, the authoritative state
// service ends, or communication problems between the two
// persistently thwart achievement).
//
// As another simple example, if the local cache ever holds a
// non-absent state for some object ID and the object is eventually
// removed from the authoritative state then eventually the object is
// removed from the local cache (unless the SharedInformer is stopped
// too soon, the authoritative state service ends, or communication
// problems persistently thwart the desired result).
//
// The keys in the Store are of the form namespace/name for namespaced
// objects, and are simply the name for non-namespaced objects.
// Clients can use `MetaNamespaceKeyFunc(obj)` to extract the key for
// a given object, and `SplitMetaNamespaceKey(key)` to split a key
// into its constituent parts.
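//
// For illustration only, a hedged sketch of using those helpers (the
// `pod` value below is an assumption of this example, not part of the
// contract):
//
//	key, err := MetaNamespaceKeyFunc(pod) // "default/my-pod" for a namespaced object
//	if err != nil {
//		utilruntime.HandleError(err)
//	}
//	namespace, name, _ := SplitMetaNamespaceKey(key)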
//
// A client is identified here by a ResourceEventHandler.  For every
// update to the SharedInformer's local cache and for every client
// added before `Run()`, eventually either the SharedInformer is
// stopped or the client is notified of the update.  A client added
// after `Run()` starts gets a startup batch of notifications of
// additions of the objects existing in the cache at the time that
// client was added; also, for every update to the SharedInformer's
// local cache after that client was added, eventually either the
// SharedInformer is stopped or that client is notified of that
// update.  Client notifications happen after the corresponding cache
// update and, in the case of a SharedIndexInformer, after the
// corresponding index updates.  It is possible that additional cache
// and index updates happen before such a prescribed notification.
// For a given SharedInformer and client, the notifications are
// delivered sequentially.  For a given SharedInformer, client, and
// object ID, the notifications are delivered in order.
//
// A client must process each notification promptly; a SharedInformer
// is not engineered to deal well with a large backlog of
// notifications to deliver.  Lengthy processing should be passed off
// to something else, for example through a
// `client-go/util/workqueue`.
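//
// A hedged sketch of that hand-off (the `informer` and `queue`
// variables, and the use of k8s.io/client-go/util/workqueue, are
// assumptions of this example rather than part of this contract):
//
//	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
//	informer.AddEventHandler(ResourceEventHandlerFuncs{
//		AddFunc: func(obj interface{}) {
//			if key, err := MetaNamespaceKeyFunc(obj); err == nil {
//				queue.Add(key) // a worker goroutine does the slow processing later
//			}
//		},
//	})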
//
// Each query to an informer's local cache --- whether a single-object
// lookup, a list operation, or a use of one of its indices --- is
// answered entirely from one of the collection states received by
// that informer.
//
// A delete notification exposes the last locally known non-absent
// state, except that its ResourceVersion is replaced with a
// ResourceVersion in which the object is actually absent.
type SharedInformer interface {
	// AddEventHandler adds an event handler to the shared informer using the shared informer's resync
	// period.  Events to a single handler are delivered sequentially, but there is no coordination
	// between different handlers.
	AddEventHandler(handler ResourceEventHandler)
	// AddEventHandlerWithResyncPeriod adds an event handler to the
	// shared informer using the specified resync period.  The resync
	// operation consists of delivering to the handler an update
	// notification for every object in the informer's local cache; it
	// does not add any interactions with the authoritative storage.
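	//
	// A hedged sketch of registering such a handler (the five-minute
	// period and the handler literal are assumptions of this example):
	//
	//	informer.AddEventHandlerWithResyncPeriod(ResourceEventHandlerFuncs{
	//		UpdateFunc: func(old, new interface{}) {
	//			// on a resync, old and new hold the object already in the cache
	//		},
	//	}, 5*time.Minute)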
	AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
	// GetStore returns the informer's local cache as a Store.
	GetStore() Store
	// GetController gives back a synthetic interface that "votes" to start the informer.
	GetController() Controller
	// Run starts and runs the shared informer, returning after it stops.
	// The informer will be stopped when stopCh is closed.
	Run(stopCh <-chan struct{})
	// HasSynced returns true if the shared informer's store has been
	// informed by at least one full LIST of the authoritative state
	// of the informer's object collection.  This is unrelated to "resync".
	HasSynced() bool
	// LastSyncResourceVersion is the resource version observed when last synced with the underlying
	// store. The value returned is not synchronized with access to the underlying store and is not
	// thread-safe.
	LastSyncResourceVersion() string
}

// SharedIndexInformer provides the ability to add and get Indexers on top of SharedInformer.
type SharedIndexInformer interface {
	SharedInformer
	// AddIndexers adds indexers to the informer before it starts.
	AddIndexers(indexers Indexers) error
	GetIndexer() Indexer
}

// NewSharedInformer creates a new instance for the listwatcher.
func NewSharedInformer(lw ListerWatcher, objType runtime.Object, resyncPeriod time.Duration) SharedInformer {
	return NewSharedIndexInformer(lw, objType, resyncPeriod, Indexers{})
}

// NewSharedIndexInformer creates a new instance for the listwatcher.
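//
// A minimal construction sketch; the Pod-specific ListerWatcher,
// clientset, and 30-second resync below are assumptions of this
// example, not requirements of the constructor:
//
//	lw := NewListWatchFromClient(client.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.Everything())
//	informer := NewSharedIndexInformer(lw, &v1.Pod{}, 30*time.Second,
//		Indexers{NamespaceIndex: MetaNamespaceIndexFunc})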
func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
	realClock := &clock.RealClock{}
	sharedIndexInformer := &sharedIndexInformer{
		processor:                       &sharedProcessor{clock: realClock},
		indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
		listerWatcher:                   lw,
		objectType:                      objType,
		resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
		defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
		cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", objType)),
		clock:                           realClock,
	}
	return sharedIndexInformer
}

// InformerSynced is a function that can be used to determine if an informer has synced.  This is useful for determining if caches have synced.
type InformerSynced func() bool

const (
	// syncedPollPeriod controls how often you look at the status of your sync funcs
	syncedPollPeriod = 100 * time.Millisecond

	// initialBufferSize is the initial number of event notifications that can be buffered.
	initialBufferSize = 1024
)

// WaitForNamedCacheSync is a wrapper around WaitForCacheSync that generates log messages
// indicating that the caller identified by name is waiting for syncs, followed by
// either a successful or failed sync.
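//
// A typical call site, sketched under the assumption of a controller Run
// method that already has a stop channel and the informer's HasSynced:
//
//	if !WaitForNamedCacheSync("my-controller", stopCh, informer.HasSynced) {
//		return
//	}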
func WaitForNamedCacheSync(controllerName string, stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
	klog.Infof("Waiting for caches to sync for %s", controllerName)

	if !WaitForCacheSync(stopCh, cacheSyncs...) {
		utilruntime.HandleError(fmt.Errorf("unable to sync caches for %s", controllerName))
		return false
	}

	klog.Infof("Caches are synced for %s ", controllerName)
	return true
}

// WaitForCacheSync waits for caches to populate.  It returns true if it was successful, false
// if the controller should shut down.
// Callers should prefer WaitForNamedCacheSync().
func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
	err := wait.PollImmediateUntil(syncedPollPeriod,
		func() (bool, error) {
			for _, syncFunc := range cacheSyncs {
				if !syncFunc() {
					return false, nil
				}
			}
			return true, nil
		},
		stopCh)
	if err != nil {
		klog.V(2).Infof("stop requested")
		return false
	}

	klog.V(4).Infof("caches populated")
	return true
}

type sharedIndexInformer struct {
	indexer    Indexer
	controller Controller

	processor             *sharedProcessor
	cacheMutationDetector MutationDetector

	// This block is tracked to handle late initialization of the controller
	listerWatcher ListerWatcher
	objectType    runtime.Object

	// resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
	// shouldResync to check if any of our listeners need a resync.
	resyncCheckPeriod time.Duration
	// defaultEventHandlerResyncPeriod is the default resync period for any handlers added via
	// AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default
	// value).
	defaultEventHandlerResyncPeriod time.Duration
	// clock allows for testability
	clock clock.Clock

	started, stopped bool
	startedLock      sync.Mutex

	// blockDeltas gives a way to stop all event distribution so that a late event handler
	// can safely join the shared informer.
	blockDeltas sync.Mutex
}

// dummyController hides the fact that a SharedInformer is different from a dedicated one
// where a caller can `Run`.  The run method is disconnected in this case, because higher
// level logic will decide when to start the SharedInformer and related controller.
// Because returning information back is always asynchronous, the legacy callers shouldn't
// notice any change in behavior.
type dummyController struct {
	informer *sharedIndexInformer
}

func (v *dummyController) Run(stopCh <-chan struct{}) {
}

func (v *dummyController) HasSynced() bool {
	return v.informer.HasSynced()
}

func (v *dummyController) LastSyncResourceVersion() string {
	return ""
}

type updateNotification struct {
	oldObj interface{}
	newObj interface{}
}

type addNotification struct {
	newObj interface{}
}

type deleteNotification struct {
	oldObj interface{}
}

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)

	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    s.listerWatcher,
		ObjectType:       s.objectType,
		FullResyncPeriod: s.resyncCheckPeriod,
		RetryOnError:     false,
		ShouldResync:     s.processor.shouldResync,

		Process: s.HandleDeltas,
	}

	func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()

		s.controller = New(cfg)
		s.controller.(*controller).clock = s.clock
		s.started = true
	}()

	// Separate stop channel because Processor should be stopped strictly after controller
	processorStopCh := make(chan struct{})
	var wg wait.Group
	defer wg.Wait()              // Wait for Processor to stop
	defer close(processorStopCh) // Tell Processor to stop
	wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
	wg.StartWithChannel(processorStopCh, s.processor.run)

	defer func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()
		s.stopped = true // Don't want any new listeners
	}()
	s.controller.Run(stopCh)
}

func (s *sharedIndexInformer) HasSynced() bool {
	s.startedLock.Lock()
	defer s.startedLock.Unlock()

	if s.controller == nil {
		return false
	}
	return s.controller.HasSynced()
}

func (s *sharedIndexInformer) LastSyncResourceVersion() string {
	s.startedLock.Lock()
	defer s.startedLock.Unlock()

	if s.controller == nil {
		return ""
	}
	return s.controller.LastSyncResourceVersion()
}

func (s *sharedIndexInformer) GetStore() Store {
	return s.indexer
}

func (s *sharedIndexInformer) GetIndexer() Indexer {
	return s.indexer
}

func (s *sharedIndexInformer) AddIndexers(indexers Indexers) error {
	s.startedLock.Lock()
	defer s.startedLock.Unlock()

	if s.started {
		return fmt.Errorf("informer has already started")
	}

	return s.indexer.AddIndexers(indexers)
}

func (s *sharedIndexInformer) GetController() Controller {
	return &dummyController{informer: s}
}

func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) {
	s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
}

func determineResyncPeriod(desired, check time.Duration) time.Duration {
	if desired == 0 {
		return desired
	}
	if check == 0 {
		klog.Warningf("The specified resyncPeriod %v is invalid because this shared informer doesn't support resyncing", desired)
		return 0
	}
	if desired < check {
		klog.Warningf("The specified resyncPeriod %v is being increased to the minimum resyncCheckPeriod %v", desired, check)
		return check
	}
	return desired
}

const minimumResyncPeriod = 1 * time.Second

func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
	s.startedLock.Lock()
	defer s.startedLock.Unlock()

	if s.stopped {
		klog.V(2).Infof("Handler %v was not added to shared informer because it has stopped already", handler)
		return
	}

	if resyncPeriod > 0 {
		if resyncPeriod < minimumResyncPeriod {
			klog.Warningf("resyncPeriod %v is too small. Changing it to the minimum allowed value of %v", resyncPeriod, minimumResyncPeriod)
			resyncPeriod = minimumResyncPeriod
		}

		if resyncPeriod < s.resyncCheckPeriod {
			if s.started {
				klog.Warningf("resyncPeriod %v is smaller than resyncCheckPeriod %v and the informer has already started. Changing it to %v", resyncPeriod, s.resyncCheckPeriod, s.resyncCheckPeriod)
				resyncPeriod = s.resyncCheckPeriod
			} else {
				// if the event handler's resyncPeriod is smaller than the current resyncCheckPeriod, update
				// resyncCheckPeriod to match resyncPeriod and adjust the resync periods of all the listeners
				// accordingly
				s.resyncCheckPeriod = resyncPeriod
				s.processor.resyncCheckPeriodChanged(resyncPeriod)
			}
		}
	}

	listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)

	if !s.started {
		s.processor.addListener(listener)
		return
	}

	// in order to safely join, we have to
	// 1. stop sending add/update/delete notifications
	// 2. do a list against the store
	// 3. send synthetic "Add" events to the new handler
	// 4. unblock
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	s.processor.addListener(listener)
	for _, item := range s.indexer.List() {
		listener.add(addNotification{newObj: item})
	}
}

func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	// from oldest to newest
	for _, d := range obj.(Deltas) {
		switch d.Type {
		case Sync, Added, Updated:
			isSync := d.Type == Sync
			s.cacheMutationDetector.AddObject(d.Object)
			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
				if err := s.indexer.Update(d.Object); err != nil {
					return err
				}
				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
			} else {
				if err := s.indexer.Add(d.Object); err != nil {
					return err
				}
				s.processor.distribute(addNotification{newObj: d.Object}, isSync)
			}
		case Deleted:
			if err := s.indexer.Delete(d.Object); err != nil {
				return err
			}
			s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
		}
	}
	return nil
}

type sharedProcessor struct {
	listenersStarted bool
	listenersLock    sync.RWMutex
	listeners        []*processorListener
	syncingListeners []*processorListener
	clock            clock.Clock
	wg               wait.Group
}

func (p *sharedProcessor) addListener(listener *processorListener) {
	p.listenersLock.Lock()
	defer p.listenersLock.Unlock()

	p.addListenerLocked(listener)
	if p.listenersStarted {
		p.wg.Start(listener.run)
		p.wg.Start(listener.pop)
	}
}

func (p *sharedProcessor) addListenerLocked(listener *processorListener) {
	p.listeners = append(p.listeners, listener)
	p.syncingListeners = append(p.syncingListeners, listener)
}

func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()

	if sync {
		for _, listener := range p.syncingListeners {
			listener.add(obj)
		}
	} else {
		for _, listener := range p.listeners {
			listener.add(obj)
		}
	}
}

func (p *sharedProcessor) run(stopCh <-chan struct{}) {
	func() {
		p.listenersLock.RLock()
		defer p.listenersLock.RUnlock()
		for _, listener := range p.listeners {
			p.wg.Start(listener.run)
			p.wg.Start(listener.pop)
		}
		p.listenersStarted = true
	}()
	<-stopCh
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()
	for _, listener := range p.listeners {
		close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
	}
	p.wg.Wait() // Wait for all .pop() and .run() to stop
}

// shouldResync queries every listener to determine if any of them need a resync, based on each
// listener's resyncPeriod.
func (p *sharedProcessor) shouldResync() bool {
	p.listenersLock.Lock()
	defer p.listenersLock.Unlock()

	p.syncingListeners = []*processorListener{}

	resyncNeeded := false
	now := p.clock.Now()
	for _, listener := range p.listeners {
		// need to loop through all the listeners to see if they need to resync so we can prepare any
		// listeners that are going to be resyncing.
		if listener.shouldResync(now) {
			resyncNeeded = true
			p.syncingListeners = append(p.syncingListeners, listener)
			listener.determineNextResync(now)
		}
	}
	return resyncNeeded
}

func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Duration) {
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()

	for _, listener := range p.listeners {
		resyncPeriod := determineResyncPeriod(listener.requestedResyncPeriod, resyncCheckPeriod)
		listener.setResyncPeriod(resyncPeriod)
	}
}

type processorListener struct {
	nextCh chan interface{}
	addCh  chan interface{}

	handler ResourceEventHandler

	// pendingNotifications is an unbounded ring buffer that holds all notifications not yet distributed.
	// There is one per listener, but a failing/stalled listener will have infinite pendingNotifications
	// added until we OOM.
	// TODO: This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but
	// we should try to do something better.
	pendingNotifications buffer.RingGrowing

	// requestedResyncPeriod is how frequently the listener wants a full resync from the shared informer
	requestedResyncPeriod time.Duration
	// resyncPeriod is how frequently the listener wants a full resync from the shared informer. This
	// value may differ from requestedResyncPeriod if the shared informer adjusts it to align with the
	// informer's overall resync check period.
	resyncPeriod time.Duration
	// nextResync is the earliest time the listener should get a full resync
	nextResync time.Time
	// resyncLock guards access to resyncPeriod and nextResync
	resyncLock sync.Mutex
}

func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener {
	ret := &processorListener{
		nextCh:                make(chan interface{}),
		addCh:                 make(chan interface{}),
		handler:               handler,
		pendingNotifications:  *buffer.NewRingGrowing(bufferSize),
		requestedResyncPeriod: requestedResyncPeriod,
		resyncPeriod:          resyncPeriod,
	}

	ret.determineNextResync(now)

	return ret
}

func (p *processorListener) add(notification interface{}) {
	p.addCh <- notification
}

func (p *processorListener) pop() {
	defer utilruntime.HandleCrash()
	defer close(p.nextCh) // Tell .run() to stop

	var nextCh chan<- interface{}
	var notification interface{}
	for {
		select {
		case nextCh <- notification:
			// Notification dispatched
			var ok bool
			notification, ok = p.pendingNotifications.ReadOne()
			if !ok { // Nothing to pop
				nextCh = nil // Disable this select case
			}
		case notificationToAdd, ok := <-p.addCh:
			if !ok {
				return
			}
			if notification == nil { // No notification to pop (and pendingNotifications is empty)
				// Optimize the case - skip adding to pendingNotifications
				notification = notificationToAdd
				nextCh = p.nextCh
			} else { // There is already a notification waiting to be dispatched
				p.pendingNotifications.WriteOne(notificationToAdd)
			}
		}
	}
}

func (p *processorListener) run() {
	// this call blocks until the channel is closed.  When a panic happens during the notification
	// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
	// the next notification will be attempted.  This is usually better than the alternative of never
	// delivering again.
	stopCh := make(chan struct{})
	wait.Until(func() {
		// this gives us a few quick retries before a long pause and then a few more quick retries
		err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
			for next := range p.nextCh {
				switch notification := next.(type) {
				case updateNotification:
					p.handler.OnUpdate(notification.oldObj, notification.newObj)
				case addNotification:
					p.handler.OnAdd(notification.newObj)
				case deleteNotification:
					p.handler.OnDelete(notification.oldObj)
				default:
					utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
				}
			}
			// the only way to get here is if the p.nextCh is empty and closed
			return true, nil
		})

		// the only way to get here is if the p.nextCh is empty and closed
		if err == nil {
			close(stopCh)
		}
	}, 1*time.Minute, stopCh)
}

// shouldResync determines if the listener needs a resync. If the listener's resyncPeriod is 0,
// this always returns false.
func (p *processorListener) shouldResync(now time.Time) bool {
	p.resyncLock.Lock()
	defer p.resyncLock.Unlock()

	if p.resyncPeriod == 0 {
		return false
	}

	return now.After(p.nextResync) || now.Equal(p.nextResync)
}

func (p *processorListener) determineNextResync(now time.Time) {
	p.resyncLock.Lock()
	defer p.resyncLock.Unlock()

	p.nextResync = now.Add(p.resyncPeriod)
}

func (p *processorListener) setResyncPeriod(resyncPeriod time.Duration) {
	p.resyncLock.Lock()
	defer p.resyncLock.Unlock()

	p.resyncPeriod = resyncPeriod
}