1/*
2Copyright 2017 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package watch
18
19import (
20	"context"
21	"reflect"
22	goruntime "runtime"
23	"sort"
24	"testing"
25	"time"
26
27	"github.com/davecgh/go-spew/spew"
28
29	corev1 "k8s.io/api/core/v1"
30	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
31	"k8s.io/apimachinery/pkg/runtime"
32	"k8s.io/apimachinery/pkg/runtime/schema"
33	"k8s.io/apimachinery/pkg/util/diff"
34	"k8s.io/apimachinery/pkg/watch"
35	fakeclientset "k8s.io/client-go/kubernetes/fake"
36	testcore "k8s.io/client-go/testing"
37	"k8s.io/client-go/tools/cache"
38)
39
40// TestEventProcessorExit is expected to timeout if the event processor fails
41// to exit when stopped.
42func TestEventProcessorExit(t *testing.T) {
43	event := watch.Event{}
44
45	tests := []struct {
46		name  string
47		write func(e *eventProcessor)
48	}{
49		{
50			name: "exit on blocked read",
51			write: func(e *eventProcessor) {
52				e.push(event)
53			},
54		},
55		{
56			name: "exit on blocked write",
57			write: func(e *eventProcessor) {
58				e.push(event)
59				e.push(event)
60			},
61		},
62	}
63	for _, test := range tests {
64		t.Run(test.name, func(t *testing.T) {
65			out := make(chan watch.Event)
66			e := newEventProcessor(out)
67
68			test.write(e)
69
70			exited := make(chan struct{})
71			go func() {
72				e.run()
73				close(exited)
74			}()
75
76			<-out
77			e.stop()
78			goruntime.Gosched()
79			<-exited
80		})
81	}
82}
83
84type apiInt int
85
86func (apiInt) GetObjectKind() schema.ObjectKind { return nil }
87func (apiInt) DeepCopyObject() runtime.Object   { return nil }
88
89func TestEventProcessorOrdersEvents(t *testing.T) {
90	out := make(chan watch.Event)
91	e := newEventProcessor(out)
92	go e.run()
93
94	numProcessed := 0
95	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
96	go func() {
97		for i := 0; i < 1000; i++ {
98			e := <-out
99			if got, want := int(e.Object.(apiInt)), i; got != want {
100				t.Errorf("unexpected event: got=%d, want=%d", got, want)
101			}
102			numProcessed++
103		}
104		cancel()
105	}()
106
107	for i := 0; i < 1000; i++ {
108		e.push(watch.Event{Object: apiInt(i)})
109	}
110
111	<-ctx.Done()
112	e.stop()
113
114	if numProcessed != 1000 {
115		t.Errorf("unexpected number of events processed: %d", numProcessed)
116	}
117
118}
119
120type byEventTypeAndName []watch.Event
121
122func (a byEventTypeAndName) Len() int      { return len(a) }
123func (a byEventTypeAndName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
124func (a byEventTypeAndName) Less(i, j int) bool {
125	if a[i].Type < a[j].Type {
126		return true
127	}
128
129	if a[i].Type > a[j].Type {
130		return false
131	}
132
133	return a[i].Object.(*corev1.Secret).Name < a[j].Object.(*corev1.Secret).Name
134}
135
136func TestNewInformerWatcher(t *testing.T) {
137	// Make sure there are no 2 same types of events on a secret with the same name or that might be flaky.
138	tt := []struct {
139		name    string
140		objects []runtime.Object
141		events  []watch.Event
142	}{
143		{
144			name: "basic test",
145			objects: []runtime.Object{
146				&corev1.Secret{
147					ObjectMeta: metav1.ObjectMeta{
148						Name: "pod-1",
149					},
150					StringData: map[string]string{
151						"foo-1": "initial",
152					},
153				},
154				&corev1.Secret{
155					ObjectMeta: metav1.ObjectMeta{
156						Name: "pod-2",
157					},
158					StringData: map[string]string{
159						"foo-2": "initial",
160					},
161				},
162				&corev1.Secret{
163					ObjectMeta: metav1.ObjectMeta{
164						Name: "pod-3",
165					},
166					StringData: map[string]string{
167						"foo-3": "initial",
168					},
169				},
170			},
171			events: []watch.Event{
172				{
173					Type: watch.Added,
174					Object: &corev1.Secret{
175						ObjectMeta: metav1.ObjectMeta{
176							Name: "pod-4",
177						},
178						StringData: map[string]string{
179							"foo-4": "initial",
180						},
181					},
182				},
183				{
184					Type: watch.Modified,
185					Object: &corev1.Secret{
186						ObjectMeta: metav1.ObjectMeta{
187							Name: "pod-2",
188						},
189						StringData: map[string]string{
190							"foo-2": "new",
191						},
192					},
193				},
194				{
195					Type: watch.Deleted,
196					Object: &corev1.Secret{
197						ObjectMeta: metav1.ObjectMeta{
198							Name: "pod-3",
199						},
200					},
201				},
202			},
203		},
204	}
205
206	for _, tc := range tt {
207		t.Run(tc.name, func(t *testing.T) {
208			var expected []watch.Event
209			for _, o := range tc.objects {
210				expected = append(expected, watch.Event{
211					Type:   watch.Added,
212					Object: o.DeepCopyObject(),
213				})
214			}
215			for _, e := range tc.events {
216				expected = append(expected, *e.DeepCopy())
217			}
218
219			fake := fakeclientset.NewSimpleClientset(tc.objects...)
220			fakeWatch := watch.NewFakeWithChanSize(len(tc.events), false)
221			fake.PrependWatchReactor("secrets", testcore.DefaultWatchReactor(fakeWatch, nil))
222
223			for _, e := range tc.events {
224				fakeWatch.Action(e.Type, e.Object)
225			}
226
227			lw := &cache.ListWatch{
228				ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
229					return fake.CoreV1().Secrets("").List(options)
230				},
231				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
232					return fake.CoreV1().Secrets("").Watch(options)
233				},
234			}
235			_, _, w, done := NewIndexerInformerWatcher(lw, &corev1.Secret{})
236
237			var result []watch.Event
238		loop:
239			for {
240				var event watch.Event
241				var ok bool
242				select {
243				case event, ok = <-w.ResultChan():
244					if !ok {
245						t.Errorf("Failed to read event: channel is already closed!")
246						return
247					}
248
249					result = append(result, *event.DeepCopy())
250				case <-time.After(time.Second * 1):
251					// All the events are buffered -> this means we are done
252					// Also the one sec will make sure that we would detect RetryWatcher's incorrect behaviour after last event
253					break loop
254				}
255			}
256
257			// Informers don't guarantee event order so we need to sort these arrays to compare them
258			sort.Sort(byEventTypeAndName(expected))
259			sort.Sort(byEventTypeAndName(result))
260
261			if !reflect.DeepEqual(expected, result) {
262				t.Error(spew.Errorf("\nexpected: %#v,\ngot:      %#v,\ndiff: %s", expected, result, diff.ObjectReflectDiff(expected, result)))
263				return
264			}
265
266			// Fill in some data to test watch closing while there are some events to be read
267			for _, e := range tc.events {
268				fakeWatch.Action(e.Type, e.Object)
269			}
270
271			// Stop before reading all the data to make sure the informer can deal with closed channel
272			w.Stop()
273
274			<-done
275		})
276	}
277
278}
279