1/*
2Copyright 2015 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package cache
18
19import (
20	"sync"
21	"time"
22
23	"k8s.io/apimachinery/pkg/runtime"
24	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
25	"k8s.io/apimachinery/pkg/util/wait"
26	"k8s.io/client-go/util/clock"
27)
28
// Config contains all the settings for a Controller.
// New stores a shallow copy of the Config it is given.
type Config struct {
	// The queue for your objects; either a FIFO or
	// a DeltaFIFO. Your Process() function should accept
	// the output of this Queue's Pop() method.
	// The controller closes this queue once its stop channel fires.
	Queue

	// Something that can list and watch your objects.
	// Run hands it to the reflector it creates.
	ListerWatcher

	// Something that can process your objects.
	// The controller's process loop wraps it in PopProcessFunc and
	// passes it to Queue.Pop.
	Process ProcessFunc

	// The type of your objects.
	// Run passes it to NewReflector.
	ObjectType runtime.Object

	// Reprocess everything at least this often.
	// Note that if it takes longer for you to clear the queue than this
	// period, you will end up processing items in the order determined
	// by FIFO.Replace(). Currently, this is random. If this is a
	// problem, we can change that replacement policy to append new
	// things to the end of the queue instead of replacing the entire
	// queue.
	// Run passes it to NewReflector.
	FullResyncPeriod time.Duration

	// ShouldResync, if specified, is invoked when the controller's reflector determines the next
	// periodic sync should occur. If this returns true, it means the reflector should proceed with
	// the resync.
	// Run copies it onto the reflector's ShouldResync field.
	ShouldResync ShouldResyncFunc

	// If true, when Process() returns an error, re-enqueue the object
	// (the process loop does so with Queue.AddIfNotPresent).
	// A FIFOClosedError is never retried; it ends the process loop instead.
	// TODO: add interface to let you inject a delay/backoff or drop
	//       the object completely if desired. Pass the object in
	//       question to this interface as a parameter.
	RetryOnError bool
}
65
// ShouldResyncFunc is a type of function that indicates if a reflector should perform a
// resync or not. It can be used by a shared informer to support multiple event handlers with custom
// resync periods. A true result means the resync should proceed. It is the type of
// Config.ShouldResync, which the controller copies onto its reflector when Run is called.
type ShouldResyncFunc func() bool
70
// ProcessFunc processes a single object. It is the type of Config.Process:
// the controller's process loop wraps it in PopProcessFunc and hands it to
// Queue.Pop, so obj is whatever that queue pops. When Pop reports an error
// other than FIFOClosedError and Config.RetryOnError is set, the process loop
// re-enqueues the object.
type ProcessFunc func(obj interface{}) error
73
// controller is the Controller implementation returned by New.
//
// Fields:
//  * config is the copy of the Config that New was called with.
//  * reflector is created and assigned by Run; it is nil until then.
//  * reflectorMutex is held by Run while it assigns reflector.
//  * clock is set to a clock.RealClock by New and handed to the reflector
//    in Run.
type controller struct {
	config         Config
	reflector      *Reflector
	reflectorMutex sync.RWMutex
	clock          clock.Clock
}
81
// Controller is the interface satisfied by the value New returns. The
// per-method notes below describe the implementation in this file.
type Controller interface {
	// Run starts the controller. It blocks, so call it via go; it is an
	// error to call it more than once.
	Run(stopCh <-chan struct{})
	// HasSynced reports the result of the configured Queue's HasSynced.
	HasSynced() bool
	// LastSyncResourceVersion returns the reflector's
	// LastSyncResourceVersion, or "" while no reflector has been created.
	LastSyncResourceVersion() string
}
87
88// New makes a new Controller from the given Config.
89func New(c *Config) Controller {
90	ctlr := &controller{
91		config: *c,
92		clock:  &clock.RealClock{},
93	}
94	return ctlr
95}
96
97// Run begins processing items, and will continue until a value is sent down stopCh.
98// It's an error to call Run more than once.
99// Run blocks; call via go.
100func (c *controller) Run(stopCh <-chan struct{}) {
101	defer utilruntime.HandleCrash()
102	go func() {
103		<-stopCh
104		c.config.Queue.Close()
105	}()
106	r := NewReflector(
107		c.config.ListerWatcher,
108		c.config.ObjectType,
109		c.config.Queue,
110		c.config.FullResyncPeriod,
111	)
112	r.ShouldResync = c.config.ShouldResync
113	r.clock = c.clock
114
115	c.reflectorMutex.Lock()
116	c.reflector = r
117	c.reflectorMutex.Unlock()
118
119	r.RunUntil(stopCh)
120
121	wait.Until(c.processLoop, time.Second, stopCh)
122}
123
124// Returns true once this controller has completed an initial resource listing
125func (c *controller) HasSynced() bool {
126	return c.config.Queue.HasSynced()
127}
128
129func (c *controller) LastSyncResourceVersion() string {
130	if c.reflector == nil {
131		return ""
132	}
133	return c.reflector.LastSyncResourceVersion()
134}
135
136// processLoop drains the work queue.
137// TODO: Consider doing the processing in parallel. This will require a little thought
138// to make sure that we don't end up processing the same object multiple times
139// concurrently.
140//
141// TODO: Plumb through the stopCh here (and down to the queue) so that this can
142// actually exit when the controller is stopped. Or just give up on this stuff
143// ever being stoppable. Converting this whole package to use Context would
144// also be helpful.
145func (c *controller) processLoop() {
146	for {
147		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
148		if err != nil {
149			if err == FIFOClosedError {
150				return
151			}
152			if c.config.RetryOnError {
153				// This is the safe way to re-enqueue.
154				c.config.Queue.AddIfNotPresent(obj)
155			}
156		}
157	}
158}
159
// ResourceEventHandler can handle notifications for events that happen to a
// resource. The events are informational only, so you can't return an
// error.
//  * OnAdd is called when an object is added.
//  * OnUpdate is called when an object is modified. Note that oldObj is the
//      last known state of the object-- it is possible that several changes
//      were combined together, so you can't use this to see every single
//      change. OnUpdate is also called when a re-list happens, and it will
//      get called even if nothing changed. This is useful for periodically
//      evaluating or syncing something.
//  * OnDelete will get the final state of the item if it is known, otherwise
//      it will get an object of type DeletedFinalStateUnknown. This can
//      happen if the watch is closed and misses the delete event and we don't
//      notice the deletion until the subsequent re-list.
type ResourceEventHandler interface {
	// OnAdd receives the object that was added.
	OnAdd(obj interface{})
	// OnUpdate receives the last known state of the object (oldObj) and
	// its new state (newObj).
	OnUpdate(oldObj, newObj interface{})
	// OnDelete receives the final state of the deleted object if it is
	// known, otherwise a DeletedFinalStateUnknown.
	OnDelete(obj interface{})
}
179
// ResourceEventHandlerFuncs is an adaptor that lets you supply only the
// notification functions you care about while still implementing
// ResourceEventHandler. Any function left nil turns the corresponding
// notification into a no-op.
type ResourceEventHandlerFuncs struct {
	AddFunc    func(obj interface{})
	UpdateFunc func(oldObj, newObj interface{})
	DeleteFunc func(obj interface{})
}

// OnAdd forwards obj to AddFunc; it does nothing when AddFunc is nil.
func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}) {
	if r.AddFunc == nil {
		return
	}
	r.AddFunc(obj)
}

// OnUpdate forwards oldObj and newObj to UpdateFunc; it does nothing when
// UpdateFunc is nil.
func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface{}) {
	if r.UpdateFunc == nil {
		return
	}
	r.UpdateFunc(oldObj, newObj)
}

// OnDelete forwards obj to DeleteFunc; it does nothing when DeleteFunc is nil.
func (r ResourceEventHandlerFuncs) OnDelete(obj interface{}) {
	if r.DeleteFunc == nil {
		return
	}
	r.DeleteFunc(obj)
}
209
210// DeletionHandlingMetaNamespaceKeyFunc checks for
211// DeletedFinalStateUnknown objects before calling
212// MetaNamespaceKeyFunc.
213func DeletionHandlingMetaNamespaceKeyFunc(obj interface{}) (string, error) {
214	if d, ok := obj.(DeletedFinalStateUnknown); ok {
215		return d.Key, nil
216	}
217	return MetaNamespaceKeyFunc(obj)
218}
219
220// NewInformer returns a Store and a controller for populating the store
221// while also providing event notifications. You should only used the returned
222// Store for Get/List operations; Add/Modify/Deletes will cause the event
223// notifications to be faulty.
224//
225// Parameters:
226//  * lw is list and watch functions for the source of the resource you want to
227//    be informed of.
228//  * objType is an object of the type that you expect to receive.
229//  * resyncPeriod: if non-zero, will re-list this often (you will get OnUpdate
230//    calls, even if nothing changed). Otherwise, re-list will be delayed as
231//    long as possible (until the upstream source closes the watch or times out,
232//    or you stop the controller).
233//  * h is the object you want notifications sent to.
234//
235func NewInformer(
236	lw ListerWatcher,
237	objType runtime.Object,
238	resyncPeriod time.Duration,
239	h ResourceEventHandler,
240) (Store, Controller) {
241	// This will hold the client state, as we know it.
242	clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc)
243
244	// This will hold incoming changes. Note how we pass clientState in as a
245	// KeyLister, that way resync operations will result in the correct set
246	// of update/delete deltas.
247	fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, nil, clientState)
248
249	cfg := &Config{
250		Queue:            fifo,
251		ListerWatcher:    lw,
252		ObjectType:       objType,
253		FullResyncPeriod: resyncPeriod,
254		RetryOnError:     false,
255
256		Process: func(obj interface{}) error {
257			// from oldest to newest
258			for _, d := range obj.(Deltas) {
259				switch d.Type {
260				case Sync, Added, Updated:
261					if old, exists, err := clientState.Get(d.Object); err == nil && exists {
262						if err := clientState.Update(d.Object); err != nil {
263							return err
264						}
265						h.OnUpdate(old, d.Object)
266					} else {
267						if err := clientState.Add(d.Object); err != nil {
268							return err
269						}
270						h.OnAdd(d.Object)
271					}
272				case Deleted:
273					if err := clientState.Delete(d.Object); err != nil {
274						return err
275					}
276					h.OnDelete(d.Object)
277				}
278			}
279			return nil
280		},
281	}
282	return clientState, New(cfg)
283}
284
285// NewIndexerInformer returns a Indexer and a controller for populating the index
286// while also providing event notifications. You should only used the returned
287// Index for Get/List operations; Add/Modify/Deletes will cause the event
288// notifications to be faulty.
289//
290// Parameters:
291//  * lw is list and watch functions for the source of the resource you want to
292//    be informed of.
293//  * objType is an object of the type that you expect to receive.
294//  * resyncPeriod: if non-zero, will re-list this often (you will get OnUpdate
295//    calls, even if nothing changed). Otherwise, re-list will be delayed as
296//    long as possible (until the upstream source closes the watch or times out,
297//    or you stop the controller).
298//  * h is the object you want notifications sent to.
299//
300func NewIndexerInformer(
301	lw ListerWatcher,
302	objType runtime.Object,
303	resyncPeriod time.Duration,
304	h ResourceEventHandler,
305	indexers Indexers,
306) (Indexer, Controller) {
307	// This will hold the client state, as we know it.
308	clientState := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers)
309
310	// This will hold incoming changes. Note how we pass clientState in as a
311	// KeyLister, that way resync operations will result in the correct set
312	// of update/delete deltas.
313	fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, nil, clientState)
314
315	cfg := &Config{
316		Queue:            fifo,
317		ListerWatcher:    lw,
318		ObjectType:       objType,
319		FullResyncPeriod: resyncPeriod,
320		RetryOnError:     false,
321
322		Process: func(obj interface{}) error {
323			// from oldest to newest
324			for _, d := range obj.(Deltas) {
325				switch d.Type {
326				case Sync, Added, Updated:
327					if old, exists, err := clientState.Get(d.Object); err == nil && exists {
328						if err := clientState.Update(d.Object); err != nil {
329							return err
330						}
331						h.OnUpdate(old, d.Object)
332					} else {
333						if err := clientState.Add(d.Object); err != nil {
334							return err
335						}
336						h.OnAdd(d.Object)
337					}
338				case Deleted:
339					if err := clientState.Delete(d.Object); err != nil {
340						return err
341					}
342					h.OnDelete(d.Object)
343				}
344			}
345			return nil
346		},
347	}
348	return clientState, New(cfg)
349}
350