/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cache

import (
	"fmt"
	"sync"
	"time"

	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/util/clock"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/util/retry"
	"k8s.io/utils/buffer"

	"k8s.io/klog"
)
// SharedInformer provides eventually consistent linkage of its
// clients to the authoritative state of a given collection of
// objects.  An object is identified by its API group, kind/resource,
// namespace, and name.  One SharedInformer provides linkage to objects
// of a particular API group and kind/resource.  The linked object
// collection of a SharedInformer may be further restricted to one
// namespace and/or by label selector and/or field selector.
//
// The authoritative state of an object is what apiservers provide
// access to, and an object goes through a strict sequence of states.
// A state is either "absent" or present with a ResourceVersion and
// other appropriate content.
//
// A SharedInformer maintains a local cache, exposed by Store(), of
// the state of each relevant object.  This cache is eventually
// consistent with the authoritative state.  This means that, unless
// prevented by persistent communication problems, if ever a
// particular object ID X is authoritatively associated with a state S
// then for every SharedInformer I whose collection includes (X, S)
// eventually either (1) I's cache associates X with S or a later
// state of X, (2) I is stopped, or (3) the authoritative state
// service for X terminates.  To be formally complete, we say that the
// absent state meets any restriction by label selector or field
// selector.
//
// As a simple example, if a collection of objects is henceforth
// unchanging and a SharedInformer is created that links to that
// collection then that SharedInformer's cache eventually holds an
// exact copy of that collection (unless it is stopped too soon, the
// authoritative state service ends, or communication problems between
// the two persistently thwart achievement).
//
// As another simple example, if the local cache ever holds a
// non-absent state for some object ID and the object is eventually
// removed from the authoritative state then eventually the object is
// removed from the local cache (unless the SharedInformer is stopped
// too soon, the authoritative state service ends, or communication
// problems persistently thwart the desired result).
//
// The keys in Store() are of the form namespace/name for namespaced
// objects, and are simply the name for non-namespaced objects.
//
// A client is identified here by a ResourceEventHandler.  For every
// update to the SharedInformer's local cache and for every client,
// eventually either the SharedInformer is stopped or the client is
// notified of the update.  These notifications happen after the
// corresponding cache update and, in the case of a
// SharedIndexInformer, after the corresponding index updates.  It is
// possible that additional cache and index updates happen before such
// a prescribed notification.  For a given SharedInformer and client,
// all notifications are delivered sequentially.  For a given
// SharedInformer, client, and object ID, the notifications are
// delivered in order.
//
// A delete notification exposes the last locally known non-absent
// state, except that its ResourceVersion is replaced with a
// ResourceVersion in which the object is actually absent.
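//
// A minimal usage sketch follows; the ListerWatcher, the v1.Pod object
// type, and the handler bodies are illustrative assumptions, not part
// of this package:
//
//	informer := NewSharedInformer(podListWatcher, &v1.Pod{}, 30*time.Second)
//	informer.AddEventHandler(ResourceEventHandlerFuncs{
//		AddFunc:    func(obj interface{}) { /* react to a create */ },
//		UpdateFunc: func(oldObj, newObj interface{}) { /* react to an update */ },
//		DeleteFunc: func(obj interface{}) { /* react to a delete */ },
//	})
//	stopCh := make(chan struct{})
//	defer close(stopCh)
//	go informer.Run(stopCh)
//	if !WaitForCacheSync(stopCh, informer.HasSynced) {
//		return // stop was requested before the first full LIST completed
//	}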
type SharedInformer interface {
	// AddEventHandler adds an event handler to the shared informer using the shared informer's resync
	// period.  Events to a single handler are delivered sequentially, but there is no coordination
	// between different handlers.
	AddEventHandler(handler ResourceEventHandler)
	// AddEventHandlerWithResyncPeriod adds an event handler to the
	// shared informer using the specified resync period.  The resync
	// operation consists of delivering to the handler a create
	// notification for every object in the informer's local cache; it
	// does not add any interactions with the authoritative storage.
	AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
	// GetStore returns the informer's local cache as a Store.
	GetStore() Store
	// GetController gives back a synthetic interface that "votes" to start the informer.
	GetController() Controller
	// Run starts and runs the shared informer, returning after it stops.
	// The informer will be stopped when stopCh is closed.
	Run(stopCh <-chan struct{})
	// HasSynced returns true if the shared informer's store has been
	// informed by at least one full LIST of the authoritative state
	// of the informer's object collection.  This is unrelated to "resync".
	HasSynced() bool
	// LastSyncResourceVersion is the resource version observed when last synced with the underlying
	// store. The value returned is not synchronized with access to the underlying store and is not
	// thread-safe.
	LastSyncResourceVersion() string
}

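// SharedIndexInformer provides add and get Indexers ability based on SharedInformer.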
type SharedIndexInformer interface {
	SharedInformer
	// AddIndexers adds indexers to the informer before it starts.
	AddIndexers(indexers Indexers) error
	GetIndexer() Indexer
}

// NewSharedInformer creates a new instance for the given ListerWatcher.
func NewSharedInformer(lw ListerWatcher, objType runtime.Object, resyncPeriod time.Duration) SharedInformer {
	return NewSharedIndexInformer(lw, objType, resyncPeriod, Indexers{})
}

// NewSharedIndexInformer creates a new instance for the given ListerWatcher.
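// For example, a caller could maintain a secondary index over the node each
// Pod is scheduled to; the ListerWatcher, object type, and index function
// below are illustrative assumptions:
//
//	informer := NewSharedIndexInformer(podListWatcher, &v1.Pod{}, 0, Indexers{
//		"byNode": func(obj interface{}) ([]string, error) {
//			pod := obj.(*v1.Pod)
//			return []string{pod.Spec.NodeName}, nil
//		},
//	})
//	pods, err := informer.GetIndexer().ByIndex("byNode", "node-1")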
func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
	realClock := &clock.RealClock{}
	sharedIndexInformer := &sharedIndexInformer{
		processor:                       &sharedProcessor{clock: realClock},
		indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
		listerWatcher:                   lw,
		objectType:                      objType,
		resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
		defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
		cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", objType)),
		clock:                           realClock,
	}
	return sharedIndexInformer
}

// InformerSynced is a function that can be used to determine if an informer has synced.
// This is useful for determining if caches have synced.
type InformerSynced func() bool

const (
	// syncedPollPeriod controls how often you look at the status of your sync funcs
	syncedPollPeriod = 100 * time.Millisecond

	// initialBufferSize is the initial number of event notifications that can be buffered.
	initialBufferSize = 1024
)

// WaitForCacheSync waits for caches to populate.  It returns true if it was successful, false
// if the controller should shut down.
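//
// Callers commonly start several informers and then gate their work on all of
// the caches together; a sketch, where the informer variables are illustrative:
//
//	go podInformer.Run(stopCh)
//	go nodeInformer.Run(stopCh)
//	if !WaitForCacheSync(stopCh, podInformer.HasSynced, nodeInformer.HasSynced) {
//		utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
//		return
//	}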
func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
	err := wait.PollUntil(syncedPollPeriod,
		func() (bool, error) {
			for _, syncFunc := range cacheSyncs {
				if !syncFunc() {
					return false, nil
				}
			}
			return true, nil
		},
		stopCh)
	if err != nil {
		klog.V(2).Infof("stop requested")
		return false
	}

	klog.V(4).Infof("caches populated")
	return true
}

type sharedIndexInformer struct {
	indexer    Indexer
	controller Controller

	processor             *sharedProcessor
	cacheMutationDetector CacheMutationDetector

	// This block is tracked to handle late initialization of the controller
	listerWatcher ListerWatcher
	objectType    runtime.Object

	// resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
	// shouldResync to check if any of our listeners need a resync.
	resyncCheckPeriod time.Duration
	// defaultEventHandlerResyncPeriod is the default resync period for any handlers added via
	// AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default
	// value).
	defaultEventHandlerResyncPeriod time.Duration
	// clock allows for testability
	clock clock.Clock

	started, stopped bool
	startedLock      sync.Mutex

	// blockDeltas gives a way to stop all event distribution so that a late event handler
	// can safely join the shared informer.
	blockDeltas sync.Mutex
}

// dummyController hides the fact that a SharedInformer is different from a dedicated one
// where a caller can `Run`.  The run method is disconnected in this case, because higher
// level logic will decide when to start the SharedInformer and related controller.
// Because returning information back is always asynchronous, the legacy callers shouldn't
// notice any change in behavior.
type dummyController struct {
	informer *sharedIndexInformer
}

func (v *dummyController) Run(stopCh <-chan struct{}) {
}

func (v *dummyController) HasSynced() bool {
	return v.informer.HasSynced()
}

func (v *dummyController) LastSyncResourceVersion() string {
	return ""
}

type updateNotification struct {
	oldObj interface{}
	newObj interface{}
}

type addNotification struct {
	newObj interface{}
}

type deleteNotification struct {
	oldObj interface{}
}

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)

	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    s.listerWatcher,
		ObjectType:       s.objectType,
		FullResyncPeriod: s.resyncCheckPeriod,
		RetryOnError:     false,
		ShouldResync:     s.processor.shouldResync,

		Process: s.HandleDeltas,
	}

	func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()

		s.controller = New(cfg)
		s.controller.(*controller).clock = s.clock
		s.started = true
	}()

	// Separate stop channel because Processor should be stopped strictly after controller
	processorStopCh := make(chan struct{})
	var wg wait.Group
	defer wg.Wait()              // Wait for Processor to stop
	defer close(processorStopCh) // Tell Processor to stop
	wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
	wg.StartWithChannel(processorStopCh, s.processor.run)

	defer func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()
		s.stopped = true // Don't want any new listeners
	}()
	s.controller.Run(stopCh)
}

func (s *sharedIndexInformer) HasSynced() bool {
	s.startedLock.Lock()
	defer s.startedLock.Unlock()

	if s.controller == nil {
		return false
	}
	return s.controller.HasSynced()
}

func (s *sharedIndexInformer) LastSyncResourceVersion() string {
	s.startedLock.Lock()
	defer s.startedLock.Unlock()

	if s.controller == nil {
		return ""
	}
	return s.controller.LastSyncResourceVersion()
}

func (s *sharedIndexInformer) GetStore() Store {
	return s.indexer
}

func (s *sharedIndexInformer) GetIndexer() Indexer {
	return s.indexer
}

func (s *sharedIndexInformer) AddIndexers(indexers Indexers) error {
	s.startedLock.Lock()
	defer s.startedLock.Unlock()

	if s.started {
		return fmt.Errorf("informer has already started")
	}

	return s.indexer.AddIndexers(indexers)
}

func (s *sharedIndexInformer) GetController() Controller {
	return &dummyController{informer: s}
}

func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) {
	s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
}

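// determineResyncPeriod reconciles a handler's desired resync period with the
// informer's resync check period: zero disables resync for that handler, and
// any non-zero period is raised to at least the check period.  For example,
// with a check period of 30s, a desired period of 10s becomes 30s while 60s
// is kept as-is.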
func determineResyncPeriod(desired, check time.Duration) time.Duration {
	if desired == 0 {
		return desired
	}
	if check == 0 {
		klog.Warningf("The specified resyncPeriod %v is invalid because this shared informer doesn't support resyncing", desired)
		return 0
	}
	if desired < check {
		klog.Warningf("The specified resyncPeriod %v is being increased to the minimum resyncCheckPeriod %v", desired, check)
		return check
	}
	return desired
}

const minimumResyncPeriod = 1 * time.Second

func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
	s.startedLock.Lock()
	defer s.startedLock.Unlock()

	if s.stopped {
		klog.V(2).Infof("Handler %v was not added to shared informer because it has stopped already", handler)
		return
	}

	if resyncPeriod > 0 {
		if resyncPeriod < minimumResyncPeriod {
			klog.Warningf("resyncPeriod %v is too small. Changing it to the minimum allowed value of %v", resyncPeriod, minimumResyncPeriod)
			resyncPeriod = minimumResyncPeriod
		}

		if resyncPeriod < s.resyncCheckPeriod {
			if s.started {
				klog.Warningf("resyncPeriod %v is smaller than resyncCheckPeriod %v and the informer has already started. Changing it to %v", resyncPeriod, s.resyncCheckPeriod, s.resyncCheckPeriod)
				resyncPeriod = s.resyncCheckPeriod
			} else {
				// if the event handler's resyncPeriod is smaller than the current resyncCheckPeriod, update
				// resyncCheckPeriod to match resyncPeriod and adjust the resync periods of all the listeners
				// accordingly
				s.resyncCheckPeriod = resyncPeriod
				s.processor.resyncCheckPeriodChanged(resyncPeriod)
			}
		}
	}

	listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)

	if !s.started {
		s.processor.addListener(listener)
		return
	}

	// in order to safely join, we have to
	// 1. stop sending add/update/delete notifications
	// 2. do a list against the store
	// 3. send synthetic "Add" events to the new handler
	// 4. unblock
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	s.processor.addListener(listener)
	for _, item := range s.indexer.List() {
		listener.add(addNotification{newObj: item})
	}
}

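// HandleDeltas is the informer's Process function: it applies each Delta, from
// oldest to newest, to the local indexer and then distributes the matching
// add/update/delete notification to the registered listeners.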
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	// from oldest to newest
	for _, d := range obj.(Deltas) {
		switch d.Type {
		case Sync, Added, Updated:
			isSync := d.Type == Sync
			s.cacheMutationDetector.AddObject(d.Object)
			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
				if err := s.indexer.Update(d.Object); err != nil {
					return err
				}
				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
			} else {
				if err := s.indexer.Add(d.Object); err != nil {
					return err
				}
				s.processor.distribute(addNotification{newObj: d.Object}, isSync)
			}
		case Deleted:
			if err := s.indexer.Delete(d.Object); err != nil {
				return err
			}
			s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
		}
	}
	return nil
}

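// sharedProcessor has a collection of processorListener and can distribute a
// notification object to its listeners.  There are two kinds of distribute
// operations: sync distributions go only to the subset of listeners that are
// currently due for a resync (recomputed in shouldResync), while non-sync
// distributions go to every listener.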
type sharedProcessor struct {
	listenersStarted bool
	listenersLock    sync.RWMutex
	listeners        []*processorListener
	syncingListeners []*processorListener
	clock            clock.Clock
	wg               wait.Group
}

func (p *sharedProcessor) addListener(listener *processorListener) {
	p.listenersLock.Lock()
	defer p.listenersLock.Unlock()

	p.addListenerLocked(listener)
	if p.listenersStarted {
		p.wg.Start(listener.run)
		p.wg.Start(listener.pop)
	}
}

func (p *sharedProcessor) addListenerLocked(listener *processorListener) {
	p.listeners = append(p.listeners, listener)
	p.syncingListeners = append(p.syncingListeners, listener)
}

func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()

	if sync {
		for _, listener := range p.syncingListeners {
			listener.add(obj)
		}
	} else {
		for _, listener := range p.listeners {
			listener.add(obj)
		}
	}
}

func (p *sharedProcessor) run(stopCh <-chan struct{}) {
	func() {
		p.listenersLock.RLock()
		defer p.listenersLock.RUnlock()
		for _, listener := range p.listeners {
			p.wg.Start(listener.run)
			p.wg.Start(listener.pop)
		}
		p.listenersStarted = true
	}()
	<-stopCh
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()
	for _, listener := range p.listeners {
		close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
	}
	p.wg.Wait() // Wait for all .pop() and .run() to stop
}

// shouldResync queries every listener to determine if any of them need a resync, based on each
// listener's resyncPeriod.
func (p *sharedProcessor) shouldResync() bool {
	p.listenersLock.Lock()
	defer p.listenersLock.Unlock()

	p.syncingListeners = []*processorListener{}

	resyncNeeded := false
	now := p.clock.Now()
	for _, listener := range p.listeners {
		// need to loop through all the listeners to see if they need to resync so we can prepare any
		// listeners that are going to be resyncing.
		if listener.shouldResync(now) {
			resyncNeeded = true
			p.syncingListeners = append(p.syncingListeners, listener)
			listener.determineNextResync(now)
		}
	}
	return resyncNeeded
}

func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Duration) {
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()

	for _, listener := range p.listeners {
		resyncPeriod := determineResyncPeriod(listener.requestedResyncPeriod, resyncCheckPeriod)
		listener.setResyncPeriod(resyncPeriod)
	}
}

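// processorListener relays notifications from a sharedProcessor to one
// ResourceEventHandler.  add() feeds pop(), which buffers into the unbounded
// pendingNotifications ring so that distribution is not blocked by a slow
// handler, while run() drains nextCh and invokes the handler's callbacks.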
type processorListener struct {
	nextCh chan interface{}
	addCh  chan interface{}

	handler ResourceEventHandler

	// pendingNotifications is an unbounded ring buffer that holds all notifications not yet distributed.
	// There is one per listener, but a failing/stalled listener will have infinite pendingNotifications
	// added until we OOM.
	// TODO: This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but
	// we should try to do something better.
	pendingNotifications buffer.RingGrowing

	// requestedResyncPeriod is how frequently the listener wants a full resync from the shared informer
	requestedResyncPeriod time.Duration
	// resyncPeriod is how frequently the listener wants a full resync from the shared informer. This
	// value may differ from requestedResyncPeriod if the shared informer adjusts it to align with the
	// informer's overall resync check period.
	resyncPeriod time.Duration
	// nextResync is the earliest time the listener should get a full resync
	nextResync time.Time
	// resyncLock guards access to resyncPeriod and nextResync
	resyncLock sync.Mutex
}

func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener {
	ret := &processorListener{
		nextCh:                make(chan interface{}),
		addCh:                 make(chan interface{}),
		handler:               handler,
		pendingNotifications:  *buffer.NewRingGrowing(bufferSize),
		requestedResyncPeriod: requestedResyncPeriod,
		resyncPeriod:          resyncPeriod,
	}

	ret.determineNextResync(now)

	return ret
}

func (p *processorListener) add(notification interface{}) {
	p.addCh <- notification
}

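// pop shuttles notifications from addCh to nextCh.  Whenever run() is not
// ready to receive, incoming notifications are parked in pendingNotifications,
// so add() does not block on a slow handler.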
func (p *processorListener) pop() {
	defer utilruntime.HandleCrash()
	defer close(p.nextCh) // Tell .run() to stop

	var nextCh chan<- interface{}
	var notification interface{}
	for {
		select {
		case nextCh <- notification:
			// Notification dispatched
			var ok bool
			notification, ok = p.pendingNotifications.ReadOne()
			if !ok { // Nothing to pop
				nextCh = nil // Disable this select case
			}
		case notificationToAdd, ok := <-p.addCh:
			if !ok {
				return
			}
			if notification == nil { // No notification to pop (and pendingNotifications is empty)
				// Optimize the case - skip adding to pendingNotifications
				notification = notificationToAdd
				nextCh = p.nextCh
			} else { // There is already a notification waiting to be dispatched
				p.pendingNotifications.WriteOne(notificationToAdd)
			}
		}
	}
}

func (p *processorListener) run() {
	// this call blocks until the channel is closed.  When a panic happens during the notification
	// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
	// the next notification will be attempted.  This is usually better than the alternative of never
	// delivering again.
	stopCh := make(chan struct{})
	wait.Until(func() {
		// this gives us a few quick retries before a long pause and then a few more quick retries
		err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
			for next := range p.nextCh {
				switch notification := next.(type) {
				case updateNotification:
					p.handler.OnUpdate(notification.oldObj, notification.newObj)
				case addNotification:
					p.handler.OnAdd(notification.newObj)
				case deleteNotification:
					p.handler.OnDelete(notification.oldObj)
				default:
					utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
				}
			}
			// the only way to get here is if the p.nextCh is empty and closed
			return true, nil
		})

		// the only way to get here is if the p.nextCh is empty and closed
		if err == nil {
			close(stopCh)
		}
	}, 1*time.Minute, stopCh)
}

// shouldResync determines if the listener needs a resync.  If the listener's resyncPeriod is 0,
// this always returns false.
func (p *processorListener) shouldResync(now time.Time) bool {
	p.resyncLock.Lock()
	defer p.resyncLock.Unlock()

	if p.resyncPeriod == 0 {
		return false
	}

	return now.After(p.nextResync) || now.Equal(p.nextResync)
}

func (p *processorListener) determineNextResync(now time.Time) {
	p.resyncLock.Lock()
	defer p.resyncLock.Unlock()

	p.nextResync = now.Add(p.resyncPeriod)
}

func (p *processorListener) setResyncPeriod(resyncPeriod time.Duration) {
	p.resyncLock.Lock()
	defer p.resyncLock.Unlock()

	p.resyncPeriod = resyncPeriod
}