1/*
2Copyright 2015 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package testing
18
19import (
20	"fmt"
21	"reflect"
22	"sort"
23	"sync"
24
25	jsonpatch "github.com/evanphx/json-patch"
26
27	"k8s.io/apimachinery/pkg/api/errors"
28	"k8s.io/apimachinery/pkg/api/meta"
29	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30	"k8s.io/apimachinery/pkg/runtime"
31	"k8s.io/apimachinery/pkg/runtime/schema"
32	"k8s.io/apimachinery/pkg/types"
33	"k8s.io/apimachinery/pkg/util/json"
34	"k8s.io/apimachinery/pkg/util/strategicpatch"
35	"k8s.io/apimachinery/pkg/watch"
36	restclient "k8s.io/client-go/rest"
37)
38
// ObjectTracker keeps track of objects. It is intended to be used to
// fake calls to a server by returning objects based on their resource,
// namespace and name.
type ObjectTracker interface {
	// Add adds an object to the tracker. If the object being added
	// is a list, its items are added separately.
	Add(obj runtime.Object) error

	// Get retrieves the object identified by its resource (gvr), namespace
	// and name.
	Get(gvr schema.GroupVersionResource, ns, name string) (runtime.Object, error)

	// Create adds an object to the tracker in the specified namespace.
	Create(gvr schema.GroupVersionResource, obj runtime.Object, ns string) error

	// Update updates an existing object in the tracker in the specified namespace.
	Update(gvr schema.GroupVersionResource, obj runtime.Object, ns string) error

	// List retrieves all objects of the given resource in the given
	// namespace. gvk is the kind of a single item; only non-List kinds
	// are accepted.
	List(gvr schema.GroupVersionResource, gvk schema.GroupVersionKind, ns string) (runtime.Object, error)

	// Delete deletes an existing object from the tracker.
	// NOTE(review): this comment previously stated that Delete returns no
	// error when the object did not exist. The tracker returned by
	// NewObjectTracker reports a NotFound error in that case, so callers
	// should not rely on a nil error for missing objects.
	Delete(gvr schema.GroupVersionResource, ns, name string) error

	// Watch watches objects from the tracker. The returned watch.Interface
	// delivers events for added / modified / deleted objects of the given
	// resource in the given namespace.
	Watch(gvr schema.GroupVersionResource, ns string) (watch.Interface, error)
}
69
// ObjectScheme abstracts the implementation of common operations on objects.
// It combines runtime.ObjectCreater, which the tracker's List uses (via New)
// to instantiate the list object it returns, and runtime.ObjectTyper, which
// the tracker's Add uses (via ObjectKinds) to look up the kinds of an object.
type ObjectScheme interface {
	runtime.ObjectCreater
	runtime.ObjectTyper
}
75
76// ObjectReaction returns a ReactionFunc that applies core.Action to
77// the given tracker.
78func ObjectReaction(tracker ObjectTracker) ReactionFunc {
79	return func(action Action) (bool, runtime.Object, error) {
80		ns := action.GetNamespace()
81		gvr := action.GetResource()
82		// Here and below we need to switch on implementation types,
83		// not on interfaces, as some interfaces are identical
84		// (e.g. UpdateAction and CreateAction), so if we use them,
85		// updates and creates end up matching the same case branch.
86		switch action := action.(type) {
87
88		case ListActionImpl:
89			obj, err := tracker.List(gvr, action.GetKind(), ns)
90			return true, obj, err
91
92		case GetActionImpl:
93			obj, err := tracker.Get(gvr, ns, action.GetName())
94			return true, obj, err
95
96		case CreateActionImpl:
97			objMeta, err := meta.Accessor(action.GetObject())
98			if err != nil {
99				return true, nil, err
100			}
101			if action.GetSubresource() == "" {
102				err = tracker.Create(gvr, action.GetObject(), ns)
103			} else {
104				// TODO: Currently we're handling subresource creation as an update
105				// on the enclosing resource. This works for some subresources but
106				// might not be generic enough.
107				err = tracker.Update(gvr, action.GetObject(), ns)
108			}
109			if err != nil {
110				return true, nil, err
111			}
112			obj, err := tracker.Get(gvr, ns, objMeta.GetName())
113			return true, obj, err
114
115		case UpdateActionImpl:
116			objMeta, err := meta.Accessor(action.GetObject())
117			if err != nil {
118				return true, nil, err
119			}
120			err = tracker.Update(gvr, action.GetObject(), ns)
121			if err != nil {
122				return true, nil, err
123			}
124			obj, err := tracker.Get(gvr, ns, objMeta.GetName())
125			return true, obj, err
126
127		case DeleteActionImpl:
128			err := tracker.Delete(gvr, ns, action.GetName())
129			if err != nil {
130				return true, nil, err
131			}
132			return true, nil, nil
133
134		case PatchActionImpl:
135			obj, err := tracker.Get(gvr, ns, action.GetName())
136			if err != nil {
137				return true, nil, err
138			}
139
140			old, err := json.Marshal(obj)
141			if err != nil {
142				return true, nil, err
143			}
144
145			// reset the object in preparation to unmarshal, since unmarshal does not guarantee that fields
146			// in obj that are removed by patch are cleared
147			value := reflect.ValueOf(obj)
148			value.Elem().Set(reflect.New(value.Type().Elem()).Elem())
149
150			switch action.GetPatchType() {
151			case types.JSONPatchType:
152				patch, err := jsonpatch.DecodePatch(action.GetPatch())
153				if err != nil {
154					return true, nil, err
155				}
156				modified, err := patch.Apply(old)
157				if err != nil {
158					return true, nil, err
159				}
160
161				if err = json.Unmarshal(modified, obj); err != nil {
162					return true, nil, err
163				}
164			case types.MergePatchType:
165				modified, err := jsonpatch.MergePatch(old, action.GetPatch())
166				if err != nil {
167					return true, nil, err
168				}
169
170				if err := json.Unmarshal(modified, obj); err != nil {
171					return true, nil, err
172				}
173			case types.StrategicMergePatchType:
174				mergedByte, err := strategicpatch.StrategicMergePatch(old, action.GetPatch(), obj)
175				if err != nil {
176					return true, nil, err
177				}
178				if err = json.Unmarshal(mergedByte, obj); err != nil {
179					return true, nil, err
180				}
181			default:
182				return true, nil, fmt.Errorf("PatchType is not supported")
183			}
184
185			if err = tracker.Update(gvr, obj, ns); err != nil {
186				return true, nil, err
187			}
188
189			return true, obj, nil
190
191		default:
192			return false, nil, fmt.Errorf("no reaction implemented for %s", action)
193		}
194	}
195}
196
// tracker is the in-memory ObjectTracker implementation returned by
// NewObjectTracker. Objects are stored per resource and then per
// namespace/name; methods acquire lock before touching objects or watchers.
// scheme is used to create list objects and to resolve object kinds, and
// decoder is used to decode the items of lists passed to Add.
type tracker struct {
	scheme  ObjectScheme
	decoder runtime.Decoder
	lock    sync.RWMutex
	objects map[schema.GroupVersionResource]map[types.NamespacedName]runtime.Object
	// The value type of watchers is a map whose key is either a namespace or
	// the all-namespaces/non-namespaced key "" and whose value is the list of
	// fake watchers registered for that key.
	// Manipulations on resources will broadcast the notification events into the
	// watchers' channel. Note that too many unhandled events (currently 100,
	// see apimachinery/pkg/watch.DefaultChanSize) will cause a panic.
	watchers map[schema.GroupVersionResource]map[string][]*watch.RaceFreeFakeWatcher
}

// Compile-time assertion that *tracker implements ObjectTracker.
var _ ObjectTracker = &tracker{}
211
212// NewObjectTracker returns an ObjectTracker that can be used to keep track
213// of objects for the fake clientset. Mostly useful for unit tests.
214func NewObjectTracker(scheme ObjectScheme, decoder runtime.Decoder) ObjectTracker {
215	return &tracker{
216		scheme:   scheme,
217		decoder:  decoder,
218		objects:  make(map[schema.GroupVersionResource]map[types.NamespacedName]runtime.Object),
219		watchers: make(map[schema.GroupVersionResource]map[string][]*watch.RaceFreeFakeWatcher),
220	}
221}
222
223func (t *tracker) List(gvr schema.GroupVersionResource, gvk schema.GroupVersionKind, ns string) (runtime.Object, error) {
224	// Heuristic for list kind: original kind + List suffix. Might
225	// not always be true but this tracker has a pretty limited
226	// understanding of the actual API model.
227	listGVK := gvk
228	listGVK.Kind = listGVK.Kind + "List"
229	// GVK does have the concept of "internal version". The scheme recognizes
230	// the runtime.APIVersionInternal, but not the empty string.
231	if listGVK.Version == "" {
232		listGVK.Version = runtime.APIVersionInternal
233	}
234
235	list, err := t.scheme.New(listGVK)
236	if err != nil {
237		return nil, err
238	}
239
240	if !meta.IsListType(list) {
241		return nil, fmt.Errorf("%q is not a list type", listGVK.Kind)
242	}
243
244	t.lock.RLock()
245	defer t.lock.RUnlock()
246
247	objs, ok := t.objects[gvr]
248	if !ok {
249		return list, nil
250	}
251
252	matchingObjs, err := filterByNamespace(objs, ns)
253	if err != nil {
254		return nil, err
255	}
256	if err := meta.SetList(list, matchingObjs); err != nil {
257		return nil, err
258	}
259	return list.DeepCopyObject(), nil
260}
261
262func (t *tracker) Watch(gvr schema.GroupVersionResource, ns string) (watch.Interface, error) {
263	t.lock.Lock()
264	defer t.lock.Unlock()
265
266	fakewatcher := watch.NewRaceFreeFake()
267
268	if _, exists := t.watchers[gvr]; !exists {
269		t.watchers[gvr] = make(map[string][]*watch.RaceFreeFakeWatcher)
270	}
271	t.watchers[gvr][ns] = append(t.watchers[gvr][ns], fakewatcher)
272	return fakewatcher, nil
273}
274
275func (t *tracker) Get(gvr schema.GroupVersionResource, ns, name string) (runtime.Object, error) {
276	errNotFound := errors.NewNotFound(gvr.GroupResource(), name)
277
278	t.lock.RLock()
279	defer t.lock.RUnlock()
280
281	objs, ok := t.objects[gvr]
282	if !ok {
283		return nil, errNotFound
284	}
285
286	matchingObj, ok := objs[types.NamespacedName{Namespace: ns, Name: name}]
287	if !ok {
288		return nil, errNotFound
289	}
290
291	// Only one object should match in the tracker if it works
292	// correctly, as Add/Update methods enforce kind/namespace/name
293	// uniqueness.
294	obj := matchingObj.DeepCopyObject()
295	if status, ok := obj.(*metav1.Status); ok {
296		if status.Status != metav1.StatusSuccess {
297			return nil, &errors.StatusError{ErrStatus: *status}
298		}
299	}
300
301	return obj, nil
302}
303
304func (t *tracker) Add(obj runtime.Object) error {
305	if meta.IsListType(obj) {
306		return t.addList(obj, false)
307	}
308	objMeta, err := meta.Accessor(obj)
309	if err != nil {
310		return err
311	}
312	gvks, _, err := t.scheme.ObjectKinds(obj)
313	if err != nil {
314		return err
315	}
316
317	if partial, ok := obj.(*metav1.PartialObjectMetadata); ok && len(partial.TypeMeta.APIVersion) > 0 {
318		gvks = []schema.GroupVersionKind{partial.TypeMeta.GroupVersionKind()}
319	}
320
321	if len(gvks) == 0 {
322		return fmt.Errorf("no registered kinds for %v", obj)
323	}
324	for _, gvk := range gvks {
325		// NOTE: UnsafeGuessKindToResource is a heuristic and default match. The
326		// actual registration in apiserver can specify arbitrary route for a
327		// gvk. If a test uses such objects, it cannot preset the tracker with
328		// objects via Add(). Instead, it should trigger the Create() function
329		// of the tracker, where an arbitrary gvr can be specified.
330		gvr, _ := meta.UnsafeGuessKindToResource(gvk)
331		// Resource doesn't have the concept of "__internal" version, just set it to "".
332		if gvr.Version == runtime.APIVersionInternal {
333			gvr.Version = ""
334		}
335
336		err := t.add(gvr, obj, objMeta.GetNamespace(), false)
337		if err != nil {
338			return err
339		}
340	}
341	return nil
342}
343
// Create stores obj under resource gvr in namespace ns. It delegates to add
// with replaceExisting=false, so it returns an AlreadyExists error when an
// object with the same namespace and name is already tracked for gvr.
func (t *tracker) Create(gvr schema.GroupVersionResource, obj runtime.Object, ns string) error {
	return t.add(gvr, obj, ns, false)
}
347
// Update replaces the object tracked under resource gvr in namespace ns with
// obj. It delegates to add with replaceExisting=true, so it returns a
// NotFound error when no object with the same namespace and name is tracked.
func (t *tracker) Update(gvr schema.GroupVersionResource, obj runtime.Object, ns string) error {
	return t.add(gvr, obj, ns, true)
}
351
352func (t *tracker) getWatches(gvr schema.GroupVersionResource, ns string) []*watch.RaceFreeFakeWatcher {
353	watches := []*watch.RaceFreeFakeWatcher{}
354	if t.watchers[gvr] != nil {
355		if w := t.watchers[gvr][ns]; w != nil {
356			watches = append(watches, w...)
357		}
358		if ns != metav1.NamespaceAll {
359			if w := t.watchers[gvr][metav1.NamespaceAll]; w != nil {
360				watches = append(watches, w...)
361			}
362		}
363	}
364	return watches
365}
366
367func (t *tracker) add(gvr schema.GroupVersionResource, obj runtime.Object, ns string, replaceExisting bool) error {
368	t.lock.Lock()
369	defer t.lock.Unlock()
370
371	gr := gvr.GroupResource()
372
373	// To avoid the object from being accidentally modified by caller
374	// after it's been added to the tracker, we always store the deep
375	// copy.
376	obj = obj.DeepCopyObject()
377
378	newMeta, err := meta.Accessor(obj)
379	if err != nil {
380		return err
381	}
382
383	// Propagate namespace to the new object if hasn't already been set.
384	if len(newMeta.GetNamespace()) == 0 {
385		newMeta.SetNamespace(ns)
386	}
387
388	if ns != newMeta.GetNamespace() {
389		msg := fmt.Sprintf("request namespace does not match object namespace, request: %q object: %q", ns, newMeta.GetNamespace())
390		return errors.NewBadRequest(msg)
391	}
392
393	_, ok := t.objects[gvr]
394	if !ok {
395		t.objects[gvr] = make(map[types.NamespacedName]runtime.Object)
396	}
397
398	namespacedName := types.NamespacedName{Namespace: newMeta.GetNamespace(), Name: newMeta.GetName()}
399	if _, ok = t.objects[gvr][namespacedName]; ok {
400		if replaceExisting {
401			for _, w := range t.getWatches(gvr, ns) {
402				w.Modify(obj)
403			}
404			t.objects[gvr][namespacedName] = obj
405			return nil
406		}
407		return errors.NewAlreadyExists(gr, newMeta.GetName())
408	}
409
410	if replaceExisting {
411		// Tried to update but no matching object was found.
412		return errors.NewNotFound(gr, newMeta.GetName())
413	}
414
415	t.objects[gvr][namespacedName] = obj
416
417	for _, w := range t.getWatches(gvr, ns) {
418		w.Add(obj)
419	}
420
421	return nil
422}
423
424func (t *tracker) addList(obj runtime.Object, replaceExisting bool) error {
425	list, err := meta.ExtractList(obj)
426	if err != nil {
427		return err
428	}
429	errs := runtime.DecodeList(list, t.decoder)
430	if len(errs) > 0 {
431		return errs[0]
432	}
433	for _, obj := range list {
434		if err := t.Add(obj); err != nil {
435			return err
436		}
437	}
438	return nil
439}
440
441func (t *tracker) Delete(gvr schema.GroupVersionResource, ns, name string) error {
442	t.lock.Lock()
443	defer t.lock.Unlock()
444
445	objs, ok := t.objects[gvr]
446	if !ok {
447		return errors.NewNotFound(gvr.GroupResource(), name)
448	}
449
450	namespacedName := types.NamespacedName{Namespace: ns, Name: name}
451	obj, ok := objs[namespacedName]
452	if !ok {
453		return errors.NewNotFound(gvr.GroupResource(), name)
454	}
455
456	delete(objs, namespacedName)
457	for _, w := range t.getWatches(gvr, ns) {
458		w.Delete(obj)
459	}
460	return nil
461}
462
463// filterByNamespace returns all objects in the collection that
464// match provided namespace. Empty namespace matches
465// non-namespaced objects.
466func filterByNamespace(objs map[types.NamespacedName]runtime.Object, ns string) ([]runtime.Object, error) {
467	var res []runtime.Object
468
469	for _, obj := range objs {
470		acc, err := meta.Accessor(obj)
471		if err != nil {
472			return nil, err
473		}
474		if ns != "" && acc.GetNamespace() != ns {
475			continue
476		}
477		res = append(res, obj)
478	}
479
480	// Sort res to get deterministic order.
481	sort.Slice(res, func(i, j int) bool {
482		acc1, _ := meta.Accessor(res[i])
483		acc2, _ := meta.Accessor(res[j])
484		if acc1.GetNamespace() != acc2.GetNamespace() {
485			return acc1.GetNamespace() < acc2.GetNamespace()
486		}
487		return acc1.GetName() < acc2.GetName()
488	})
489	return res, nil
490}
491
// DefaultWatchReactor returns a WatchReactionFunc that ignores the action it
// is given and always reports the action as handled, returning the supplied
// watchInterface and err unchanged.
func DefaultWatchReactor(watchInterface watch.Interface, err error) WatchReactionFunc {
	return func(action Action) (bool, watch.Interface, error) {
		return true, watchInterface, err
	}
}
497
498// SimpleReactor is a Reactor.  Each reaction function is attached to a given verb,resource tuple.  "*" in either field matches everything for that value.
499// For instance, *,pods matches all verbs on pods.  This allows for easier composition of reaction functions
500type SimpleReactor struct {
501	Verb     string
502	Resource string
503
504	Reaction ReactionFunc
505}
506
507func (r *SimpleReactor) Handles(action Action) bool {
508	verbCovers := r.Verb == "*" || r.Verb == action.GetVerb()
509	if !verbCovers {
510		return false
511	}
512	resourceCovers := r.Resource == "*" || r.Resource == action.GetResource().Resource
513	if !resourceCovers {
514		return false
515	}
516
517	return true
518}
519
520func (r *SimpleReactor) React(action Action) (bool, runtime.Object, error) {
521	return r.Reaction(action)
522}
523
524// SimpleWatchReactor is a WatchReactor.  Each reaction function is attached to a given resource.  "*" matches everything for that value.
525// For instance, *,pods matches all verbs on pods.  This allows for easier composition of reaction functions
526type SimpleWatchReactor struct {
527	Resource string
528
529	Reaction WatchReactionFunc
530}
531
532func (r *SimpleWatchReactor) Handles(action Action) bool {
533	resourceCovers := r.Resource == "*" || r.Resource == action.GetResource().Resource
534	if !resourceCovers {
535		return false
536	}
537
538	return true
539}
540
541func (r *SimpleWatchReactor) React(action Action) (bool, watch.Interface, error) {
542	return r.Reaction(action)
543}
544
545// SimpleProxyReactor is a ProxyReactor.  Each reaction function is attached to a given resource.  "*" matches everything for that value.
546// For instance, *,pods matches all verbs on pods.  This allows for easier composition of reaction functions.
547type SimpleProxyReactor struct {
548	Resource string
549
550	Reaction ProxyReactionFunc
551}
552
553func (r *SimpleProxyReactor) Handles(action Action) bool {
554	resourceCovers := r.Resource == "*" || r.Resource == action.GetResource().Resource
555	if !resourceCovers {
556		return false
557	}
558
559	return true
560}
561
562func (r *SimpleProxyReactor) React(action Action) (bool, restclient.ResponseWrapper, error) {
563	return r.Reaction(action)
564}
565