1/* 2Copyright 2019 The Kubernetes Authors. 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package testing 18 19import ( 20 "errors" 21 "fmt" 22 "reflect" 23 "strconv" 24 "sync" 25 26 v1 "k8s.io/api/core/v1" 27 apierrors "k8s.io/apimachinery/pkg/api/errors" 28 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" 29 "k8s.io/apimachinery/pkg/runtime" 30 "k8s.io/apimachinery/pkg/runtime/schema" 31 "k8s.io/apimachinery/pkg/util/diff" 32 "k8s.io/apimachinery/pkg/watch" 33 "k8s.io/client-go/kubernetes/fake" 34 core "k8s.io/client-go/testing" 35 "k8s.io/klog/v2" 36) 37 38// ErrVersionConflict is the error returned when resource version of requested 39// object conflicts with the object in storage. 40var ErrVersionConflict = errors.New("VersionError") 41 42// VolumeReactor is a core.Reactor that simulates etcd and API server. It 43// stores: 44// - Latest version of claims volumes saved by the controller. 45// - Queue of all saves (to simulate "volume/claim updated" events). This queue 46// contains all intermediate state of an object - e.g. a claim.VolumeName 47// is updated first and claim.Phase second. This queue will then contain both 48// updates as separate entries. 49// - Number of changes since the last call to VolumeReactor.syncAll(). 50// - Optionally, volume and claim fake watchers which should be the same ones 51// used by the controller. Any time an event function like deleteVolumeEvent 52// is called to simulate an event, the reactor's stores are updated and the 53// controller is sent the event via the fake watcher. 54// - Optionally, list of error that should be returned by reactor, simulating 55// etcd / API server failures. These errors are evaluated in order and every 56// error is returned only once. I.e. when the reactor finds matching 57// ReactorError, it return appropriate error and removes the ReactorError from 58// the list. 59type VolumeReactor struct { 60 volumes map[string]*v1.PersistentVolume 61 claims map[string]*v1.PersistentVolumeClaim 62 changedObjects []interface{} 63 changedSinceLastSync int 64 fakeVolumeWatch *watch.FakeWatcher 65 fakeClaimWatch *watch.FakeWatcher 66 lock sync.RWMutex 67 errors []ReactorError 68 watchers map[schema.GroupVersionResource]map[string][]*watch.RaceFreeFakeWatcher 69} 70 71// ReactorError is an error that is returned by test reactor (=simulated 72// etcd+/API server) when an action performed by the reactor matches given verb 73// ("get", "update", "create", "delete" or "*"") on given resource 74// ("persistentvolumes", "persistentvolumeclaims" or "*"). 75type ReactorError struct { 76 Verb string 77 Resource string 78 Error error 79} 80 81// React is a callback called by fake kubeClient from the controller. 82// In other words, every claim/volume change performed by the controller ends 83// here. 84// This callback checks versions of the updated objects and refuse those that 85// are too old (simulating real etcd). 86// All updated objects are stored locally to keep track of object versions and 87// to evaluate test results. 88// All updated objects are also inserted into changedObjects queue and 89// optionally sent back to the controller via its watchers. 90func (r *VolumeReactor) React(action core.Action) (handled bool, ret runtime.Object, err error) { 91 r.lock.Lock() 92 defer r.lock.Unlock() 93 94 klog.V(4).Infof("reactor got operation %q on %q", action.GetVerb(), action.GetResource()) 95 96 // Inject error when requested 97 err = r.injectReactError(action) 98 if err != nil { 99 return true, nil, err 100 } 101 102 // Test did not request to inject an error, continue simulating API server. 103 switch { 104 case action.Matches("create", "persistentvolumes"): 105 obj := action.(core.UpdateAction).GetObject() 106 volume := obj.(*v1.PersistentVolume) 107 108 // check the volume does not exist 109 _, found := r.volumes[volume.Name] 110 if found { 111 return true, nil, fmt.Errorf("cannot create volume %s: volume already exists", volume.Name) 112 } 113 114 // mimic apiserver defaulting 115 if volume.Spec.VolumeMode == nil { 116 volume.Spec.VolumeMode = new(v1.PersistentVolumeMode) 117 *volume.Spec.VolumeMode = v1.PersistentVolumeFilesystem 118 } 119 120 // Store the updated object to appropriate places. 121 r.volumes[volume.Name] = volume 122 for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) { 123 w.Add(volume) 124 } 125 r.changedObjects = append(r.changedObjects, volume) 126 r.changedSinceLastSync++ 127 klog.V(4).Infof("created volume %s", volume.Name) 128 return true, volume, nil 129 130 case action.Matches("create", "persistentvolumeclaims"): 131 obj := action.(core.UpdateAction).GetObject() 132 claim := obj.(*v1.PersistentVolumeClaim) 133 134 // check the claim does not exist 135 _, found := r.claims[claim.Name] 136 if found { 137 return true, nil, fmt.Errorf("cannot create claim %s: claim already exists", claim.Name) 138 } 139 140 // Store the updated object to appropriate places. 141 r.claims[claim.Name] = claim 142 for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) { 143 w.Add(claim) 144 } 145 r.changedObjects = append(r.changedObjects, claim) 146 r.changedSinceLastSync++ 147 klog.V(4).Infof("created claim %s", claim.Name) 148 return true, claim, nil 149 150 case action.Matches("update", "persistentvolumes"): 151 obj := action.(core.UpdateAction).GetObject() 152 volume := obj.(*v1.PersistentVolume) 153 154 // Check and bump object version 155 storedVolume, found := r.volumes[volume.Name] 156 if found { 157 storedVer, _ := strconv.Atoi(storedVolume.ResourceVersion) 158 requestedVer, _ := strconv.Atoi(volume.ResourceVersion) 159 if storedVer != requestedVer { 160 return true, obj, ErrVersionConflict 161 } 162 if reflect.DeepEqual(storedVolume, volume) { 163 klog.V(4).Infof("nothing updated volume %s", volume.Name) 164 return true, volume, nil 165 } 166 // Don't modify the existing object 167 volume = volume.DeepCopy() 168 volume.ResourceVersion = strconv.Itoa(storedVer + 1) 169 } else { 170 return true, nil, fmt.Errorf("cannot update volume %s: volume not found", volume.Name) 171 } 172 173 // Store the updated object to appropriate places. 174 for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) { 175 w.Modify(volume) 176 } 177 r.volumes[volume.Name] = volume 178 r.changedObjects = append(r.changedObjects, volume) 179 r.changedSinceLastSync++ 180 klog.V(4).Infof("saved updated volume %s", volume.Name) 181 return true, volume, nil 182 183 case action.Matches("update", "persistentvolumeclaims"): 184 obj := action.(core.UpdateAction).GetObject() 185 claim := obj.(*v1.PersistentVolumeClaim) 186 187 // Check and bump object version 188 storedClaim, found := r.claims[claim.Name] 189 if found { 190 storedVer, _ := strconv.Atoi(storedClaim.ResourceVersion) 191 requestedVer, _ := strconv.Atoi(claim.ResourceVersion) 192 if storedVer != requestedVer { 193 return true, obj, ErrVersionConflict 194 } 195 if reflect.DeepEqual(storedClaim, claim) { 196 klog.V(4).Infof("nothing updated claim %s", claim.Name) 197 return true, claim, nil 198 } 199 // Don't modify the existing object 200 claim = claim.DeepCopy() 201 claim.ResourceVersion = strconv.Itoa(storedVer + 1) 202 } else { 203 return true, nil, fmt.Errorf("cannot update claim %s: claim not found", claim.Name) 204 } 205 206 // Store the updated object to appropriate places. 207 for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) { 208 w.Modify(claim) 209 } 210 r.claims[claim.Name] = claim 211 r.changedObjects = append(r.changedObjects, claim) 212 r.changedSinceLastSync++ 213 klog.V(4).Infof("saved updated claim %s", claim.Name) 214 return true, claim, nil 215 216 case action.Matches("get", "persistentvolumes"): 217 name := action.(core.GetAction).GetName() 218 volume, found := r.volumes[name] 219 if found { 220 klog.V(4).Infof("GetVolume: found %s", volume.Name) 221 return true, volume.DeepCopy(), nil 222 } 223 klog.V(4).Infof("GetVolume: volume %s not found", name) 224 return true, nil, apierrors.NewNotFound(action.GetResource().GroupResource(), name) 225 226 case action.Matches("get", "persistentvolumeclaims"): 227 name := action.(core.GetAction).GetName() 228 claim, found := r.claims[name] 229 if found { 230 klog.V(4).Infof("GetClaim: found %s", claim.Name) 231 return true, claim.DeepCopy(), nil 232 } 233 klog.V(4).Infof("GetClaim: claim %s not found", name) 234 return true, nil, apierrors.NewNotFound(action.GetResource().GroupResource(), name) 235 236 case action.Matches("delete", "persistentvolumes"): 237 name := action.(core.DeleteAction).GetName() 238 klog.V(4).Infof("deleted volume %s", name) 239 obj, found := r.volumes[name] 240 if found { 241 delete(r.volumes, name) 242 for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) { 243 w.Delete(obj) 244 } 245 r.changedSinceLastSync++ 246 return true, nil, nil 247 } 248 return true, nil, fmt.Errorf("cannot delete volume %s: not found", name) 249 250 case action.Matches("delete", "persistentvolumeclaims"): 251 name := action.(core.DeleteAction).GetName() 252 klog.V(4).Infof("deleted claim %s", name) 253 obj, found := r.claims[name] 254 if found { 255 delete(r.claims, name) 256 for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) { 257 w.Delete(obj) 258 } 259 r.changedSinceLastSync++ 260 return true, nil, nil 261 } 262 return true, nil, fmt.Errorf("cannot delete claim %s: not found", name) 263 } 264 265 return false, nil, nil 266} 267 268// Watch watches objects from the VolumeReactor. Watch returns a channel which 269// will push added / modified / deleted object. 270func (r *VolumeReactor) Watch(gvr schema.GroupVersionResource, ns string) (watch.Interface, error) { 271 r.lock.Lock() 272 defer r.lock.Unlock() 273 274 fakewatcher := watch.NewRaceFreeFake() 275 276 if _, exists := r.watchers[gvr]; !exists { 277 r.watchers[gvr] = make(map[string][]*watch.RaceFreeFakeWatcher) 278 } 279 r.watchers[gvr][ns] = append(r.watchers[gvr][ns], fakewatcher) 280 return fakewatcher, nil 281} 282 283func (r *VolumeReactor) getWatches(gvr schema.GroupVersionResource, ns string) []*watch.RaceFreeFakeWatcher { 284 watches := []*watch.RaceFreeFakeWatcher{} 285 if r.watchers[gvr] != nil { 286 if w := r.watchers[gvr][ns]; w != nil { 287 watches = append(watches, w...) 288 } 289 if ns != metav1.NamespaceAll { 290 if w := r.watchers[gvr][metav1.NamespaceAll]; w != nil { 291 watches = append(watches, w...) 292 } 293 } 294 } 295 return watches 296} 297 298// injectReactError returns an error when the test requested given action to 299// fail. nil is returned otherwise. 300func (r *VolumeReactor) injectReactError(action core.Action) error { 301 if len(r.errors) == 0 { 302 // No more errors to inject, everything should succeed. 303 return nil 304 } 305 306 for i, expected := range r.errors { 307 klog.V(4).Infof("trying to match %q %q with %q %q", expected.Verb, expected.Resource, action.GetVerb(), action.GetResource()) 308 if action.Matches(expected.Verb, expected.Resource) { 309 // That's the action we're waiting for, remove it from injectedErrors 310 r.errors = append(r.errors[:i], r.errors[i+1:]...) 311 klog.V(4).Infof("reactor found matching error at index %d: %q %q, returning %v", i, expected.Verb, expected.Resource, expected.Error) 312 return expected.Error 313 } 314 } 315 return nil 316} 317 318// CheckVolumes compares all expectedVolumes with set of volumes at the end of 319// the test and reports differences. 320func (r *VolumeReactor) CheckVolumes(expectedVolumes []*v1.PersistentVolume) error { 321 r.lock.Lock() 322 defer r.lock.Unlock() 323 324 expectedMap := make(map[string]*v1.PersistentVolume) 325 gotMap := make(map[string]*v1.PersistentVolume) 326 // Clear any ResourceVersion from both sets 327 for _, v := range expectedVolumes { 328 // Don't modify the existing object 329 v := v.DeepCopy() 330 v.ResourceVersion = "" 331 if v.Spec.ClaimRef != nil { 332 v.Spec.ClaimRef.ResourceVersion = "" 333 } 334 expectedMap[v.Name] = v 335 } 336 for _, v := range r.volumes { 337 // We must clone the volume because of golang race check - it was 338 // written by the controller without any locks on it. 339 v := v.DeepCopy() 340 v.ResourceVersion = "" 341 if v.Spec.ClaimRef != nil { 342 v.Spec.ClaimRef.ResourceVersion = "" 343 } 344 gotMap[v.Name] = v 345 } 346 if !reflect.DeepEqual(expectedMap, gotMap) { 347 // Print ugly but useful diff of expected and received objects for 348 // easier debugging. 349 return fmt.Errorf("Volume check failed [A-expected, B-got]: %s", diff.ObjectDiff(expectedMap, gotMap)) 350 } 351 return nil 352} 353 354// CheckClaims compares all expectedClaims with set of claims at the end of the 355// test and reports differences. 356func (r *VolumeReactor) CheckClaims(expectedClaims []*v1.PersistentVolumeClaim) error { 357 r.lock.Lock() 358 defer r.lock.Unlock() 359 360 expectedMap := make(map[string]*v1.PersistentVolumeClaim) 361 gotMap := make(map[string]*v1.PersistentVolumeClaim) 362 for _, c := range expectedClaims { 363 // Don't modify the existing object 364 c = c.DeepCopy() 365 c.ResourceVersion = "" 366 expectedMap[c.Name] = c 367 } 368 for _, c := range r.claims { 369 // We must clone the claim because of golang race check - it was 370 // written by the controller without any locks on it. 371 c = c.DeepCopy() 372 c.ResourceVersion = "" 373 gotMap[c.Name] = c 374 } 375 if !reflect.DeepEqual(expectedMap, gotMap) { 376 // Print ugly but useful diff of expected and received objects for 377 // easier debugging. 378 return fmt.Errorf("Claim check failed [A-expected, B-got result]: %s", diff.ObjectDiff(expectedMap, gotMap)) 379 } 380 return nil 381} 382 383// PopChange returns one recorded updated object, either *v1.PersistentVolume 384// or *v1.PersistentVolumeClaim. Returns nil when there are no changes. 385func (r *VolumeReactor) PopChange() interface{} { 386 r.lock.Lock() 387 defer r.lock.Unlock() 388 389 if len(r.changedObjects) == 0 { 390 return nil 391 } 392 393 // For debugging purposes, print the queue 394 for _, obj := range r.changedObjects { 395 switch obj.(type) { 396 case *v1.PersistentVolume: 397 vol, _ := obj.(*v1.PersistentVolume) 398 klog.V(4).Infof("reactor queue: %s", vol.Name) 399 case *v1.PersistentVolumeClaim: 400 claim, _ := obj.(*v1.PersistentVolumeClaim) 401 klog.V(4).Infof("reactor queue: %s", claim.Name) 402 } 403 } 404 405 // Pop the first item from the queue and return it 406 obj := r.changedObjects[0] 407 r.changedObjects = r.changedObjects[1:] 408 return obj 409} 410 411// SyncAll simulates the controller periodic sync of volumes and claim. It 412// simply adds all these objects to the internal queue of updates. This method 413// should be used when the test manually calls syncClaim/syncVolume. Test that 414// use real controller loop (ctrl.Run()) will get periodic sync automatically. 415func (r *VolumeReactor) SyncAll() { 416 r.lock.Lock() 417 defer r.lock.Unlock() 418 419 for _, c := range r.claims { 420 r.changedObjects = append(r.changedObjects, c) 421 } 422 for _, v := range r.volumes { 423 r.changedObjects = append(r.changedObjects, v) 424 } 425 r.changedSinceLastSync = 0 426} 427 428// GetChangeCount returns changes since last sync. 429func (r *VolumeReactor) GetChangeCount() int { 430 r.lock.Lock() 431 defer r.lock.Unlock() 432 return r.changedSinceLastSync 433} 434 435// DeleteVolumeEvent simulates that a volume has been deleted in etcd and 436// the controller receives 'volume deleted' event. 437func (r *VolumeReactor) DeleteVolumeEvent(volume *v1.PersistentVolume) { 438 r.lock.Lock() 439 defer r.lock.Unlock() 440 441 // Remove the volume from list of resulting volumes. 442 delete(r.volumes, volume.Name) 443 444 // Generate deletion event. Cloned volume is needed to prevent races (and we 445 // would get a clone from etcd too). 446 if r.fakeVolumeWatch != nil { 447 r.fakeVolumeWatch.Delete(volume.DeepCopy()) 448 } 449} 450 451// DeleteClaimEvent simulates that a claim has been deleted in etcd and the 452// controller receives 'claim deleted' event. 453func (r *VolumeReactor) DeleteClaimEvent(claim *v1.PersistentVolumeClaim) { 454 r.lock.Lock() 455 defer r.lock.Unlock() 456 457 // Remove the claim from list of resulting claims. 458 delete(r.claims, claim.Name) 459 460 // Generate deletion event. Cloned volume is needed to prevent races (and we 461 // would get a clone from etcd too). 462 if r.fakeClaimWatch != nil { 463 r.fakeClaimWatch.Delete(claim.DeepCopy()) 464 } 465} 466 467// AddClaimEvent simulates that a claim has been deleted in etcd and the 468// controller receives 'claim added' event. 469func (r *VolumeReactor) AddClaimEvent(claim *v1.PersistentVolumeClaim) { 470 r.lock.Lock() 471 defer r.lock.Unlock() 472 473 r.claims[claim.Name] = claim 474 // Generate event. No cloning is needed, this claim is not stored in the 475 // controller cache yet. 476 if r.fakeClaimWatch != nil { 477 r.fakeClaimWatch.Add(claim) 478 } 479} 480 481// AddClaims adds PVCs into VolumeReactor. 482func (r *VolumeReactor) AddClaims(claims []*v1.PersistentVolumeClaim) { 483 r.lock.Lock() 484 defer r.lock.Unlock() 485 for _, claim := range claims { 486 r.claims[claim.Name] = claim 487 } 488} 489 490// AddVolumes adds PVs into VolumeReactor. 491func (r *VolumeReactor) AddVolumes(volumes []*v1.PersistentVolume) { 492 r.lock.Lock() 493 defer r.lock.Unlock() 494 for _, volume := range volumes { 495 r.volumes[volume.Name] = volume 496 } 497} 498 499// AddClaim adds a PVC into VolumeReactor. 500func (r *VolumeReactor) AddClaim(claim *v1.PersistentVolumeClaim) { 501 r.lock.Lock() 502 defer r.lock.Unlock() 503 r.claims[claim.Name] = claim 504} 505 506// AddVolume adds a PV into VolumeReactor. 507func (r *VolumeReactor) AddVolume(volume *v1.PersistentVolume) { 508 r.lock.Lock() 509 defer r.lock.Unlock() 510 r.volumes[volume.Name] = volume 511} 512 513// DeleteVolume deletes a PV by name. 514func (r *VolumeReactor) DeleteVolume(name string) { 515 r.lock.Lock() 516 defer r.lock.Unlock() 517 delete(r.volumes, name) 518} 519 520// AddClaimBoundToVolume adds a PVC and binds it to corresponding PV. 521func (r *VolumeReactor) AddClaimBoundToVolume(claim *v1.PersistentVolumeClaim) { 522 r.lock.Lock() 523 defer r.lock.Unlock() 524 r.claims[claim.Name] = claim 525 if volume, ok := r.volumes[claim.Spec.VolumeName]; ok { 526 volume.Status.Phase = v1.VolumeBound 527 } 528} 529 530// MarkVolumeAvailable marks a PV available by name. 531func (r *VolumeReactor) MarkVolumeAvailable(name string) { 532 r.lock.Lock() 533 defer r.lock.Unlock() 534 if volume, ok := r.volumes[name]; ok { 535 volume.Spec.ClaimRef = nil 536 volume.Status.Phase = v1.VolumeAvailable 537 volume.Annotations = nil 538 } 539} 540 541// NewVolumeReactor creates a volume reactor. 542func NewVolumeReactor(client *fake.Clientset, fakeVolumeWatch, fakeClaimWatch *watch.FakeWatcher, errors []ReactorError) *VolumeReactor { 543 reactor := &VolumeReactor{ 544 volumes: make(map[string]*v1.PersistentVolume), 545 claims: make(map[string]*v1.PersistentVolumeClaim), 546 fakeVolumeWatch: fakeVolumeWatch, 547 fakeClaimWatch: fakeClaimWatch, 548 errors: errors, 549 watchers: make(map[schema.GroupVersionResource]map[string][]*watch.RaceFreeFakeWatcher), 550 } 551 client.AddReactor("create", "persistentvolumes", reactor.React) 552 client.AddReactor("create", "persistentvolumeclaims", reactor.React) 553 client.AddReactor("update", "persistentvolumes", reactor.React) 554 client.AddReactor("update", "persistentvolumeclaims", reactor.React) 555 client.AddReactor("get", "persistentvolumes", reactor.React) 556 client.AddReactor("get", "persistentvolumeclaims", reactor.React) 557 client.AddReactor("delete", "persistentvolumes", reactor.React) 558 client.AddReactor("delete", "persistentvolumeclaims", reactor.React) 559 return reactor 560} 561