1/*
2Copyright 2019 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package testing
18
19import (
20	"errors"
21	"fmt"
22	"reflect"
23	"strconv"
24	"sync"
25
26	v1 "k8s.io/api/core/v1"
27	apierrors "k8s.io/apimachinery/pkg/api/errors"
28	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29	"k8s.io/apimachinery/pkg/runtime"
30	"k8s.io/apimachinery/pkg/runtime/schema"
31	"k8s.io/apimachinery/pkg/util/diff"
32	"k8s.io/apimachinery/pkg/watch"
33	"k8s.io/client-go/kubernetes/fake"
34	core "k8s.io/client-go/testing"
35	"k8s.io/klog/v2"
36)
37
38// ErrVersionConflict is the error returned when resource version of requested
39// object conflicts with the object in storage.
40var ErrVersionConflict = errors.New("VersionError")
41
42// VolumeReactor is a core.Reactor that simulates etcd and API server. It
43// stores:
44// - Latest version of claims volumes saved by the controller.
45// - Queue of all saves (to simulate "volume/claim updated" events). This queue
46//   contains all intermediate state of an object - e.g. a claim.VolumeName
47//   is updated first and claim.Phase second. This queue will then contain both
48//   updates as separate entries.
49// - Number of changes since the last call to VolumeReactor.syncAll().
50// - Optionally, volume and claim fake watchers which should be the same ones
51//   used by the controller. Any time an event function like deleteVolumeEvent
52//   is called to simulate an event, the reactor's stores are updated and the
53//   controller is sent the event via the fake watcher.
54// - Optionally, list of error that should be returned by reactor, simulating
55//   etcd / API server failures. These errors are evaluated in order and every
56//   error is returned only once. I.e. when the reactor finds matching
57//   ReactorError, it return appropriate error and removes the ReactorError from
58//   the list.
59type VolumeReactor struct {
60	volumes              map[string]*v1.PersistentVolume
61	claims               map[string]*v1.PersistentVolumeClaim
62	changedObjects       []interface{}
63	changedSinceLastSync int
64	fakeVolumeWatch      *watch.FakeWatcher
65	fakeClaimWatch       *watch.FakeWatcher
66	lock                 sync.RWMutex
67	errors               []ReactorError
68	watchers             map[schema.GroupVersionResource]map[string][]*watch.RaceFreeFakeWatcher
69}
70
71// ReactorError is an error that is returned by test reactor (=simulated
72// etcd+/API server) when an action performed by the reactor matches given verb
73// ("get", "update", "create", "delete" or "*"") on given resource
74// ("persistentvolumes", "persistentvolumeclaims" or "*").
75type ReactorError struct {
76	Verb     string
77	Resource string
78	Error    error
79}
80
81// React is a callback called by fake kubeClient from the controller.
82// In other words, every claim/volume change performed by the controller ends
83// here.
84// This callback checks versions of the updated objects and refuse those that
85// are too old (simulating real etcd).
86// All updated objects are stored locally to keep track of object versions and
87// to evaluate test results.
88// All updated objects are also inserted into changedObjects queue and
89// optionally sent back to the controller via its watchers.
90func (r *VolumeReactor) React(action core.Action) (handled bool, ret runtime.Object, err error) {
91	r.lock.Lock()
92	defer r.lock.Unlock()
93
94	klog.V(4).Infof("reactor got operation %q on %q", action.GetVerb(), action.GetResource())
95
96	// Inject error when requested
97	err = r.injectReactError(action)
98	if err != nil {
99		return true, nil, err
100	}
101
102	// Test did not request to inject an error, continue simulating API server.
103	switch {
104	case action.Matches("create", "persistentvolumes"):
105		obj := action.(core.UpdateAction).GetObject()
106		volume := obj.(*v1.PersistentVolume)
107
108		// check the volume does not exist
109		_, found := r.volumes[volume.Name]
110		if found {
111			return true, nil, fmt.Errorf("cannot create volume %s: volume already exists", volume.Name)
112		}
113
114		// mimic apiserver defaulting
115		if volume.Spec.VolumeMode == nil {
116			volume.Spec.VolumeMode = new(v1.PersistentVolumeMode)
117			*volume.Spec.VolumeMode = v1.PersistentVolumeFilesystem
118		}
119
120		// Store the updated object to appropriate places.
121		r.volumes[volume.Name] = volume
122		for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) {
123			w.Add(volume)
124		}
125		r.changedObjects = append(r.changedObjects, volume)
126		r.changedSinceLastSync++
127		klog.V(4).Infof("created volume %s", volume.Name)
128		return true, volume, nil
129
130	case action.Matches("create", "persistentvolumeclaims"):
131		obj := action.(core.UpdateAction).GetObject()
132		claim := obj.(*v1.PersistentVolumeClaim)
133
134		// check the claim does not exist
135		_, found := r.claims[claim.Name]
136		if found {
137			return true, nil, fmt.Errorf("cannot create claim %s: claim already exists", claim.Name)
138		}
139
140		// Store the updated object to appropriate places.
141		r.claims[claim.Name] = claim
142		for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) {
143			w.Add(claim)
144		}
145		r.changedObjects = append(r.changedObjects, claim)
146		r.changedSinceLastSync++
147		klog.V(4).Infof("created claim %s", claim.Name)
148		return true, claim, nil
149
150	case action.Matches("update", "persistentvolumes"):
151		obj := action.(core.UpdateAction).GetObject()
152		volume := obj.(*v1.PersistentVolume)
153
154		// Check and bump object version
155		storedVolume, found := r.volumes[volume.Name]
156		if found {
157			storedVer, _ := strconv.Atoi(storedVolume.ResourceVersion)
158			requestedVer, _ := strconv.Atoi(volume.ResourceVersion)
159			if storedVer != requestedVer {
160				return true, obj, ErrVersionConflict
161			}
162			if reflect.DeepEqual(storedVolume, volume) {
163				klog.V(4).Infof("nothing updated volume %s", volume.Name)
164				return true, volume, nil
165			}
166			// Don't modify the existing object
167			volume = volume.DeepCopy()
168			volume.ResourceVersion = strconv.Itoa(storedVer + 1)
169		} else {
170			return true, nil, fmt.Errorf("cannot update volume %s: volume not found", volume.Name)
171		}
172
173		// Store the updated object to appropriate places.
174		for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) {
175			w.Modify(volume)
176		}
177		r.volumes[volume.Name] = volume
178		r.changedObjects = append(r.changedObjects, volume)
179		r.changedSinceLastSync++
180		klog.V(4).Infof("saved updated volume %s", volume.Name)
181		return true, volume, nil
182
183	case action.Matches("update", "persistentvolumeclaims"):
184		obj := action.(core.UpdateAction).GetObject()
185		claim := obj.(*v1.PersistentVolumeClaim)
186
187		// Check and bump object version
188		storedClaim, found := r.claims[claim.Name]
189		if found {
190			storedVer, _ := strconv.Atoi(storedClaim.ResourceVersion)
191			requestedVer, _ := strconv.Atoi(claim.ResourceVersion)
192			if storedVer != requestedVer {
193				return true, obj, ErrVersionConflict
194			}
195			if reflect.DeepEqual(storedClaim, claim) {
196				klog.V(4).Infof("nothing updated claim %s", claim.Name)
197				return true, claim, nil
198			}
199			// Don't modify the existing object
200			claim = claim.DeepCopy()
201			claim.ResourceVersion = strconv.Itoa(storedVer + 1)
202		} else {
203			return true, nil, fmt.Errorf("cannot update claim %s: claim not found", claim.Name)
204		}
205
206		// Store the updated object to appropriate places.
207		for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) {
208			w.Modify(claim)
209		}
210		r.claims[claim.Name] = claim
211		r.changedObjects = append(r.changedObjects, claim)
212		r.changedSinceLastSync++
213		klog.V(4).Infof("saved updated claim %s", claim.Name)
214		return true, claim, nil
215
216	case action.Matches("get", "persistentvolumes"):
217		name := action.(core.GetAction).GetName()
218		volume, found := r.volumes[name]
219		if found {
220			klog.V(4).Infof("GetVolume: found %s", volume.Name)
221			return true, volume.DeepCopy(), nil
222		}
223		klog.V(4).Infof("GetVolume: volume %s not found", name)
224		return true, nil, apierrors.NewNotFound(action.GetResource().GroupResource(), name)
225
226	case action.Matches("get", "persistentvolumeclaims"):
227		name := action.(core.GetAction).GetName()
228		claim, found := r.claims[name]
229		if found {
230			klog.V(4).Infof("GetClaim: found %s", claim.Name)
231			return true, claim.DeepCopy(), nil
232		}
233		klog.V(4).Infof("GetClaim: claim %s not found", name)
234		return true, nil, apierrors.NewNotFound(action.GetResource().GroupResource(), name)
235
236	case action.Matches("delete", "persistentvolumes"):
237		name := action.(core.DeleteAction).GetName()
238		klog.V(4).Infof("deleted volume %s", name)
239		obj, found := r.volumes[name]
240		if found {
241			delete(r.volumes, name)
242			for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) {
243				w.Delete(obj)
244			}
245			r.changedSinceLastSync++
246			return true, nil, nil
247		}
248		return true, nil, fmt.Errorf("cannot delete volume %s: not found", name)
249
250	case action.Matches("delete", "persistentvolumeclaims"):
251		name := action.(core.DeleteAction).GetName()
252		klog.V(4).Infof("deleted claim %s", name)
253		obj, found := r.claims[name]
254		if found {
255			delete(r.claims, name)
256			for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) {
257				w.Delete(obj)
258			}
259			r.changedSinceLastSync++
260			return true, nil, nil
261		}
262		return true, nil, fmt.Errorf("cannot delete claim %s: not found", name)
263	}
264
265	return false, nil, nil
266}
267
268// Watch watches objects from the VolumeReactor. Watch returns a channel which
269// will push added / modified / deleted object.
270func (r *VolumeReactor) Watch(gvr schema.GroupVersionResource, ns string) (watch.Interface, error) {
271	r.lock.Lock()
272	defer r.lock.Unlock()
273
274	fakewatcher := watch.NewRaceFreeFake()
275
276	if _, exists := r.watchers[gvr]; !exists {
277		r.watchers[gvr] = make(map[string][]*watch.RaceFreeFakeWatcher)
278	}
279	r.watchers[gvr][ns] = append(r.watchers[gvr][ns], fakewatcher)
280	return fakewatcher, nil
281}
282
283func (r *VolumeReactor) getWatches(gvr schema.GroupVersionResource, ns string) []*watch.RaceFreeFakeWatcher {
284	watches := []*watch.RaceFreeFakeWatcher{}
285	if r.watchers[gvr] != nil {
286		if w := r.watchers[gvr][ns]; w != nil {
287			watches = append(watches, w...)
288		}
289		if ns != metav1.NamespaceAll {
290			if w := r.watchers[gvr][metav1.NamespaceAll]; w != nil {
291				watches = append(watches, w...)
292			}
293		}
294	}
295	return watches
296}
297
298// injectReactError returns an error when the test requested given action to
299// fail. nil is returned otherwise.
300func (r *VolumeReactor) injectReactError(action core.Action) error {
301	if len(r.errors) == 0 {
302		// No more errors to inject, everything should succeed.
303		return nil
304	}
305
306	for i, expected := range r.errors {
307		klog.V(4).Infof("trying to match %q %q with %q %q", expected.Verb, expected.Resource, action.GetVerb(), action.GetResource())
308		if action.Matches(expected.Verb, expected.Resource) {
309			// That's the action we're waiting for, remove it from injectedErrors
310			r.errors = append(r.errors[:i], r.errors[i+1:]...)
311			klog.V(4).Infof("reactor found matching error at index %d: %q %q, returning %v", i, expected.Verb, expected.Resource, expected.Error)
312			return expected.Error
313		}
314	}
315	return nil
316}
317
318// CheckVolumes compares all expectedVolumes with set of volumes at the end of
319// the test and reports differences.
320func (r *VolumeReactor) CheckVolumes(expectedVolumes []*v1.PersistentVolume) error {
321	r.lock.Lock()
322	defer r.lock.Unlock()
323
324	expectedMap := make(map[string]*v1.PersistentVolume)
325	gotMap := make(map[string]*v1.PersistentVolume)
326	// Clear any ResourceVersion from both sets
327	for _, v := range expectedVolumes {
328		// Don't modify the existing object
329		v := v.DeepCopy()
330		v.ResourceVersion = ""
331		if v.Spec.ClaimRef != nil {
332			v.Spec.ClaimRef.ResourceVersion = ""
333		}
334		expectedMap[v.Name] = v
335	}
336	for _, v := range r.volumes {
337		// We must clone the volume because of golang race check - it was
338		// written by the controller without any locks on it.
339		v := v.DeepCopy()
340		v.ResourceVersion = ""
341		if v.Spec.ClaimRef != nil {
342			v.Spec.ClaimRef.ResourceVersion = ""
343		}
344		gotMap[v.Name] = v
345	}
346	if !reflect.DeepEqual(expectedMap, gotMap) {
347		// Print ugly but useful diff of expected and received objects for
348		// easier debugging.
349		return fmt.Errorf("Volume check failed [A-expected, B-got]: %s", diff.ObjectDiff(expectedMap, gotMap))
350	}
351	return nil
352}
353
354// CheckClaims compares all expectedClaims with set of claims at the end of the
355// test and reports differences.
356func (r *VolumeReactor) CheckClaims(expectedClaims []*v1.PersistentVolumeClaim) error {
357	r.lock.Lock()
358	defer r.lock.Unlock()
359
360	expectedMap := make(map[string]*v1.PersistentVolumeClaim)
361	gotMap := make(map[string]*v1.PersistentVolumeClaim)
362	for _, c := range expectedClaims {
363		// Don't modify the existing object
364		c = c.DeepCopy()
365		c.ResourceVersion = ""
366		expectedMap[c.Name] = c
367	}
368	for _, c := range r.claims {
369		// We must clone the claim because of golang race check - it was
370		// written by the controller without any locks on it.
371		c = c.DeepCopy()
372		c.ResourceVersion = ""
373		gotMap[c.Name] = c
374	}
375	if !reflect.DeepEqual(expectedMap, gotMap) {
376		// Print ugly but useful diff of expected and received objects for
377		// easier debugging.
378		return fmt.Errorf("Claim check failed [A-expected, B-got result]: %s", diff.ObjectDiff(expectedMap, gotMap))
379	}
380	return nil
381}
382
383// PopChange returns one recorded updated object, either *v1.PersistentVolume
384// or *v1.PersistentVolumeClaim. Returns nil when there are no changes.
385func (r *VolumeReactor) PopChange() interface{} {
386	r.lock.Lock()
387	defer r.lock.Unlock()
388
389	if len(r.changedObjects) == 0 {
390		return nil
391	}
392
393	// For debugging purposes, print the queue
394	for _, obj := range r.changedObjects {
395		switch obj.(type) {
396		case *v1.PersistentVolume:
397			vol, _ := obj.(*v1.PersistentVolume)
398			klog.V(4).Infof("reactor queue: %s", vol.Name)
399		case *v1.PersistentVolumeClaim:
400			claim, _ := obj.(*v1.PersistentVolumeClaim)
401			klog.V(4).Infof("reactor queue: %s", claim.Name)
402		}
403	}
404
405	// Pop the first item from the queue and return it
406	obj := r.changedObjects[0]
407	r.changedObjects = r.changedObjects[1:]
408	return obj
409}
410
411// SyncAll simulates the controller periodic sync of volumes and claim. It
412// simply adds all these objects to the internal queue of updates. This method
413// should be used when the test manually calls syncClaim/syncVolume. Test that
414// use real controller loop (ctrl.Run()) will get periodic sync automatically.
415func (r *VolumeReactor) SyncAll() {
416	r.lock.Lock()
417	defer r.lock.Unlock()
418
419	for _, c := range r.claims {
420		r.changedObjects = append(r.changedObjects, c)
421	}
422	for _, v := range r.volumes {
423		r.changedObjects = append(r.changedObjects, v)
424	}
425	r.changedSinceLastSync = 0
426}
427
428// GetChangeCount returns changes since last sync.
429func (r *VolumeReactor) GetChangeCount() int {
430	r.lock.Lock()
431	defer r.lock.Unlock()
432	return r.changedSinceLastSync
433}
434
435// DeleteVolumeEvent simulates that a volume has been deleted in etcd and
436// the controller receives 'volume deleted' event.
437func (r *VolumeReactor) DeleteVolumeEvent(volume *v1.PersistentVolume) {
438	r.lock.Lock()
439	defer r.lock.Unlock()
440
441	// Remove the volume from list of resulting volumes.
442	delete(r.volumes, volume.Name)
443
444	// Generate deletion event. Cloned volume is needed to prevent races (and we
445	// would get a clone from etcd too).
446	if r.fakeVolumeWatch != nil {
447		r.fakeVolumeWatch.Delete(volume.DeepCopy())
448	}
449}
450
451// DeleteClaimEvent simulates that a claim has been deleted in etcd and the
452// controller receives 'claim deleted' event.
453func (r *VolumeReactor) DeleteClaimEvent(claim *v1.PersistentVolumeClaim) {
454	r.lock.Lock()
455	defer r.lock.Unlock()
456
457	// Remove the claim from list of resulting claims.
458	delete(r.claims, claim.Name)
459
460	// Generate deletion event. Cloned volume is needed to prevent races (and we
461	// would get a clone from etcd too).
462	if r.fakeClaimWatch != nil {
463		r.fakeClaimWatch.Delete(claim.DeepCopy())
464	}
465}
466
467// AddClaimEvent simulates that a claim has been deleted in etcd and the
468// controller receives 'claim added' event.
469func (r *VolumeReactor) AddClaimEvent(claim *v1.PersistentVolumeClaim) {
470	r.lock.Lock()
471	defer r.lock.Unlock()
472
473	r.claims[claim.Name] = claim
474	// Generate event. No cloning is needed, this claim is not stored in the
475	// controller cache yet.
476	if r.fakeClaimWatch != nil {
477		r.fakeClaimWatch.Add(claim)
478	}
479}
480
481// AddClaims adds PVCs into VolumeReactor.
482func (r *VolumeReactor) AddClaims(claims []*v1.PersistentVolumeClaim) {
483	r.lock.Lock()
484	defer r.lock.Unlock()
485	for _, claim := range claims {
486		r.claims[claim.Name] = claim
487	}
488}
489
490// AddVolumes adds PVs into VolumeReactor.
491func (r *VolumeReactor) AddVolumes(volumes []*v1.PersistentVolume) {
492	r.lock.Lock()
493	defer r.lock.Unlock()
494	for _, volume := range volumes {
495		r.volumes[volume.Name] = volume
496	}
497}
498
499// AddClaim adds a PVC into VolumeReactor.
500func (r *VolumeReactor) AddClaim(claim *v1.PersistentVolumeClaim) {
501	r.lock.Lock()
502	defer r.lock.Unlock()
503	r.claims[claim.Name] = claim
504}
505
506// AddVolume adds a PV into VolumeReactor.
507func (r *VolumeReactor) AddVolume(volume *v1.PersistentVolume) {
508	r.lock.Lock()
509	defer r.lock.Unlock()
510	r.volumes[volume.Name] = volume
511}
512
513// DeleteVolume deletes a PV by name.
514func (r *VolumeReactor) DeleteVolume(name string) {
515	r.lock.Lock()
516	defer r.lock.Unlock()
517	delete(r.volumes, name)
518}
519
520// AddClaimBoundToVolume adds a PVC and binds it to corresponding PV.
521func (r *VolumeReactor) AddClaimBoundToVolume(claim *v1.PersistentVolumeClaim) {
522	r.lock.Lock()
523	defer r.lock.Unlock()
524	r.claims[claim.Name] = claim
525	if volume, ok := r.volumes[claim.Spec.VolumeName]; ok {
526		volume.Status.Phase = v1.VolumeBound
527	}
528}
529
530// MarkVolumeAvailable marks a PV available by name.
531func (r *VolumeReactor) MarkVolumeAvailable(name string) {
532	r.lock.Lock()
533	defer r.lock.Unlock()
534	if volume, ok := r.volumes[name]; ok {
535		volume.Spec.ClaimRef = nil
536		volume.Status.Phase = v1.VolumeAvailable
537		volume.Annotations = nil
538	}
539}
540
541// NewVolumeReactor creates a volume reactor.
542func NewVolumeReactor(client *fake.Clientset, fakeVolumeWatch, fakeClaimWatch *watch.FakeWatcher, errors []ReactorError) *VolumeReactor {
543	reactor := &VolumeReactor{
544		volumes:         make(map[string]*v1.PersistentVolume),
545		claims:          make(map[string]*v1.PersistentVolumeClaim),
546		fakeVolumeWatch: fakeVolumeWatch,
547		fakeClaimWatch:  fakeClaimWatch,
548		errors:          errors,
549		watchers:        make(map[schema.GroupVersionResource]map[string][]*watch.RaceFreeFakeWatcher),
550	}
551	client.AddReactor("create", "persistentvolumes", reactor.React)
552	client.AddReactor("create", "persistentvolumeclaims", reactor.React)
553	client.AddReactor("update", "persistentvolumes", reactor.React)
554	client.AddReactor("update", "persistentvolumeclaims", reactor.React)
555	client.AddReactor("get", "persistentvolumes", reactor.React)
556	client.AddReactor("get", "persistentvolumeclaims", reactor.React)
557	client.AddReactor("delete", "persistentvolumes", reactor.React)
558	client.AddReactor("delete", "persistentvolumeclaims", reactor.React)
559	return reactor
560}
561