1/*
2Copyright 2015 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package testing
18
19import (
20	"fmt"
21	"sync"
22
23	jsonpatch "github.com/evanphx/json-patch"
24	"k8s.io/apimachinery/pkg/api/errors"
25	"k8s.io/apimachinery/pkg/api/meta"
26	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27	"k8s.io/apimachinery/pkg/runtime"
28	"k8s.io/apimachinery/pkg/runtime/schema"
29	"k8s.io/apimachinery/pkg/types"
30	"k8s.io/apimachinery/pkg/util/json"
31	"k8s.io/apimachinery/pkg/util/strategicpatch"
32	"k8s.io/apimachinery/pkg/watch"
33	restclient "k8s.io/client-go/rest"
34)
35
36// ObjectTracker keeps track of objects. It is intended to be used to
37// fake calls to a server by returning objects based on their kind,
38// namespace and name.
39type ObjectTracker interface {
40	// Add adds an object to the tracker. If object being added
41	// is a list, its items are added separately.
42	Add(obj runtime.Object) error
43
44	// Get retrieves the object by its kind, namespace and name.
45	Get(gvr schema.GroupVersionResource, ns, name string) (runtime.Object, error)
46
47	// Create adds an object to the tracker in the specified namespace.
48	Create(gvr schema.GroupVersionResource, obj runtime.Object, ns string) error
49
50	// Update updates an existing object in the tracker in the specified namespace.
51	Update(gvr schema.GroupVersionResource, obj runtime.Object, ns string) error
52
53	// List retrieves all objects of a given kind in the given
54	// namespace. Only non-List kinds are accepted.
55	List(gvr schema.GroupVersionResource, gvk schema.GroupVersionKind, ns string) (runtime.Object, error)
56
57	// Delete deletes an existing object from the tracker. If object
58	// didn't exist in the tracker prior to deletion, Delete returns
59	// no error.
60	Delete(gvr schema.GroupVersionResource, ns, name string) error
61
62	// Watch watches objects from the tracker. Watch returns a channel
63	// which will push added / modified / deleted object.
64	Watch(gvr schema.GroupVersionResource, ns string) (watch.Interface, error)
65}
66
67// ObjectScheme abstracts the implementation of common operations on objects.
68type ObjectScheme interface {
69	runtime.ObjectCreater
70	runtime.ObjectTyper
71}
72
73// ObjectReaction returns a ReactionFunc that applies core.Action to
74// the given tracker.
75func ObjectReaction(tracker ObjectTracker) ReactionFunc {
76	return func(action Action) (bool, runtime.Object, error) {
77		ns := action.GetNamespace()
78		gvr := action.GetResource()
79		// Here and below we need to switch on implementation types,
80		// not on interfaces, as some interfaces are identical
81		// (e.g. UpdateAction and CreateAction), so if we use them,
82		// updates and creates end up matching the same case branch.
83		switch action := action.(type) {
84
85		case ListActionImpl:
86			obj, err := tracker.List(gvr, action.GetKind(), ns)
87			return true, obj, err
88
89		case GetActionImpl:
90			obj, err := tracker.Get(gvr, ns, action.GetName())
91			return true, obj, err
92
93		case CreateActionImpl:
94			objMeta, err := meta.Accessor(action.GetObject())
95			if err != nil {
96				return true, nil, err
97			}
98			if action.GetSubresource() == "" {
99				err = tracker.Create(gvr, action.GetObject(), ns)
100			} else {
101				// TODO: Currently we're handling subresource creation as an update
102				// on the enclosing resource. This works for some subresources but
103				// might not be generic enough.
104				err = tracker.Update(gvr, action.GetObject(), ns)
105			}
106			if err != nil {
107				return true, nil, err
108			}
109			obj, err := tracker.Get(gvr, ns, objMeta.GetName())
110			return true, obj, err
111
112		case UpdateActionImpl:
113			objMeta, err := meta.Accessor(action.GetObject())
114			if err != nil {
115				return true, nil, err
116			}
117			err = tracker.Update(gvr, action.GetObject(), ns)
118			if err != nil {
119				return true, nil, err
120			}
121			obj, err := tracker.Get(gvr, ns, objMeta.GetName())
122			return true, obj, err
123
124		case DeleteActionImpl:
125			err := tracker.Delete(gvr, ns, action.GetName())
126			if err != nil {
127				return true, nil, err
128			}
129			return true, nil, nil
130
131		case PatchActionImpl:
132			obj, err := tracker.Get(gvr, ns, action.GetName())
133			if err != nil {
134				return true, nil, err
135			}
136
137			old, err := json.Marshal(obj)
138			if err != nil {
139				return true, nil, err
140			}
141
142			switch action.GetPatchType() {
143			case types.JSONPatchType:
144				patch, err := jsonpatch.DecodePatch(action.GetPatch())
145				if err != nil {
146					return true, nil, err
147				}
148				modified, err := patch.Apply(old)
149				if err != nil {
150					return true, nil, err
151				}
152				if err = json.Unmarshal(modified, obj); err != nil {
153					return true, nil, err
154				}
155			case types.MergePatchType:
156				modified, err := jsonpatch.MergePatch(old, action.GetPatch())
157				if err != nil {
158					return true, nil, err
159				}
160
161				if err := json.Unmarshal(modified, obj); err != nil {
162					return true, nil, err
163				}
164			case types.StrategicMergePatchType:
165				mergedByte, err := strategicpatch.StrategicMergePatch(old, action.GetPatch(), obj)
166				if err != nil {
167					return true, nil, err
168				}
169				if err = json.Unmarshal(mergedByte, obj); err != nil {
170					return true, nil, err
171				}
172			default:
173				return true, nil, fmt.Errorf("PatchType is not supported")
174			}
175
176			if err = tracker.Update(gvr, obj, ns); err != nil {
177				return true, nil, err
178			}
179
180			return true, obj, nil
181
182		default:
183			return false, nil, fmt.Errorf("no reaction implemented for %s", action)
184		}
185	}
186}
187
188type tracker struct {
189	scheme  ObjectScheme
190	decoder runtime.Decoder
191	lock    sync.RWMutex
192	objects map[schema.GroupVersionResource][]runtime.Object
193	// The value type of watchers is a map of which the key is either a namespace or
194	// all/non namespace aka "" and its value is list of fake watchers.
195	// Manipulations on resources will broadcast the notification events into the
196	// watchers' channel. Note that too many unhandled events (currently 100,
197	// see apimachinery/pkg/watch.DefaultChanSize) will cause a panic.
198	watchers map[schema.GroupVersionResource]map[string][]*watch.RaceFreeFakeWatcher
199}
200
201var _ ObjectTracker = &tracker{}
202
203// NewObjectTracker returns an ObjectTracker that can be used to keep track
204// of objects for the fake clientset. Mostly useful for unit tests.
205func NewObjectTracker(scheme ObjectScheme, decoder runtime.Decoder) ObjectTracker {
206	return &tracker{
207		scheme:   scheme,
208		decoder:  decoder,
209		objects:  make(map[schema.GroupVersionResource][]runtime.Object),
210		watchers: make(map[schema.GroupVersionResource]map[string][]*watch.RaceFreeFakeWatcher),
211	}
212}
213
214func (t *tracker) List(gvr schema.GroupVersionResource, gvk schema.GroupVersionKind, ns string) (runtime.Object, error) {
215	// Heuristic for list kind: original kind + List suffix. Might
216	// not always be true but this tracker has a pretty limited
217	// understanding of the actual API model.
218	listGVK := gvk
219	listGVK.Kind = listGVK.Kind + "List"
220	// GVK does have the concept of "internal version". The scheme recognizes
221	// the runtime.APIVersionInternal, but not the empty string.
222	if listGVK.Version == "" {
223		listGVK.Version = runtime.APIVersionInternal
224	}
225
226	list, err := t.scheme.New(listGVK)
227	if err != nil {
228		return nil, err
229	}
230
231	if !meta.IsListType(list) {
232		return nil, fmt.Errorf("%q is not a list type", listGVK.Kind)
233	}
234
235	t.lock.RLock()
236	defer t.lock.RUnlock()
237
238	objs, ok := t.objects[gvr]
239	if !ok {
240		return list, nil
241	}
242
243	matchingObjs, err := filterByNamespaceAndName(objs, ns, "")
244	if err != nil {
245		return nil, err
246	}
247	if err := meta.SetList(list, matchingObjs); err != nil {
248		return nil, err
249	}
250	return list.DeepCopyObject(), nil
251}
252
253func (t *tracker) Watch(gvr schema.GroupVersionResource, ns string) (watch.Interface, error) {
254	t.lock.Lock()
255	defer t.lock.Unlock()
256
257	fakewatcher := watch.NewRaceFreeFake()
258
259	if _, exists := t.watchers[gvr]; !exists {
260		t.watchers[gvr] = make(map[string][]*watch.RaceFreeFakeWatcher)
261	}
262	t.watchers[gvr][ns] = append(t.watchers[gvr][ns], fakewatcher)
263	return fakewatcher, nil
264}
265
266func (t *tracker) Get(gvr schema.GroupVersionResource, ns, name string) (runtime.Object, error) {
267	errNotFound := errors.NewNotFound(gvr.GroupResource(), name)
268
269	t.lock.RLock()
270	defer t.lock.RUnlock()
271
272	objs, ok := t.objects[gvr]
273	if !ok {
274		return nil, errNotFound
275	}
276
277	matchingObjs, err := filterByNamespaceAndName(objs, ns, name)
278	if err != nil {
279		return nil, err
280	}
281	if len(matchingObjs) == 0 {
282		return nil, errNotFound
283	}
284	if len(matchingObjs) > 1 {
285		return nil, fmt.Errorf("more than one object matched gvr %s, ns: %q name: %q", gvr, ns, name)
286	}
287
288	// Only one object should match in the tracker if it works
289	// correctly, as Add/Update methods enforce kind/namespace/name
290	// uniqueness.
291	obj := matchingObjs[0].DeepCopyObject()
292	if status, ok := obj.(*metav1.Status); ok {
293		if status.Status != metav1.StatusSuccess {
294			return nil, &errors.StatusError{ErrStatus: *status}
295		}
296	}
297
298	return obj, nil
299}
300
301func (t *tracker) Add(obj runtime.Object) error {
302	if meta.IsListType(obj) {
303		return t.addList(obj, false)
304	}
305	objMeta, err := meta.Accessor(obj)
306	if err != nil {
307		return err
308	}
309	gvks, _, err := t.scheme.ObjectKinds(obj)
310	if err != nil {
311		return err
312	}
313	if len(gvks) == 0 {
314		return fmt.Errorf("no registered kinds for %v", obj)
315	}
316	for _, gvk := range gvks {
317		// NOTE: UnsafeGuessKindToResource is a heuristic and default match. The
318		// actual registration in apiserver can specify arbitrary route for a
319		// gvk. If a test uses such objects, it cannot preset the tracker with
320		// objects via Add(). Instead, it should trigger the Create() function
321		// of the tracker, where an arbitrary gvr can be specified.
322		gvr, _ := meta.UnsafeGuessKindToResource(gvk)
323		// Resource doesn't have the concept of "__internal" version, just set it to "".
324		if gvr.Version == runtime.APIVersionInternal {
325			gvr.Version = ""
326		}
327
328		err := t.add(gvr, obj, objMeta.GetNamespace(), false)
329		if err != nil {
330			return err
331		}
332	}
333	return nil
334}
335
336func (t *tracker) Create(gvr schema.GroupVersionResource, obj runtime.Object, ns string) error {
337	return t.add(gvr, obj, ns, false)
338}
339
340func (t *tracker) Update(gvr schema.GroupVersionResource, obj runtime.Object, ns string) error {
341	return t.add(gvr, obj, ns, true)
342}
343
344func (t *tracker) getWatches(gvr schema.GroupVersionResource, ns string) []*watch.RaceFreeFakeWatcher {
345	watches := []*watch.RaceFreeFakeWatcher{}
346	if t.watchers[gvr] != nil {
347		if w := t.watchers[gvr][ns]; w != nil {
348			watches = append(watches, w...)
349		}
350		if ns != metav1.NamespaceAll {
351			if w := t.watchers[gvr][metav1.NamespaceAll]; w != nil {
352				watches = append(watches, w...)
353			}
354		}
355	}
356	return watches
357}
358
359func (t *tracker) add(gvr schema.GroupVersionResource, obj runtime.Object, ns string, replaceExisting bool) error {
360	t.lock.Lock()
361	defer t.lock.Unlock()
362
363	gr := gvr.GroupResource()
364
365	// To avoid the object from being accidentally modified by caller
366	// after it's been added to the tracker, we always store the deep
367	// copy.
368	obj = obj.DeepCopyObject()
369
370	newMeta, err := meta.Accessor(obj)
371	if err != nil {
372		return err
373	}
374
375	// Propagate namespace to the new object if hasn't already been set.
376	if len(newMeta.GetNamespace()) == 0 {
377		newMeta.SetNamespace(ns)
378	}
379
380	if ns != newMeta.GetNamespace() {
381		msg := fmt.Sprintf("request namespace does not match object namespace, request: %q object: %q", ns, newMeta.GetNamespace())
382		return errors.NewBadRequest(msg)
383	}
384
385	for i, existingObj := range t.objects[gvr] {
386		oldMeta, err := meta.Accessor(existingObj)
387		if err != nil {
388			return err
389		}
390		if oldMeta.GetNamespace() == newMeta.GetNamespace() && oldMeta.GetName() == newMeta.GetName() {
391			if replaceExisting {
392				for _, w := range t.getWatches(gvr, ns) {
393					w.Modify(obj)
394				}
395				t.objects[gvr][i] = obj
396				return nil
397			}
398			return errors.NewAlreadyExists(gr, newMeta.GetName())
399		}
400	}
401
402	if replaceExisting {
403		// Tried to update but no matching object was found.
404		return errors.NewNotFound(gr, newMeta.GetName())
405	}
406
407	t.objects[gvr] = append(t.objects[gvr], obj)
408
409	for _, w := range t.getWatches(gvr, ns) {
410		w.Add(obj)
411	}
412
413	return nil
414}
415
416func (t *tracker) addList(obj runtime.Object, replaceExisting bool) error {
417	list, err := meta.ExtractList(obj)
418	if err != nil {
419		return err
420	}
421	errs := runtime.DecodeList(list, t.decoder)
422	if len(errs) > 0 {
423		return errs[0]
424	}
425	for _, obj := range list {
426		if err := t.Add(obj); err != nil {
427			return err
428		}
429	}
430	return nil
431}
432
433func (t *tracker) Delete(gvr schema.GroupVersionResource, ns, name string) error {
434	t.lock.Lock()
435	defer t.lock.Unlock()
436
437	found := false
438
439	for i, existingObj := range t.objects[gvr] {
440		objMeta, err := meta.Accessor(existingObj)
441		if err != nil {
442			return err
443		}
444		if objMeta.GetNamespace() == ns && objMeta.GetName() == name {
445			obj := t.objects[gvr][i]
446			t.objects[gvr] = append(t.objects[gvr][:i], t.objects[gvr][i+1:]...)
447			for _, w := range t.getWatches(gvr, ns) {
448				w.Delete(obj)
449			}
450			found = true
451			break
452		}
453	}
454
455	if found {
456		return nil
457	}
458
459	return errors.NewNotFound(gvr.GroupResource(), name)
460}
461
462// filterByNamespaceAndName returns all objects in the collection that
463// match provided namespace and name. Empty namespace matches
464// non-namespaced objects.
465func filterByNamespaceAndName(objs []runtime.Object, ns, name string) ([]runtime.Object, error) {
466	var res []runtime.Object
467
468	for _, obj := range objs {
469		acc, err := meta.Accessor(obj)
470		if err != nil {
471			return nil, err
472		}
473		if ns != "" && acc.GetNamespace() != ns {
474			continue
475		}
476		if name != "" && acc.GetName() != name {
477			continue
478		}
479		res = append(res, obj)
480	}
481
482	return res, nil
483}
484
485func DefaultWatchReactor(watchInterface watch.Interface, err error) WatchReactionFunc {
486	return func(action Action) (bool, watch.Interface, error) {
487		return true, watchInterface, err
488	}
489}
490
491// SimpleReactor is a Reactor.  Each reaction function is attached to a given verb,resource tuple.  "*" in either field matches everything for that value.
492// For instance, *,pods matches all verbs on pods.  This allows for easier composition of reaction functions
493type SimpleReactor struct {
494	Verb     string
495	Resource string
496
497	Reaction ReactionFunc
498}
499
500func (r *SimpleReactor) Handles(action Action) bool {
501	verbCovers := r.Verb == "*" || r.Verb == action.GetVerb()
502	if !verbCovers {
503		return false
504	}
505	resourceCovers := r.Resource == "*" || r.Resource == action.GetResource().Resource
506	if !resourceCovers {
507		return false
508	}
509
510	return true
511}
512
513func (r *SimpleReactor) React(action Action) (bool, runtime.Object, error) {
514	return r.Reaction(action)
515}
516
517// SimpleWatchReactor is a WatchReactor.  Each reaction function is attached to a given resource.  "*" matches everything for that value.
518// For instance, *,pods matches all verbs on pods.  This allows for easier composition of reaction functions
519type SimpleWatchReactor struct {
520	Resource string
521
522	Reaction WatchReactionFunc
523}
524
525func (r *SimpleWatchReactor) Handles(action Action) bool {
526	resourceCovers := r.Resource == "*" || r.Resource == action.GetResource().Resource
527	if !resourceCovers {
528		return false
529	}
530
531	return true
532}
533
534func (r *SimpleWatchReactor) React(action Action) (bool, watch.Interface, error) {
535	return r.Reaction(action)
536}
537
538// SimpleProxyReactor is a ProxyReactor.  Each reaction function is attached to a given resource.  "*" matches everything for that value.
539// For instance, *,pods matches all verbs on pods.  This allows for easier composition of reaction functions.
540type SimpleProxyReactor struct {
541	Resource string
542
543	Reaction ProxyReactionFunc
544}
545
546func (r *SimpleProxyReactor) Handles(action Action) bool {
547	resourceCovers := r.Resource == "*" || r.Resource == action.GetResource().Resource
548	if !resourceCovers {
549		return false
550	}
551
552	return true
553}
554
555func (r *SimpleProxyReactor) React(action Action) (bool, restclient.ResponseWrapper, error) {
556	return r.Reaction(action)
557}
558