1/*
2Copyright 2015 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package cache
18
19import (
20	"fmt"
21	"math/rand"
22	"sync"
23	"testing"
24	"time"
25
26	"k8s.io/api/core/v1"
27	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28	"k8s.io/apimachinery/pkg/runtime"
29	"k8s.io/apimachinery/pkg/util/sets"
30	"k8s.io/apimachinery/pkg/util/wait"
31	"k8s.io/apimachinery/pkg/watch"
32	fcache "k8s.io/client-go/tools/cache/testing"
33
34	"github.com/google/gofuzz"
35)
36
37func Example() {
38	// source simulates an apiserver object endpoint.
39	source := fcache.NewFakeControllerSource()
40
41	// This will hold the downstream state, as we know it.
42	downstream := NewStore(DeletionHandlingMetaNamespaceKeyFunc)
43
44	// This will hold incoming changes. Note how we pass downstream in as a
45	// KeyLister, that way resync operations will result in the correct set
46	// of update/delete deltas.
47	fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, downstream)
48
49	// Let's do threadsafe output to get predictable test results.
50	deletionCounter := make(chan string, 1000)
51
52	cfg := &Config{
53		Queue:            fifo,
54		ListerWatcher:    source,
55		ObjectType:       &v1.Pod{},
56		FullResyncPeriod: time.Millisecond * 100,
57		RetryOnError:     false,
58
59		// Let's implement a simple controller that just deletes
60		// everything that comes in.
61		Process: func(obj interface{}) error {
62			// Obj is from the Pop method of the Queue we make above.
63			newest := obj.(Deltas).Newest()
64
65			if newest.Type != Deleted {
66				// Update our downstream store.
67				err := downstream.Add(newest.Object)
68				if err != nil {
69					return err
70				}
71
72				// Delete this object.
73				source.Delete(newest.Object.(runtime.Object))
74			} else {
75				// Update our downstream store.
76				err := downstream.Delete(newest.Object)
77				if err != nil {
78					return err
79				}
80
81				// fifo's KeyOf is easiest, because it handles
82				// DeletedFinalStateUnknown markers.
83				key, err := fifo.KeyOf(newest.Object)
84				if err != nil {
85					return err
86				}
87
88				// Report this deletion.
89				deletionCounter <- key
90			}
91			return nil
92		},
93	}
94
95	// Create the controller and run it until we close stop.
96	stop := make(chan struct{})
97	defer close(stop)
98	go New(cfg).Run(stop)
99
100	// Let's add a few objects to the source.
101	testIDs := []string{"a-hello", "b-controller", "c-framework"}
102	for _, name := range testIDs {
103		// Note that these pods are not valid-- the fake source doesn't
104		// call validation or anything.
105		source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: name}})
106	}
107
108	// Let's wait for the controller to process the things we just added.
109	outputSet := sets.String{}
110	for i := 0; i < len(testIDs); i++ {
111		outputSet.Insert(<-deletionCounter)
112	}
113
114	for _, key := range outputSet.List() {
115		fmt.Println(key)
116	}
117	// Output:
118	// a-hello
119	// b-controller
120	// c-framework
121}
122
123func ExampleNewInformer() {
124	// source simulates an apiserver object endpoint.
125	source := fcache.NewFakeControllerSource()
126
127	// Let's do threadsafe output to get predictable test results.
128	deletionCounter := make(chan string, 1000)
129
130	// Make a controller that immediately deletes anything added to it, and
131	// logs anything deleted.
132	_, controller := NewInformer(
133		source,
134		&v1.Pod{},
135		time.Millisecond*100,
136		ResourceEventHandlerFuncs{
137			AddFunc: func(obj interface{}) {
138				source.Delete(obj.(runtime.Object))
139			},
140			DeleteFunc: func(obj interface{}) {
141				key, err := DeletionHandlingMetaNamespaceKeyFunc(obj)
142				if err != nil {
143					key = "oops something went wrong with the key"
144				}
145
146				// Report this deletion.
147				deletionCounter <- key
148			},
149		},
150	)
151
152	// Run the controller and run it until we close stop.
153	stop := make(chan struct{})
154	defer close(stop)
155	go controller.Run(stop)
156
157	// Let's add a few objects to the source.
158	testIDs := []string{"a-hello", "b-controller", "c-framework"}
159	for _, name := range testIDs {
160		// Note that these pods are not valid-- the fake source doesn't
161		// call validation or anything.
162		source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: name}})
163	}
164
165	// Let's wait for the controller to process the things we just added.
166	outputSet := sets.String{}
167	for i := 0; i < len(testIDs); i++ {
168		outputSet.Insert(<-deletionCounter)
169	}
170
171	for _, key := range outputSet.List() {
172		fmt.Println(key)
173	}
174	// Output:
175	// a-hello
176	// b-controller
177	// c-framework
178}
179
180func TestHammerController(t *testing.T) {
181	// This test executes a bunch of requests through the fake source and
182	// controller framework to make sure there's no locking/threading
183	// errors. If an error happens, it should hang forever or trigger the
184	// race detector.
185
186	// source simulates an apiserver object endpoint.
187	source := fcache.NewFakeControllerSource()
188
189	// Let's do threadsafe output to get predictable test results.
190	outputSetLock := sync.Mutex{}
191	// map of key to operations done on the key
192	outputSet := map[string][]string{}
193
194	recordFunc := func(eventType string, obj interface{}) {
195		key, err := DeletionHandlingMetaNamespaceKeyFunc(obj)
196		if err != nil {
197			t.Errorf("something wrong with key: %v", err)
198			key = "oops something went wrong with the key"
199		}
200
201		// Record some output when items are deleted.
202		outputSetLock.Lock()
203		defer outputSetLock.Unlock()
204		outputSet[key] = append(outputSet[key], eventType)
205	}
206
207	// Make a controller which just logs all the changes it gets.
208	_, controller := NewInformer(
209		source,
210		&v1.Pod{},
211		time.Millisecond*100,
212		ResourceEventHandlerFuncs{
213			AddFunc:    func(obj interface{}) { recordFunc("add", obj) },
214			UpdateFunc: func(oldObj, newObj interface{}) { recordFunc("update", newObj) },
215			DeleteFunc: func(obj interface{}) { recordFunc("delete", obj) },
216		},
217	)
218
219	if controller.HasSynced() {
220		t.Errorf("Expected HasSynced() to return false before we started the controller")
221	}
222
223	// Run the controller and run it until we close stop.
224	stop := make(chan struct{})
225	go controller.Run(stop)
226
227	// Let's wait for the controller to do its initial sync
228	wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
229		return controller.HasSynced(), nil
230	})
231	if !controller.HasSynced() {
232		t.Errorf("Expected HasSynced() to return true after the initial sync")
233	}
234
235	wg := sync.WaitGroup{}
236	const threads = 3
237	wg.Add(threads)
238	for i := 0; i < threads; i++ {
239		go func() {
240			defer wg.Done()
241			// Let's add a few objects to the source.
242			currentNames := sets.String{}
243			rs := rand.NewSource(rand.Int63())
244			f := fuzz.New().NilChance(.5).NumElements(0, 2).RandSource(rs)
245			for i := 0; i < 100; i++ {
246				var name string
247				var isNew bool
248				if currentNames.Len() == 0 || rand.Intn(3) == 1 {
249					f.Fuzz(&name)
250					isNew = true
251				} else {
252					l := currentNames.List()
253					name = l[rand.Intn(len(l))]
254				}
255
256				pod := &v1.Pod{}
257				f.Fuzz(pod)
258				pod.ObjectMeta.Name = name
259				pod.ObjectMeta.Namespace = "default"
260				// Add, update, or delete randomly.
261				// Note that these pods are not valid-- the fake source doesn't
262				// call validation or perform any other checking.
263				if isNew {
264					currentNames.Insert(name)
265					source.Add(pod)
266					continue
267				}
268				switch rand.Intn(2) {
269				case 0:
270					currentNames.Insert(name)
271					source.Modify(pod)
272				case 1:
273					currentNames.Delete(name)
274					source.Delete(pod)
275				}
276			}
277		}()
278	}
279	wg.Wait()
280
281	// Let's wait for the controller to finish processing the things we just added.
282	// TODO: look in the queue to see how many items need to be processed.
283	time.Sleep(100 * time.Millisecond)
284	close(stop)
285
286	// TODO: Verify that no goroutines were leaked here and that everything shut
287	// down cleanly.
288
289	outputSetLock.Lock()
290	t.Logf("got: %#v", outputSet)
291}
292
293func TestUpdate(t *testing.T) {
294	// This test is going to exercise the various paths that result in a
295	// call to update.
296
297	// source simulates an apiserver object endpoint.
298	source := fcache.NewFakeControllerSource()
299
300	const (
301		FROM = "from"
302		TO   = "to"
303	)
304
305	// These are the transitions we expect to see; because this is
306	// asynchronous, there are a lot of valid possibilities.
307	type pair struct{ from, to string }
308	allowedTransitions := map[pair]bool{
309		{FROM, TO}: true,
310
311		// Because a resync can happen when we've already observed one
312		// of the above but before the item is deleted.
313		{TO, TO}: true,
314		// Because a resync could happen before we observe an update.
315		{FROM, FROM}: true,
316	}
317
318	pod := func(name, check string, final bool) *v1.Pod {
319		p := &v1.Pod{
320			ObjectMeta: metav1.ObjectMeta{
321				Name:   name,
322				Labels: map[string]string{"check": check},
323			},
324		}
325		if final {
326			p.Labels["final"] = "true"
327		}
328		return p
329	}
330	deletePod := func(p *v1.Pod) bool {
331		return p.Labels["final"] == "true"
332	}
333
334	tests := []func(string){
335		func(name string) {
336			name = "a-" + name
337			source.Add(pod(name, FROM, false))
338			source.Modify(pod(name, TO, true))
339		},
340	}
341
342	const threads = 3
343
344	var testDoneWG sync.WaitGroup
345	testDoneWG.Add(threads * len(tests))
346
347	// Make a controller that deletes things once it observes an update.
348	// It calls Done() on the wait group on deletions so we can tell when
349	// everything we've added has been deleted.
350	watchCh := make(chan struct{})
351	_, controller := NewInformer(
352		&testLW{
353			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
354				watch, err := source.Watch(options)
355				close(watchCh)
356				return watch, err
357			},
358			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
359				return source.List(options)
360			},
361		},
362		&v1.Pod{},
363		0,
364		ResourceEventHandlerFuncs{
365			UpdateFunc: func(oldObj, newObj interface{}) {
366				o, n := oldObj.(*v1.Pod), newObj.(*v1.Pod)
367				from, to := o.Labels["check"], n.Labels["check"]
368				if !allowedTransitions[pair{from, to}] {
369					t.Errorf("observed transition %q -> %q for %v", from, to, n.Name)
370				}
371				if deletePod(n) {
372					source.Delete(n)
373				}
374			},
375			DeleteFunc: func(obj interface{}) {
376				testDoneWG.Done()
377			},
378		},
379	)
380
381	// Run the controller and run it until we close stop.
382	// Once Run() is called, calls to testDoneWG.Done() might start, so
383	// all testDoneWG.Add() calls must happen before this point
384	stop := make(chan struct{})
385	go controller.Run(stop)
386	<-watchCh
387
388	// run every test a few times, in parallel
389	var wg sync.WaitGroup
390	wg.Add(threads * len(tests))
391	for i := 0; i < threads; i++ {
392		for j, f := range tests {
393			go func(name string, f func(string)) {
394				defer wg.Done()
395				f(name)
396			}(fmt.Sprintf("%v-%v", i, j), f)
397		}
398	}
399	wg.Wait()
400
401	// Let's wait for the controller to process the things we just added.
402	testDoneWG.Wait()
403	close(stop)
404}
405
406func TestPanicPropagated(t *testing.T) {
407	// source simulates an apiserver object endpoint.
408	source := fcache.NewFakeControllerSource()
409
410	// Make a controller that just panic if the AddFunc is called.
411	_, controller := NewInformer(
412		source,
413		&v1.Pod{},
414		time.Millisecond*100,
415		ResourceEventHandlerFuncs{
416			AddFunc: func(obj interface{}) {
417				// Create a panic.
418				panic("Just panic.")
419			},
420		},
421	)
422
423	// Run the controller and run it until we close stop.
424	stop := make(chan struct{})
425	defer close(stop)
426
427	propagated := make(chan interface{})
428	go func() {
429		defer func() {
430			if r := recover(); r != nil {
431				propagated <- r
432			}
433		}()
434		controller.Run(stop)
435	}()
436	// Let's add a object to the source. It will trigger a panic.
437	source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test"}})
438
439	// Check if the panic propagated up.
440	select {
441	case p := <-propagated:
442		if p == "Just panic." {
443			t.Logf("Test Passed")
444		} else {
445			t.Errorf("unrecognized panic in controller run: %v", p)
446		}
447	case <-time.After(wait.ForeverTestTimeout):
448		t.Errorf("timeout: the panic failed to propagate from the controller run method!")
449	}
450}
451