1// Copyright 2017 Istio Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package crd
16
17import (
18	"context"
19	"errors"
20	"reflect"
21	"sync"
22	"sync/atomic"
23	"testing"
24	"time"
25
26	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
28	"k8s.io/apimachinery/pkg/runtime"
29	"k8s.io/apimachinery/pkg/watch"
30	"k8s.io/client-go/discovery"
31	"k8s.io/client-go/discovery/fake"
32	"k8s.io/client-go/dynamic"
33	"k8s.io/client-go/rest"
34	k8stesting "k8s.io/client-go/testing"
35
36	"istio.io/istio/mixer/pkg/config/store"
37	"istio.io/pkg/probe"
38)
39
40// The "retryTimeout" used by the test.
41const testingRetryTimeout = 10 * time.Millisecond
42
43// The timeout for "waitFor" function, waiting for the expected event to come.
44const waitForTimeout = time.Second
45
46const apiGroupVersion = ConfigAPIGroup + "/" + ConfigAPIVersion
47
48func createFakeDiscovery(_ *rest.Config) (discovery.DiscoveryInterface, error) {
49	return &fake.FakeDiscovery{
50		Fake: &k8stesting.Fake{
51			Resources: []*metav1.APIResourceList{
52				{
53					GroupVersion: apiGroupVersion,
54					APIResources: []metav1.APIResource{
55						{Name: "handlers", SingularName: "handler", Kind: "Handler", Namespaced: true},
56						{Name: "actions", SingularName: "action", Kind: "Action", Namespaced: true},
57					},
58				},
59			},
60		},
61	}, nil
62}
63
64type dummyListerWatcherBuilder struct {
65	mu       sync.RWMutex
66	data     map[store.Key]*unstructured.Unstructured
67	watchers map[string]*watch.RaceFreeFakeWatcher
68}
69
70func (f *fakeDynamicResource) List(ctx context.Context, opts metav1.ListOptions) (*unstructured.UnstructuredList, error) {
71	list := &unstructured.UnstructuredList{}
72	f.d.mu.RLock()
73	for k, v := range f.d.data {
74		if k.Kind == f.res.Kind {
75			list.Items = append(list.Items, *v)
76		}
77	}
78	f.d.mu.RUnlock()
79	return list, nil
80}
81
82func (f *fakeDynamicResource) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
83	return f.w, nil
84}
85
86type fakeDynamicResource struct {
87	d *dummyListerWatcherBuilder
88	dynamic.ResourceInterface
89	w   watch.Interface
90	res metav1.APIResource
91}
92
93func (d *dummyListerWatcherBuilder) build(res metav1.APIResource) dynamic.ResourceInterface {
94	w := watch.NewRaceFreeFake()
95	d.mu.Lock()
96	d.watchers[res.Kind] = w
97	d.mu.Unlock()
98
99	return &fakeDynamicResource{d: d, w: w, res: res}
100}
101
102// nolint: unparam
103func (d *dummyListerWatcherBuilder) put(key store.Key, spec map[string]interface{}) error {
104	res := &unstructured.Unstructured{}
105	res.SetKind(key.Kind)
106	res.SetAPIVersion(apiGroupVersion)
107	res.SetName(key.Name)
108	res.SetNamespace(key.Namespace)
109	res.Object["spec"] = spec
110
111	d.mu.Lock()
112	defer d.mu.Unlock()
113	_, existed := d.data[key]
114	d.data[key] = res
115	w, ok := d.watchers[key.Kind]
116	if !ok {
117		return nil
118	}
119	if existed {
120		w.Modify(res)
121	} else {
122		w.Add(res)
123	}
124	return nil
125}
126
127func (d *dummyListerWatcherBuilder) delete(key store.Key) {
128	d.mu.Lock()
129	defer d.mu.Unlock()
130	value, ok := d.data[key]
131	if !ok {
132		return
133	}
134	delete(d.data, key)
135	w, ok := d.watchers[key.Kind]
136	if !ok {
137		return
138	}
139	w.Delete(value)
140}
141
142func getTempClient() (*Store, string, *dummyListerWatcherBuilder) {
143	ns := "istio-mixer-testing"
144
145	lw := &dummyListerWatcherBuilder{
146		data:     map[store.Key]*unstructured.Unstructured{},
147		watchers: map[string]*watch.RaceFreeFakeWatcher{},
148	}
149	client := &Store{
150		conf:             &rest.Config{},
151		donec:            make(chan struct{}),
152		retryTimeout:     testingRetryTimeout,
153		discoveryBuilder: createFakeDiscovery,
154		listerWatcherBuilder: func(*rest.Config) (listerWatcherBuilderInterface, error) {
155			return lw, nil
156		},
157		Probe:         probe.NewProbe(),
158		retryInterval: 1 * time.Millisecond,
159	}
160	return client, ns, lw
161}
162
163func waitFor(wch <-chan store.BackendEvent, ct store.ChangeType, key store.Key) error {
164	timeout := time.After(waitForTimeout)
165	for {
166		select {
167		case ev := <-wch:
168			if ev.Key == key && ev.Type == ct {
169				return nil
170			}
171		case <-timeout:
172			return context.DeadlineExceeded
173		}
174	}
175}
176
177func TestStore(t *testing.T) {
178	s, ns, lw := getTempClient()
179	if err := s.Init([]string{"Handler", "Action"}); err != nil {
180		t.Fatal(err.Error())
181	}
182	defer s.Stop()
183
184	wch, err := s.Watch()
185	if err != nil {
186		t.Fatal(err.Error())
187	}
188	k := store.Key{Kind: "Handler", Namespace: ns, Name: "default"}
189	if _, err = s.Get(k); err != store.ErrNotFound {
190		t.Errorf("Got %v, Want ErrNotFound", err)
191	}
192	h := map[string]interface{}{"name": "default", "adapter": "noop"}
193	if err = lw.put(k, h); err != nil {
194		t.Errorf("Got %v, Want nil", err)
195	}
196	if err = waitFor(wch, store.Update, k); err != nil {
197		t.Errorf("Got %v, Want nil", err)
198	}
199	h2, err := s.Get(k)
200	if err != nil {
201		t.Errorf("Got %v, Want nil", err)
202	}
203	if !reflect.DeepEqual(h, h2.Spec) {
204		t.Errorf("Got %+v, Want %+v", h2.Spec, h)
205	}
206	want := map[store.Key]*store.BackEndResource{k: h2}
207	if lst := s.List(); !reflect.DeepEqual(lst, want) {
208		t.Errorf("Got %+v, Want %+v", lst, want)
209	}
210	h["adapter"] = "noop2"
211	if err = lw.put(k, h); err != nil {
212		t.Errorf("Got %v, Want nil", err)
213	}
214	h2, err = s.Get(k)
215	if err != nil {
216		t.Errorf("Got %v, Want nil", err)
217	}
218	if !reflect.DeepEqual(h, h2.Spec) {
219		t.Errorf("Got %+v, Want %+v", h2.Spec, h)
220	}
221	lw.delete(k)
222	if err = waitFor(wch, store.Delete, k); err != nil {
223		t.Errorf("Got %v, Want nil", err)
224	}
225	if _, err := s.Get(k); err != store.ErrNotFound {
226		t.Errorf("Got %v, Want ErrNotFound", err)
227	}
228}
229
230func TestStoreWrongKind(t *testing.T) {
231	s, ns, lw := getTempClient()
232	if err := s.Init([]string{"Action"}); err != nil {
233		t.Fatal(err.Error())
234	}
235	defer s.Stop()
236
237	k := store.Key{Kind: "Handler", Namespace: ns, Name: "default"}
238	h := map[string]interface{}{"name": "default", "adapter": "noop"}
239	if err := lw.put(k, h); err != nil {
240		t.Error("Got nil, Want error")
241	}
242
243	if _, err := s.Get(k); err == nil {
244		t.Errorf("Got nil, Want error")
245	}
246}
247
248func TestStoreNamespaces(t *testing.T) {
249	s, ns, lw := getTempClient()
250	otherNS := "other-namespace"
251	s.ns = map[string]bool{ns: true, otherNS: true}
252	if err := s.Init([]string{"Action", "Handler"}); err != nil {
253		t.Fatal(err)
254	}
255	defer s.Stop()
256
257	wch, err := s.Watch()
258	if err != nil {
259		t.Fatal(err)
260	}
261	k1 := store.Key{Kind: "Handler", Namespace: ns, Name: "default"}
262	k2 := store.Key{Kind: "Handler", Namespace: otherNS, Name: "default"}
263	k3 := store.Key{Kind: "Handler", Namespace: "irrelevant-namespace", Name: "default"}
264	h := map[string]interface{}{"name": "default", "adapter": "noop"}
265	for _, k := range []store.Key{k1, k2, k3} {
266		if err = lw.put(k, h); err != nil {
267			t.Errorf("Got %v, Want nil", err)
268		}
269	}
270	if err = waitFor(wch, store.Update, k3); err == nil {
271		t.Error("Got nil, Want error")
272	}
273	list := s.List()
274	for _, c := range []struct {
275		key store.Key
276		ok  bool
277	}{
278		{k1, true},
279		{k2, true},
280		{k3, false},
281	} {
282		if _, ok := list[c.key]; ok != c.ok {
283			t.Errorf("For key %s, Got %v, Want %v", c.key, ok, c.ok)
284		}
285		if _, err = s.Get(c.key); (err == nil) != c.ok {
286			t.Errorf("For key %s, Got %v error, Want %v", c.key, err, c.ok)
287		}
288	}
289}
290
291func TestStoreFailToInit(t *testing.T) {
292	s, _, _ := getTempClient()
293	s.discoveryBuilder = func(*rest.Config) (discovery.DiscoveryInterface, error) {
294		return nil, errors.New("dummy")
295	}
296	if err := s.Init([]string{"Handler", "Action"}); err.Error() != "dummy" {
297		t.Errorf("Got %v, Want dummy error", err)
298	}
299	s.discoveryBuilder = createFakeDiscovery
300	s.listerWatcherBuilder = func(*rest.Config) (listerWatcherBuilderInterface, error) {
301		return nil, errors.New("dummy2")
302	}
303	if err := s.Init([]string{"Handler", "Action"}); err.Error() != "dummy2" {
304		t.Errorf("Got %v, Want dummy2 error", err)
305	}
306	s.Stop()
307}
308
309func TestCriticalCrdsAreReady(t *testing.T) {
310	fakeDiscovery := &fake.FakeDiscovery{
311		Fake: &k8stesting.Fake{
312			Resources: []*metav1.APIResourceList{
313				{GroupVersion: apiGroupVersion},
314			},
315		},
316	}
317	var callCount int32
318	fakeDiscovery.AddReactor("get", "resource", func(k8stesting.Action) (bool, runtime.Object, error) {
319		atomic.AddInt32(&callCount, 1)
320		fakeDiscovery.Resources[0].APIResources = append(
321			fakeDiscovery.Resources[0].APIResources,
322			metav1.APIResource{Name: "handlers", SingularName: "handler", Kind: "Handler", Namespaced: true},
323			metav1.APIResource{Name: "actions", SingularName: "action", Kind: "Action", Namespaced: true},
324		)
325		return true, nil, nil
326	})
327
328	s, _, _ := getTempClient()
329	s.discoveryBuilder = func(*rest.Config) (discovery.DiscoveryInterface, error) {
330		return fakeDiscovery, nil
331	}
332	s.criticalKinds = []string{"Handler", "Action"}
333	s.bgRetryInterval = 1 * time.Millisecond
334	err := s.Init([]string{"Handler", "Action", "Whatever"})
335	if err != nil {
336		t.Errorf("Got error %v from Init", err)
337	}
338	count := atomic.LoadInt32(&callCount)
339	if count != 1 {
340		t.Errorf("callCount is not expected, got %v wang 1", count)
341	}
342	s.Stop()
343}
344
345func TestCriticalCrdsAreNotReadyRetryTimeout(t *testing.T) {
346	fakeDiscovery := &fake.FakeDiscovery{
347		Fake: &k8stesting.Fake{
348			Resources: []*metav1.APIResourceList{
349				{GroupVersion: apiGroupVersion},
350			},
351		},
352	}
353	var callCount int32
354	fakeDiscovery.AddReactor("get", "resource", func(k8stesting.Action) (bool, runtime.Object, error) {
355		atomic.AddInt32(&callCount, 1)
356		return true, nil, nil
357	})
358
359	s, _, _ := getTempClient()
360	s.discoveryBuilder = func(*rest.Config) (discovery.DiscoveryInterface, error) {
361		return fakeDiscovery, nil
362	}
363	s.criticalKinds = []string{"Handler"}
364	s.retryTimeout = 2 * time.Second
365	s.retryInterval = time.Second
366	err := s.Init([]string{"Handler", "Action"})
367	errorMsg := "failed to discover critical kinds: [Handler]"
368	if err == nil {
369		t.Errorf("got no error from Init, want Init to fail")
370	} else if err.Error() != errorMsg {
371		t.Errorf("got Init error message %v, want %v", err.Error(), errorMsg)
372	}
373	count := atomic.LoadInt32(&callCount)
374	if count < 1 || count > 3 {
375		t.Errorf("got callCount %v, want call count to be more than 1 and less than 3 times", count)
376	}
377	s.Stop()
378}
379
380func TestCriticalCrdsRetryMakeSucceed(t *testing.T) {
381	fakeDiscovery := &fake.FakeDiscovery{
382		Fake: &k8stesting.Fake{
383			Resources: []*metav1.APIResourceList{
384				{GroupVersion: apiGroupVersion},
385			},
386		},
387	}
388	var callCount int32
389	// Gradually increase the number of API resources.
390	fakeDiscovery.AddReactor("get", "resource", func(k8stesting.Action) (bool, runtime.Object, error) {
391		count := atomic.AddInt32(&callCount, 1)
392		if count == 2 {
393			fakeDiscovery.Resources[0].APIResources = append(
394				fakeDiscovery.Resources[0].APIResources,
395				metav1.APIResource{Name: "handlers", SingularName: "handler", Kind: "Handler", Namespaced: true},
396			)
397		} else if count == 3 {
398			fakeDiscovery.Resources[0].APIResources = append(
399				fakeDiscovery.Resources[0].APIResources,
400				metav1.APIResource{Name: "actions", SingularName: "action", Kind: "Action", Namespaced: true},
401			)
402		}
403		return true, nil, nil
404	})
405
406	s, _, _ := getTempClient()
407	s.discoveryBuilder = func(*rest.Config) (discovery.DiscoveryInterface, error) {
408		return fakeDiscovery, nil
409	}
410	// Should set a longer timeout to avoid early quitting retry loop due to lack of computational power.
411	s.retryTimeout = 2 * time.Second
412	s.retryInterval = 10 * time.Millisecond
413	s.criticalKinds = []string{"Handler", "Action"}
414	err := s.Init([]string{"Handler", "Action"})
415	if err != nil {
416		t.Errorf("Got %v, Want nil", err)
417	}
418	count := atomic.LoadInt32(&callCount)
419	if count != 3 {
420		t.Errorf("Got %d, Want 3", count)
421	}
422	s.Stop()
423}
424
425func TestCrdsRetryAsynchronously(t *testing.T) {
426	fakeDiscovery := &fake.FakeDiscovery{
427		Fake: &k8stesting.Fake{
428			Resources: []*metav1.APIResourceList{
429				{
430					GroupVersion: apiGroupVersion,
431					APIResources: []metav1.APIResource{
432						{Name: "handlers", SingularName: "handler", Kind: "Handler", Namespaced: true},
433					},
434				},
435			},
436		},
437	}
438	var count int32
439	// Gradually increase the number of API resources.
440	fakeDiscovery.AddReactor("get", "resource", func(k8stesting.Action) (bool, runtime.Object, error) {
441		if atomic.LoadInt32(&count) != 0 {
442			fakeDiscovery.Resources[0].APIResources = append(
443				fakeDiscovery.Resources[0].APIResources,
444				metav1.APIResource{Name: "actions", SingularName: "action", Kind: "Action", Namespaced: true},
445			)
446		}
447		return true, nil, nil
448	})
449	s, ns, lw := getTempClient()
450	s.bgRetryInterval = 1 * time.Millisecond
451	s.discoveryBuilder = func(*rest.Config) (discovery.DiscoveryInterface, error) {
452		return fakeDiscovery, nil
453	}
454	k1 := store.Key{Kind: "Handler", Namespace: ns, Name: "default"}
455	if err := lw.put(k1, map[string]interface{}{"adapter": "noop"}); err != nil {
456		t.Fatal(err)
457	}
458	if err := s.Init([]string{"Handler", "Action"}); err != nil {
459		t.Fatal(err)
460	}
461	defer s.Stop()
462	s.cacheMutex.Lock()
463	ncaches := len(s.caches)
464	s.cacheMutex.Unlock()
465	if ncaches != 1 {
466		t.Errorf("Has %d caches, Want 1 caches", ncaches)
467	}
468	wch, err := s.Watch()
469	if err != nil {
470		t.Fatal(err)
471	}
472	atomic.StoreInt32(&count, 1)
473
474	after := time.After(time.Second)
475	tick := time.Tick(time.Millisecond)
476loop:
477	for {
478		select {
479		case <-after:
480			break loop
481		case <-tick:
482			s.cacheMutex.Lock()
483			ncaches = len(s.caches)
484			s.cacheMutex.Unlock()
485			if ncaches > 1 {
486				break loop
487			}
488		}
489	}
490	if ncaches != 2 {
491		t.Fatalf("Has %d caches, Want 2 caches", ncaches)
492	}
493
494	k2 := store.Key{Kind: "Action", Namespace: ns, Name: "default"}
495	if err = lw.put(k2, map[string]interface{}{"test": "value"}); err != nil {
496		t.Error(err)
497	}
498	if err = waitFor(wch, store.Update, k2); err != nil {
499		t.Errorf("Got %v, Want nil", err)
500	}
501}
502
503func TestCrdsRetryAsynchronouslyStoreClose(t *testing.T) {
504	fakeDiscovery := &fake.FakeDiscovery{
505		Fake: &k8stesting.Fake{
506			Resources: []*metav1.APIResourceList{
507				{GroupVersion: apiGroupVersion},
508			},
509		},
510	}
511	callCount := 0
512	mutex := sync.RWMutex{}
513	fakeDiscovery.AddReactor("get", "resource", func(k8stesting.Action) (bool, runtime.Object, error) {
514		mutex.Lock()
515		callCount++
516		mutex.Unlock()
517		return true, nil, nil
518	})
519
520	s, _, _ := getTempClient()
521	s.discoveryBuilder = func(*rest.Config) (discovery.DiscoveryInterface, error) {
522		return fakeDiscovery, nil
523	}
524	s.bgRetryInterval = 10 * time.Millisecond
525	s.Init([]string{"Handler", "Action"})
526
527	// Close store, which should shut down the background retry.
528	// With 10ms retry interval and 30ms before shutdown, at most 5 discovery calls would be made.
529	time.Sleep(30 * time.Millisecond)
530	s.Stop()
531	time.Sleep(30 * time.Millisecond)
532	mutex.RLock()
533	if callCount > 5 {
534		t.Errorf("got %v, want no more than 5 calls", callCount)
535	}
536	mutex.RUnlock()
537}
538