/*
Copyright 2019 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package storage

import (
	"context"
	"crypto/sha256"
	"errors"
	"fmt"
	"math/rand"
	"strconv"
	"strings"
	"sync/atomic"
	"time"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	v1 "k8s.io/api/core/v1"
	storagev1 "k8s.io/api/storage/v1"
	storagev1beta1 "k8s.io/api/storage/v1beta1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/fields"
	utilerrors "k8s.io/apimachinery/pkg/util/errors"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/apimachinery/pkg/watch"
	clientset "k8s.io/client-go/kubernetes"
	cachetools "k8s.io/client-go/tools/cache"
	watchtools "k8s.io/client-go/tools/watch"
	"k8s.io/kubernetes/pkg/controller/volume/scheduling"
	"k8s.io/kubernetes/pkg/kubelet/events"
	"k8s.io/kubernetes/test/e2e/framework"
	e2eevents "k8s.io/kubernetes/test/e2e/framework/events"
	e2emetrics "k8s.io/kubernetes/test/e2e/framework/metrics"
	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
	e2epv "k8s.io/kubernetes/test/e2e/framework/pv"
	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
	e2evolume "k8s.io/kubernetes/test/e2e/framework/volume"
	"k8s.io/kubernetes/test/e2e/storage/drivers"
	storageframework "k8s.io/kubernetes/test/e2e/storage/framework"
	"k8s.io/kubernetes/test/e2e/storage/testsuites"
	"k8s.io/kubernetes/test/e2e/storage/utils"
	imageutils "k8s.io/kubernetes/test/utils/image"
	utilptr "k8s.io/utils/pointer"

	"github.com/onsi/ginkgo"
	"github.com/onsi/gomega"
)

const (
	csiNodeLimitUpdateTimeout  = 5 * time.Minute
	csiPodUnschedulableTimeout = 5 * time.Minute
	csiResizeWaitPeriod        = 5 * time.Minute
	csiVolumeAttachmentTimeout = 7 * time.Minute
	// how long to wait for Resizing Condition on PVC to appear
	csiResizingConditionWait = 2 * time.Minute

	// Time for starting a pod with a volume.
	csiPodRunningTimeout = 5 * time.Minute

	// How long to wait for kubelet to unstage a volume after a pod is deleted
	csiUnstageWaitTimeout = 1 * time.Minute
)

// csiCall represents an expected call from Kubernetes to the CSI mock driver and
// the expected return value.
// When matching an expected csiCall with the real CSI mock driver output, one csiCall
// matches *one or more* calls with the same method and error code.
// This is due to exponential backoff in Kubernetes, where the test cannot expect
// an exact number of call repetitions.
type csiCall struct {
	expectedMethod string
	expectedError  codes.Code
	// This is a mark for the test itself to delete the tested pod *after*
	// this csiCall is received.
	deletePod bool
}
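// For example, with exponential backoff kubelet may emit the sequence
//
//	NodeStageVolume   -> InvalidArgument
//	NodeStageVolume   -> InvalidArgument (retry)
//	NodeStageVolume   -> OK
//	NodeUnstageVolume -> OK
//
// which is matched by just three expected calls (an illustrative sketch,
// mirroring the "retry NodeStage after NodeStage final error" test below):
//
//	expectedCalls := []csiCall{
//		{expectedMethod: "NodeStageVolume", expectedError: codes.InvalidArgument},
//		{expectedMethod: "NodeStageVolume", expectedError: codes.OK, deletePod: true},
//		{expectedMethod: "NodeUnstageVolume", expectedError: codes.OK},
//	}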

var _ = utils.SIGDescribe("CSI mock volume", func() {
	type testParameters struct {
		disableAttach       bool
		attachLimit         int
		registerDriver      bool
		lateBinding         bool
		enableTopology      bool
		podInfo             *bool
		storageCapacity     *bool
		scName              string // pre-selected storage class name; must be unique in the cluster
		enableResizing      bool   // enable resizing for both CSI mock driver and storageClass.
		enableNodeExpansion bool   // enable node expansion for CSI mock driver
		// just disable resizing on the driver; it overrides the enableResizing flag for the CSI mock driver
		disableResizingOnDriver bool
		enableSnapshot          bool
		hooks                   *drivers.Hooks
		tokenRequests           []storagev1.TokenRequest
		requiresRepublish       *bool
		fsGroupPolicy           *storagev1.FSGroupPolicy
	}

	type mockDriverSetup struct {
		cs           clientset.Interface
		config       *storageframework.PerTestConfig
		testCleanups []func()
		pods         []*v1.Pod
		pvcs         []*v1.PersistentVolumeClaim
		sc           map[string]*storagev1.StorageClass
		vsc          map[string]*unstructured.Unstructured
		driver       drivers.MockCSITestDriver
		provisioner  string
		tp           testParameters
	}

	var m mockDriverSetup

	f := framework.NewDefaultFramework("csi-mock-volumes")

	init := func(tp testParameters) {
		m = mockDriverSetup{
			cs:  f.ClientSet,
			sc:  make(map[string]*storagev1.StorageClass),
			vsc: make(map[string]*unstructured.Unstructured),
			tp:  tp,
		}
		cs := f.ClientSet
		var err error
		driverOpts := drivers.CSIMockDriverOpts{
			RegisterDriver:      tp.registerDriver,
			PodInfo:             tp.podInfo,
			StorageCapacity:     tp.storageCapacity,
			EnableTopology:      tp.enableTopology,
			AttachLimit:         tp.attachLimit,
			DisableAttach:       tp.disableAttach,
			EnableResizing:      tp.enableResizing,
			EnableNodeExpansion: tp.enableNodeExpansion,
			EnableSnapshot:      tp.enableSnapshot,
			TokenRequests:       tp.tokenRequests,
			RequiresRepublish:   tp.requiresRepublish,
			FSGroupPolicy:       tp.fsGroupPolicy,
		}

		// At the moment, only tests which need hooks are
		// using the embedded CSI mock driver. The rest run
		// the driver inside the cluster, although they could
		// be changed to use embedding merely by setting
		// driverOpts.Embedded to true.
		//
		// Not enabling it for all tests minimizes
		// the risk that the introduction of embedding breaks
		// some existing tests and avoids a dependency
		// on port forwarding, which is important if some of
		// these tests are supposed to become part of
		// conformance testing (port forwarding isn't
		// currently required).
		if tp.hooks != nil {
			driverOpts.Embedded = true
			driverOpts.Hooks = *tp.hooks
		}

		// This just disables resizing on the driver, keeping resizing on the SC enabled.
		if tp.disableResizingOnDriver {
			driverOpts.EnableResizing = false
		}

		m.driver = drivers.InitMockCSIDriver(driverOpts)
		config, testCleanup := m.driver.PrepareTest(f)
		m.testCleanups = append(m.testCleanups, testCleanup)
		m.config = config
		m.provisioner = config.GetUniqueDriverName()

		if tp.registerDriver {
			err = waitForCSIDriver(cs, m.config.GetUniqueDriverName())
			framework.ExpectNoError(err, "Failed to get CSIDriver %v", m.config.GetUniqueDriverName())
			m.testCleanups = append(m.testCleanups, func() {
				destroyCSIDriver(cs, m.config.GetUniqueDriverName())
			})
		}

		// Wait for the CSIDriver to actually get deployed and the CSINode object to be generated.
		// This indicates the mock CSI driver pod is up and running healthy.
		err = drivers.WaitForCSIDriverRegistrationOnNode(m.config.ClientNodeSelection.Name, m.config.GetUniqueDriverName(), cs)
		framework.ExpectNoError(err, "Failed to register CSIDriver %v", m.config.GetUniqueDriverName())
	}

	createPod := func(ephemeral bool) (class *storagev1.StorageClass, claim *v1.PersistentVolumeClaim, pod *v1.Pod) {
		ginkgo.By("Creating pod")
		sc := m.driver.GetDynamicProvisionStorageClass(m.config, "")
		scTest := testsuites.StorageClassTest{
			Name:                 m.driver.GetDriverInfo().Name,
			Timeouts:             f.Timeouts,
			Provisioner:          sc.Provisioner,
			Parameters:           sc.Parameters,
			ClaimSize:            "1Gi",
			ExpectedSize:         "1Gi",
			DelayBinding:         m.tp.lateBinding,
			AllowVolumeExpansion: m.tp.enableResizing,
		}

		// The mock driver only works when everything runs on a single node.
		nodeSelection := m.config.ClientNodeSelection
		if ephemeral {
			pod = startPausePodInline(f.ClientSet, scTest, nodeSelection, f.Namespace.Name)
			if pod != nil {
				m.pods = append(m.pods, pod)
			}
		} else {
			class, claim, pod = startPausePod(f.ClientSet, scTest, nodeSelection, m.tp.scName, f.Namespace.Name)
			if class != nil {
				m.sc[class.Name] = class
			}
			if claim != nil {
				m.pvcs = append(m.pvcs, claim)
			}
			if pod != nil {
				m.pods = append(m.pods, pod)
			}
		}
		return // result variables set above
	}

	createPodWithPVC := func(pvc *v1.PersistentVolumeClaim) (*v1.Pod, error) {
		nodeSelection := m.config.ClientNodeSelection
		pod, err := startPausePodWithClaim(m.cs, pvc, nodeSelection, f.Namespace.Name)
		if pod != nil {
			m.pods = append(m.pods, pod)
		}
		return pod, err
	}

	createPodWithFSGroup := func(fsGroup *int64) (*storagev1.StorageClass, *v1.PersistentVolumeClaim, *v1.Pod) {
		ginkgo.By("Creating pod with fsGroup")
		nodeSelection := m.config.ClientNodeSelection
		sc := m.driver.GetDynamicProvisionStorageClass(m.config, "")
		scTest := testsuites.StorageClassTest{
			Name:                 m.driver.GetDriverInfo().Name,
			Provisioner:          sc.Provisioner,
			Parameters:           sc.Parameters,
			ClaimSize:            "1Gi",
			ExpectedSize:         "1Gi",
			DelayBinding:         m.tp.lateBinding,
			AllowVolumeExpansion: m.tp.enableResizing,
		}

		class, claim, pod := startBusyBoxPod(f.ClientSet, scTest, nodeSelection, m.tp.scName, f.Namespace.Name, fsGroup)

		if class != nil {
			m.sc[class.Name] = class
		}
		if claim != nil {
			m.pvcs = append(m.pvcs, claim)
		}

		if pod != nil {
			m.pods = append(m.pods, pod)
		}

		return class, claim, pod
	}

	cleanup := func() {
		cs := f.ClientSet
		var errs []error

		for _, pod := range m.pods {
			ginkgo.By(fmt.Sprintf("Deleting pod %s", pod.Name))
			errs = append(errs, e2epod.DeletePodWithWait(cs, pod))
		}

		for _, claim := range m.pvcs {
			ginkgo.By(fmt.Sprintf("Deleting claim %s", claim.Name))
			claim, err := cs.CoreV1().PersistentVolumeClaims(claim.Namespace).Get(context.TODO(), claim.Name, metav1.GetOptions{})
			if err == nil {
				if err := cs.CoreV1().PersistentVolumeClaims(claim.Namespace).Delete(context.TODO(), claim.Name, metav1.DeleteOptions{}); err != nil {
					errs = append(errs, err)
				}
				if claim.Spec.VolumeName != "" {
					errs = append(errs, e2epv.WaitForPersistentVolumeDeleted(cs, claim.Spec.VolumeName, framework.Poll, 2*time.Minute))
				}
			}
		}
ginkgo.By(fmt.Sprintf("Deleting storageclass %s", sc.Name)) 300 cs.StorageV1().StorageClasses().Delete(context.TODO(), sc.Name, metav1.DeleteOptions{}) 301 } 302 303 for _, vsc := range m.vsc { 304 ginkgo.By(fmt.Sprintf("Deleting volumesnapshotclass %s", vsc.GetName())) 305 m.config.Framework.DynamicClient.Resource(utils.SnapshotClassGVR).Delete(context.TODO(), vsc.GetName(), metav1.DeleteOptions{}) 306 } 307 ginkgo.By("Cleaning up resources") 308 for _, cleanupFunc := range m.testCleanups { 309 cleanupFunc() 310 } 311 312 err := utilerrors.NewAggregate(errs) 313 framework.ExpectNoError(err, "while cleaning up after test") 314 } 315 316 // The CSIDriverRegistry feature gate is needed for this test in Kubernetes 1.12. 317 ginkgo.Context("CSI attach test using mock driver", func() { 318 tests := []struct { 319 name string 320 disableAttach bool 321 deployClusterRegistrar bool 322 }{ 323 { 324 name: "should not require VolumeAttach for drivers without attachment", 325 disableAttach: true, 326 deployClusterRegistrar: true, 327 }, 328 { 329 name: "should require VolumeAttach for drivers with attachment", 330 deployClusterRegistrar: true, 331 }, 332 { 333 name: "should preserve attachment policy when no CSIDriver present", 334 deployClusterRegistrar: false, 335 }, 336 } 337 for _, t := range tests { 338 test := t 339 ginkgo.It(t.name, func() { 340 var err error 341 init(testParameters{registerDriver: test.deployClusterRegistrar, disableAttach: test.disableAttach}) 342 defer cleanup() 343 344 _, claim, pod := createPod(false) 345 if pod == nil { 346 return 347 } 348 err = e2epod.WaitForPodNameRunningInNamespace(m.cs, pod.Name, pod.Namespace) 349 framework.ExpectNoError(err, "Failed to start pod: %v", err) 350 351 ginkgo.By("Checking if VolumeAttachment was created for the pod") 352 handle := getVolumeHandle(m.cs, claim) 353 attachmentHash := sha256.Sum256([]byte(fmt.Sprintf("%s%s%s", handle, m.provisioner, m.config.ClientNodeSelection.Name))) 354 attachmentName := fmt.Sprintf("csi-%x", attachmentHash) 355 _, err = m.cs.StorageV1().VolumeAttachments().Get(context.TODO(), attachmentName, metav1.GetOptions{}) 356 if err != nil { 357 if apierrors.IsNotFound(err) { 358 if !test.disableAttach { 359 framework.ExpectNoError(err, "Expected VolumeAttachment but none was found") 360 } 361 } else { 362 framework.ExpectNoError(err, "Failed to find VolumeAttachment") 363 } 364 } 365 if test.disableAttach { 366 framework.ExpectError(err, "Unexpected VolumeAttachment found") 367 } 368 }) 369 370 } 371 }) 372 373 ginkgo.Context("CSI CSIDriver deployment after pod creation using non-attachable mock driver", func() { 374 ginkgo.It("should bringup pod after deploying CSIDriver attach=false [Slow]", func() { 375 var err error 376 init(testParameters{registerDriver: false, disableAttach: true}) 377 defer cleanup() 378 379 _, claim, pod := createPod(false /* persistent volume, late binding as specified above */) 380 if pod == nil { 381 return 382 } 383 384 ginkgo.By("Checking if attaching failed and pod cannot start") 385 eventSelector := fields.Set{ 386 "involvedObject.kind": "Pod", 387 "involvedObject.name": pod.Name, 388 "involvedObject.namespace": pod.Namespace, 389 "reason": events.FailedAttachVolume, 390 }.AsSelector().String() 391 msg := "AttachVolume.Attach failed for volume" 392 393 err = e2eevents.WaitTimeoutForEvent(m.cs, pod.Namespace, eventSelector, msg, f.Timeouts.PodStart) 394 if err != nil { 395 podErr := e2epod.WaitTimeoutForPodRunningInNamespace(m.cs, pod.Name, pod.Namespace, 10*time.Second) 396 

	ginkgo.Context("CSI CSIDriver deployment after pod creation using non-attachable mock driver", func() {
		ginkgo.It("should bringup pod after deploying CSIDriver attach=false [Slow]", func() {
			var err error
			init(testParameters{registerDriver: false, disableAttach: true})
			defer cleanup()

			_, claim, pod := createPod(false /* persistent volume, late binding as specified above */)
			if pod == nil {
				return
			}

			ginkgo.By("Checking if attaching failed and pod cannot start")
			eventSelector := fields.Set{
				"involvedObject.kind":      "Pod",
				"involvedObject.name":      pod.Name,
				"involvedObject.namespace": pod.Namespace,
				"reason":                   events.FailedAttachVolume,
			}.AsSelector().String()
			msg := "AttachVolume.Attach failed for volume"

			err = e2eevents.WaitTimeoutForEvent(m.cs, pod.Namespace, eventSelector, msg, f.Timeouts.PodStart)
			if err != nil {
				podErr := e2epod.WaitTimeoutForPodRunningInNamespace(m.cs, pod.Name, pod.Namespace, 10*time.Second)
				framework.ExpectError(podErr, "Pod should not be in running status because attaching should fail")
				// Events are unreliable, don't depend on the event. It's used only to speed up the test.
				framework.Logf("Attach should fail and the corresponding event should show up, error: %v", err)
			}

			// VolumeAttachment should be created because the default value for CSI attachable is true
			ginkgo.By("Checking if VolumeAttachment was created for the pod")
			handle := getVolumeHandle(m.cs, claim)
			attachmentHash := sha256.Sum256([]byte(fmt.Sprintf("%s%s%s", handle, m.provisioner, m.config.ClientNodeSelection.Name)))
			attachmentName := fmt.Sprintf("csi-%x", attachmentHash)
			_, err = m.cs.StorageV1().VolumeAttachments().Get(context.TODO(), attachmentName, metav1.GetOptions{})
			if err != nil {
				if apierrors.IsNotFound(err) {
					framework.ExpectNoError(err, "Expected VolumeAttachment but none was found")
				} else {
					framework.ExpectNoError(err, "Failed to find VolumeAttachment")
				}
			}

			ginkgo.By("Deploy CSIDriver object with attachRequired=false")
			driverNamespace := m.config.DriverNamespace

			canAttach := false
			o := utils.PatchCSIOptions{
				OldDriverName: "csi-mock",
				NewDriverName: "csi-mock-" + f.UniqueName,
				CanAttach:     &canAttach,
			}
			cleanupCSIDriver, err := utils.CreateFromManifests(f, driverNamespace, func(item interface{}) error {
				return utils.PatchCSIDeployment(f, o, item)
			}, "test/e2e/testing-manifests/storage-csi/mock/csi-mock-driverinfo.yaml")
			if err != nil {
				framework.Failf("failed to deploy CSIDriver object: %v", err)
			}
			m.testCleanups = append(m.testCleanups, cleanupCSIDriver)

			ginkgo.By("Wait for the pod in running status")
			err = e2epod.WaitForPodNameRunningInNamespace(m.cs, pod.Name, pod.Namespace)
			framework.ExpectNoError(err, "Failed to start pod: %v", err)

			ginkgo.By(fmt.Sprintf("Wait for the volumeattachment to be deleted up to %v", csiVolumeAttachmentTimeout))
			// This step can be slow because we have to wait either until a NodeUpdate event happens or
			// until the detachment for this volume times out, so that we can do a force detach.
			err = waitForVolumeAttachmentTerminated(attachmentName, m.cs)
			framework.ExpectNoError(err, "Failed to delete VolumeAttachment: %v", err)
		})
	})

	ginkgo.Context("CSI workload information using mock driver", func() {
		var (
			err          error
			podInfoTrue  = true
			podInfoFalse = false
		)
		tests := []struct {
			name                   string
			podInfoOnMount         *bool
			deployClusterRegistrar bool
			expectPodInfo          bool
			expectEphemeral        bool
		}{
			{
				name:                   "should not be passed when podInfoOnMount=nil",
				podInfoOnMount:         nil,
				deployClusterRegistrar: true,
				expectPodInfo:          false,
				expectEphemeral:        false,
			},
			{
				name:                   "should be passed when podInfoOnMount=true",
				podInfoOnMount:         &podInfoTrue,
				deployClusterRegistrar: true,
				expectPodInfo:          true,
				expectEphemeral:        false,
			},
			{
				name:                   "contain ephemeral=true when using inline volume",
				podInfoOnMount:         &podInfoTrue,
				deployClusterRegistrar: true,
				expectPodInfo:          true,
				expectEphemeral:        true,
			},
			{
				name:                   "should not be passed when podInfoOnMount=false",
				podInfoOnMount:         &podInfoFalse,
				deployClusterRegistrar: true,
				expectPodInfo:          false,
				expectEphemeral:        false,
			},
			{
				name:                   "should not be passed when CSIDriver does not exist",
				deployClusterRegistrar: false,
				expectPodInfo:          false,
				expectEphemeral:        false,
			},
		}
		for _, t := range tests {
			test := t
			ginkgo.It(t.name, func() {
				init(testParameters{
					registerDriver: test.deployClusterRegistrar,
					podInfo:        test.podInfoOnMount})

				defer cleanup()

				_, _, pod := createPod(test.expectEphemeral)
				if pod == nil {
					return
				}
				err = e2epod.WaitForPodNameRunningInNamespace(m.cs, pod.Name, pod.Namespace)
				framework.ExpectNoError(err, "Failed to start pod: %v", err)

				// If we expect an ephemeral volume, the feature has to be enabled.
				// Otherwise we need to check whether we expect pod info, because its content
				// depends on whether the feature is enabled or not.
				csiInlineVolumesEnabled := test.expectEphemeral
				if test.expectPodInfo {
					ginkgo.By("checking for CSIInlineVolumes feature")
					csiInlineVolumesEnabled, err = testsuites.CSIInlineVolumesEnabled(m.cs, f.Timeouts, f.Namespace.Name)
					framework.ExpectNoError(err, "failed to test for CSIInlineVolumes")
				}

				ginkgo.By("Deleting the previously created pod")
				err = e2epod.DeletePodWithWait(m.cs, pod)
				framework.ExpectNoError(err, "while deleting")

				ginkgo.By("Checking CSI driver logs")
				err = checkPodLogs(m.driver.GetCalls, pod, test.expectPodInfo, test.expectEphemeral, csiInlineVolumesEnabled, false, 1)
				framework.ExpectNoError(err)
			})
		}
	})

	ginkgo.Context("CSI volume limit information using mock driver", func() {
		ginkgo.It("should report attach limit when limit is bigger than 0 [Slow]", func() {
			// define volume limit to be 2 for this test
			var err error
			init(testParameters{attachLimit: 2})
			defer cleanup()
			nodeName := m.config.ClientNodeSelection.Name
			driverName := m.config.GetUniqueDriverName()

			csiNodeAttachLimit, err := checkCSINodeForLimits(nodeName, driverName, m.cs)
			framework.ExpectNoError(err, "while checking limits in CSINode: %v", err)

			gomega.Expect(csiNodeAttachLimit).To(gomega.BeNumerically("==", 2))

			_, _, pod1 := createPod(false)
			gomega.Expect(pod1).NotTo(gomega.BeNil(), "while creating first pod")

			err = e2epod.WaitForPodNameRunningInNamespace(m.cs, pod1.Name, pod1.Namespace)
			framework.ExpectNoError(err, "Failed to start pod1: %v", err)

			_, _, pod2 := createPod(false)
			gomega.Expect(pod2).NotTo(gomega.BeNil(), "while creating second pod")

			err = e2epod.WaitForPodNameRunningInNamespace(m.cs, pod2.Name, pod2.Namespace)
			framework.ExpectNoError(err, "Failed to start pod2: %v", err)

			_, _, pod3 := createPod(false)
			gomega.Expect(pod3).NotTo(gomega.BeNil(), "while creating third pod")
			err = waitForMaxVolumeCondition(pod3, m.cs)
			framework.ExpectNoError(err, "while waiting for max volume condition on pod : %+v", pod3)
		})
	})

	ginkgo.Context("CSI Volume expansion", func() {
		tests := []struct {
			name                    string
			nodeExpansionRequired   bool
			disableAttach           bool
			disableResizingOnDriver bool
			expectFailure           bool
		}{
			{
				name:                  "should expand volume without restarting pod if nodeExpansion=off",
				nodeExpansionRequired: false,
			},
			{
				name:                  "should expand volume by restarting pod if attach=on, nodeExpansion=on",
				nodeExpansionRequired: true,
			},
			{
				name:                  "should expand volume by restarting pod if attach=off, nodeExpansion=on",
				disableAttach:         true,
				nodeExpansionRequired: true,
			},
			{
				name:                    "should not expand volume if resizingOnDriver=off, resizingOnSC=on",
				disableResizingOnDriver: true,
				expectFailure:           true,
			},
		}
		for _, t := range tests {
			test := t
			ginkgo.It(t.name, func() {
				var err error
				tp := testParameters{
					enableResizing:          true,
					enableNodeExpansion:     test.nodeExpansionRequired,
					disableResizingOnDriver: test.disableResizingOnDriver,
				}
				// disabling attach requires the driver registration feature
				if test.disableAttach {
					tp.disableAttach = true
					tp.registerDriver = true
				}

				init(tp)
				defer cleanup()

				sc, pvc, pod := createPod(false)
				gomega.Expect(pod).NotTo(gomega.BeNil(), "while creating pod for resizing")

"failed creating sc with allowed expansion") 611 612 err = e2epod.WaitForPodNameRunningInNamespace(m.cs, pod.Name, pod.Namespace) 613 framework.ExpectNoError(err, "Failed to start pod1: %v", err) 614 615 ginkgo.By("Expanding current pvc") 616 newSize := resource.MustParse("6Gi") 617 newPVC, err := testsuites.ExpandPVCSize(pvc, newSize, m.cs) 618 framework.ExpectNoError(err, "While updating pvc for more size") 619 pvc = newPVC 620 gomega.Expect(pvc).NotTo(gomega.BeNil()) 621 622 pvcSize := pvc.Spec.Resources.Requests[v1.ResourceStorage] 623 if pvcSize.Cmp(newSize) != 0 { 624 framework.Failf("error updating pvc size %q", pvc.Name) 625 } 626 if test.expectFailure { 627 err = testsuites.WaitForResizingCondition(pvc, m.cs, csiResizingConditionWait) 628 framework.ExpectError(err, "unexpected resizing condition on PVC") 629 return 630 } 631 632 ginkgo.By("Waiting for persistent volume resize to finish") 633 err = testsuites.WaitForControllerVolumeResize(pvc, m.cs, csiResizeWaitPeriod) 634 framework.ExpectNoError(err, "While waiting for CSI PV resize to finish") 635 636 checkPVCSize := func() { 637 ginkgo.By("Waiting for PVC resize to finish") 638 pvc, err = testsuites.WaitForFSResize(pvc, m.cs) 639 framework.ExpectNoError(err, "while waiting for PVC resize to finish") 640 641 pvcConditions := pvc.Status.Conditions 642 framework.ExpectEqual(len(pvcConditions), 0, "pvc should not have conditions") 643 } 644 645 // if node expansion is not required PVC should be resized as well 646 if !test.nodeExpansionRequired { 647 checkPVCSize() 648 } else { 649 ginkgo.By("Checking for conditions on pvc") 650 npvc, err := testsuites.WaitForPendingFSResizeCondition(pvc, m.cs) 651 framework.ExpectNoError(err, "While waiting for pvc to have fs resizing condition") 652 pvc = npvc 653 654 inProgressConditions := pvc.Status.Conditions 655 if len(inProgressConditions) > 0 { 656 framework.ExpectEqual(inProgressConditions[0].Type, v1.PersistentVolumeClaimFileSystemResizePending, "pvc must have fs resizing condition") 657 } 658 659 ginkgo.By("Deleting the previously created pod") 660 err = e2epod.DeletePodWithWait(m.cs, pod) 661 framework.ExpectNoError(err, "while deleting pod for resizing") 662 663 ginkgo.By("Creating a new pod with same volume") 664 pod2, err := createPodWithPVC(pvc) 665 gomega.Expect(pod2).NotTo(gomega.BeNil(), "while creating pod for csi resizing") 666 framework.ExpectNoError(err, "while recreating pod for resizing") 667 668 checkPVCSize() 669 } 670 }) 671 } 672 }) 673 ginkgo.Context("CSI online volume expansion", func() { 674 tests := []struct { 675 name string 676 disableAttach bool 677 }{ 678 { 679 name: "should expand volume without restarting pod if attach=on, nodeExpansion=on", 680 }, 681 { 682 name: "should expand volume without restarting pod if attach=off, nodeExpansion=on", 683 disableAttach: true, 684 }, 685 } 686 for _, t := range tests { 687 test := t 688 ginkgo.It(test.name, func() { 689 var err error 690 params := testParameters{enableResizing: true, enableNodeExpansion: true} 691 if test.disableAttach { 692 params.disableAttach = true 693 params.registerDriver = true 694 } 695 696 init(params) 697 698 defer cleanup() 699 700 sc, pvc, pod := createPod(false) 701 gomega.Expect(pod).NotTo(gomega.BeNil(), "while creating pod for resizing") 702 703 framework.ExpectEqual(*sc.AllowVolumeExpansion, true, "failed creating sc with allowed expansion") 704 705 err = e2epod.WaitForPodNameRunningInNamespace(m.cs, pod.Name, pod.Namespace) 706 framework.ExpectNoError(err, "Failed to start pod1: %v", 
	ginkgo.Context("CSI online volume expansion", func() {
		tests := []struct {
			name          string
			disableAttach bool
		}{
			{
				name: "should expand volume without restarting pod if attach=on, nodeExpansion=on",
			},
			{
				name:          "should expand volume without restarting pod if attach=off, nodeExpansion=on",
				disableAttach: true,
			},
		}
		for _, t := range tests {
			test := t
			ginkgo.It(test.name, func() {
				var err error
				params := testParameters{enableResizing: true, enableNodeExpansion: true}
				if test.disableAttach {
					params.disableAttach = true
					params.registerDriver = true
				}

				init(params)

				defer cleanup()

				sc, pvc, pod := createPod(false)
				gomega.Expect(pod).NotTo(gomega.BeNil(), "while creating pod for resizing")

				framework.ExpectEqual(*sc.AllowVolumeExpansion, true, "failed creating sc with allowed expansion")

				err = e2epod.WaitForPodNameRunningInNamespace(m.cs, pod.Name, pod.Namespace)
				framework.ExpectNoError(err, "Failed to start pod1: %v", err)

				ginkgo.By("Expanding current pvc")
				newSize := resource.MustParse("6Gi")
				newPVC, err := testsuites.ExpandPVCSize(pvc, newSize, m.cs)
				framework.ExpectNoError(err, "while updating pvc size")
				pvc = newPVC
				gomega.Expect(pvc).NotTo(gomega.BeNil())

				pvcSize := pvc.Spec.Resources.Requests[v1.ResourceStorage]
				if pvcSize.Cmp(newSize) != 0 {
					framework.Failf("error updating pvc size %q", pvc.Name)
				}

				ginkgo.By("Waiting for persistent volume resize to finish")
				err = testsuites.WaitForControllerVolumeResize(pvc, m.cs, csiResizeWaitPeriod)
				framework.ExpectNoError(err, "While waiting for PV resize to finish")

				ginkgo.By("Waiting for PVC resize to finish")
				pvc, err = testsuites.WaitForFSResize(pvc, m.cs)
				framework.ExpectNoError(err, "while waiting for PVC resize to finish")

				pvcConditions := pvc.Status.Conditions
				framework.ExpectEqual(len(pvcConditions), 0, "pvc should not have conditions")

			})
		}
	})

	ginkgo.Context("CSI NodeStage error cases [Slow]", func() {
		trackedCalls := []string{
			"NodeStageVolume",
			"NodeUnstageVolume",
		}

		tests := []struct {
			name             string
			expectPodRunning bool
			expectedCalls    []csiCall

			// Called for each NodeStageVolume call, with the counter incremented atomically before
			// the invocation (i.e. the first value will be 1).
			nodeStageHook func(counter int64) error
		}{
			{
				// This is already tested elsewhere, adding simple good case here to test the test framework.
				name:             "should call NodeUnstage after NodeStage success",
				expectPodRunning: true,
				expectedCalls: []csiCall{
					{expectedMethod: "NodeStageVolume", expectedError: codes.OK, deletePod: true},
					{expectedMethod: "NodeUnstageVolume", expectedError: codes.OK},
				},
			},
			{
				// Kubelet should repeat NodeStage as long as the pod exists
				name:             "should retry NodeStage after NodeStage final error",
				expectPodRunning: true,
				expectedCalls: []csiCall{
					// This matches all 3 NodeStage calls with InvalidArgument error
					{expectedMethod: "NodeStageVolume", expectedError: codes.InvalidArgument},
					{expectedMethod: "NodeStageVolume", expectedError: codes.OK, deletePod: true},
					{expectedMethod: "NodeUnstageVolume", expectedError: codes.OK},
				},
				// Fail first 3 NodeStage requests, 4th succeeds
				nodeStageHook: func(counter int64) error {
					if counter < 4 {
						return status.Error(codes.InvalidArgument, "fake error")
					}
					return nil
				},
			},
			{
				// Kubelet should repeat NodeStage as long as the pod exists
				name:             "should retry NodeStage after NodeStage ephemeral error",
				expectPodRunning: true,
				expectedCalls: []csiCall{
					// This matches all 3 NodeStage calls with DeadlineExceeded error
					{expectedMethod: "NodeStageVolume", expectedError: codes.DeadlineExceeded},
					{expectedMethod: "NodeStageVolume", expectedError: codes.OK, deletePod: true},
					{expectedMethod: "NodeUnstageVolume", expectedError: codes.OK},
				},
				// Fail first 3 NodeStage requests, 4th succeeds
				nodeStageHook: func(counter int64) error {
					if counter < 4 {
						return status.Error(codes.DeadlineExceeded, "fake error")
					}
					return nil
				},
			},
			{
				// After NodeStage with ephemeral error, the driver may continue staging the volume.
				// Kubelet should call NodeUnstage to make sure the volume is really unstaged after
				// the pod is deleted.
				name:             "should call NodeUnstage after NodeStage ephemeral error",
				expectPodRunning: false,
				expectedCalls: []csiCall{
					// Delete the pod before NodeStage succeeds - it should get "uncertain" because of the ephemeral error.
					// This matches all repeated NodeStage calls with DeadlineExceeded error (due to exp. backoff).
					{expectedMethod: "NodeStageVolume", expectedError: codes.DeadlineExceeded, deletePod: true},
					{expectedMethod: "NodeUnstageVolume", expectedError: codes.OK},
				},
				nodeStageHook: func(counter int64) error {
					return status.Error(codes.DeadlineExceeded, "fake error")
				},
			},
			{
				// After NodeStage with final error, kubelet can be sure the volume is not staged.
				// The test checks that NodeUnstage is *not* called.
				name:             "should not call NodeUnstage after NodeStage final error",
				expectPodRunning: false,
				expectedCalls: []csiCall{
					// Delete the pod before NodeStage succeeds - it should get "globally unmounted" because of the final error.
					// This matches all repeated NodeStage calls with InvalidArgument error (due to exp. backoff).
					{expectedMethod: "NodeStageVolume", expectedError: codes.InvalidArgument, deletePod: true},
				},
				// nodeStageScript: `INVALIDARGUMENT;`,
				nodeStageHook: func(counter int64) error {
					return status.Error(codes.InvalidArgument, "fake error")
				},
			},
		}
		for _, t := range tests {
			test := t
			ginkgo.It(test.name, func() {
				var hooks *drivers.Hooks
				if test.nodeStageHook != nil {
					hooks = createPreHook("NodeStageVolume", test.nodeStageHook)
				}
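				// createPreHook is a helper defined elsewhere in this file. As used
				// here, it wraps the given function into a drivers.Hooks struct so
				// that the hook runs before every "NodeStageVolume" call made against
				// the mock driver, with the call counter described above. A rough
				// usage sketch (assuming that helper's signature, not part of this test):
				//
				//	hooks := createPreHook("NodeStageVolume", func(counter int64) error {
				//		return nil // or a gRPC status error to make the call fail
				//	})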
				init(testParameters{
					disableAttach:  true,
					registerDriver: true,
					hooks:          hooks,
				})
				defer cleanup()

				_, claim, pod := createPod(false)
				if pod == nil {
					return
				}
				// Wait for the PVC to get bound to make sure the CSI driver is fully started.
				err := e2epv.WaitForPersistentVolumeClaimPhase(v1.ClaimBound, f.ClientSet, f.Namespace.Name, claim.Name, time.Second, framework.ClaimProvisionTimeout)
				framework.ExpectNoError(err, "while waiting for PVC to get provisioned")

				ginkgo.By("Waiting for expected CSI calls")
				// Watch for all calls up to deletePod = true
				ctx, cancel := context.WithTimeout(context.Background(), csiPodRunningTimeout)
				defer cancel()
				for {
					if ctx.Err() != nil {
						framework.Failf("timed out waiting for the CSI call that indicates that the pod can be deleted: %v", test.expectedCalls)
					}
					time.Sleep(1 * time.Second)
					_, index, err := compareCSICalls(trackedCalls, test.expectedCalls, m.driver.GetCalls)
					framework.ExpectNoError(err, "while waiting for initial CSI calls")
					if index == 0 {
						// No CSI call received yet
						continue
					}
					// Check whether the last *received* call wanted the pod to be deleted
					if test.expectedCalls[index-1].deletePod {
						break
					}
				}

				if test.expectPodRunning {
					ginkgo.By("Waiting for pod to be running")
					err := e2epod.WaitForPodNameRunningInNamespace(m.cs, pod.Name, pod.Namespace)
					framework.ExpectNoError(err, "Failed to start pod: %v", err)
				}

				ginkgo.By("Deleting the previously created pod")
				err = e2epod.DeletePodWithWait(m.cs, pod)
				framework.ExpectNoError(err, "while deleting")

				ginkgo.By("Waiting for all remaining expected CSI calls")
				err = wait.Poll(time.Second, csiUnstageWaitTimeout, func() (done bool, err error) {
					_, index, err := compareCSICalls(trackedCalls, test.expectedCalls, m.driver.GetCalls)
					if err != nil {
						return true, err
					}
					if index == 0 {
						// No CSI call received yet
						return false, nil
					}
					if len(test.expectedCalls) == index {
						// all calls received
						return true, nil
					}
					return false, nil
				})
				framework.ExpectNoError(err, "while waiting for all CSI calls")
			})
		}
	})
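	// The error-case Contexts above and below both poll compareCSICalls (defined
	// elsewhere in this file). As used here, it filters the mock driver's call
	// log down to the tracked methods, collapses backoff-driven repetitions of
	// the same method/error pair, and returns the matched calls plus an index
	// into expectedCalls: index == 0 means nothing has matched yet, and
	// index == len(expectedCalls) means the whole expected sequence was observed.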

	ginkgo.Context("CSI NodeUnstage error cases [Slow]", func() {
		trackedCalls := []string{
			"NodeStageVolume",
			"NodeUnstageVolume",
		}

		// Each test starts two pods in sequence.
		// The first pod always runs successfully, but the NodeUnstage hook can set various error conditions.
		// The test then checks how NodeStage of the second pod is called.
		tests := []struct {
			name          string
			expectedCalls []csiCall

			// Called for each NodeUnstageVolume call, with the counter incremented atomically before
			// the invocation (i.e. the first value will be 1) and the index of the deleted pod
			// (the first pod has index 1).
			nodeUnstageHook func(counter, pod int64) error
		}{
			{
				// This is already tested elsewhere, adding simple good case here to test the test framework.
				name: "should call NodeStage after NodeUnstage success",
				expectedCalls: []csiCall{
					{expectedMethod: "NodeStageVolume", expectedError: codes.OK},
					{expectedMethod: "NodeUnstageVolume", expectedError: codes.OK},
					{expectedMethod: "NodeStageVolume", expectedError: codes.OK},
					{expectedMethod: "NodeUnstageVolume", expectedError: codes.OK},
				},
			},
			{
				name: "two pods: should call NodeStage after previous NodeUnstage final error",
				expectedCalls: []csiCall{
					{expectedMethod: "NodeStageVolume", expectedError: codes.OK},
					{expectedMethod: "NodeUnstageVolume", expectedError: codes.InvalidArgument},
					{expectedMethod: "NodeStageVolume", expectedError: codes.OK},
					{expectedMethod: "NodeUnstageVolume", expectedError: codes.OK},
				},
				nodeUnstageHook: func(counter, pod int64) error {
					if pod == 1 {
						return status.Error(codes.InvalidArgument, "fake final error")
					}
					return nil
				},
			},
			{
				name: "two pods: should call NodeStage after previous NodeUnstage transient error",
				expectedCalls: []csiCall{
					{expectedMethod: "NodeStageVolume", expectedError: codes.OK},
					{expectedMethod: "NodeUnstageVolume", expectedError: codes.DeadlineExceeded},
					{expectedMethod: "NodeStageVolume", expectedError: codes.OK},
					{expectedMethod: "NodeUnstageVolume", expectedError: codes.OK},
				},
				nodeUnstageHook: func(counter, pod int64) error {
					if pod == 1 {
						return status.Error(codes.DeadlineExceeded, "fake transient error")
					}
					return nil
				},
			},
		}
		for _, t := range tests {
			test := t
			ginkgo.It(test.name, func() {
				// Index of the last deleted pod. NodeUnstage calls are then related to this pod.
				var deletedPodNumber int64 = 1
				var hooks *drivers.Hooks
				if test.nodeUnstageHook != nil {
					hooks = createPreHook("NodeUnstageVolume", func(counter int64) error {
						pod := atomic.LoadInt64(&deletedPodNumber)
						return test.nodeUnstageHook(counter, pod)
					})
				}
				init(testParameters{
					disableAttach:  true,
					registerDriver: true,
					hooks:          hooks,
				})
				defer cleanup()

				_, claim, pod := createPod(false)
				if pod == nil {
					return
				}
				// Wait for the PVC to get bound to make sure the CSI driver is fully started.
				err := e2epv.WaitForPersistentVolumeClaimPhase(v1.ClaimBound, f.ClientSet, f.Namespace.Name, claim.Name, time.Second, framework.ClaimProvisionTimeout)
				framework.ExpectNoError(err, "while waiting for PVC to get provisioned")
				err = e2epod.WaitForPodNameRunningInNamespace(m.cs, pod.Name, pod.Namespace)
				framework.ExpectNoError(err, "while waiting for the first pod to start")
				err = e2epod.DeletePodWithWait(m.cs, pod)
				framework.ExpectNoError(err, "while deleting the first pod")

				// Create the second pod
				pod, err = createPodWithPVC(claim)
				framework.ExpectNoError(err, "while creating the second pod")
				err = e2epod.WaitForPodNameRunningInNamespace(m.cs, pod.Name, pod.Namespace)
				framework.ExpectNoError(err, "while waiting for the second pod to start")
				// The second pod is running and kubelet can't call NodeUnstage of the first one.
				// Therefore incrementing the pod counter is safe here.
				atomic.AddInt64(&deletedPodNumber, 1)
				err = e2epod.DeletePodWithWait(m.cs, pod)
				framework.ExpectNoError(err, "while deleting the second pod")

				ginkgo.By("Waiting for all remaining expected CSI calls")
				err = wait.Poll(time.Second, csiUnstageWaitTimeout, func() (done bool, err error) {
					_, index, err := compareCSICalls(trackedCalls, test.expectedCalls, m.driver.GetCalls)
					if err != nil {
						return true, err
					}
					if index == 0 {
						// No CSI call received yet
						return false, nil
					}
					if len(test.expectedCalls) == index {
						// all calls received
						return true, nil
					}
					return false, nil
				})
				framework.ExpectNoError(err, "while waiting for all CSI calls")
			})
		}
	})

	ginkgo.Context("storage capacity", func() {
		tests := []struct {
			name              string
			resourceExhausted bool
			lateBinding       bool
			topology          bool
		}{
			{
				name: "unlimited",
			},
			{
				name:              "exhausted, immediate binding",
				resourceExhausted: true,
			},
			{
				name:              "exhausted, late binding, no topology",
				resourceExhausted: true,
				lateBinding:       true,
			},
			{
				name:              "exhausted, late binding, with topology",
				resourceExhausted: true,
				lateBinding:       true,
				topology:          true,
			},
		}

		createVolume := "CreateVolume"
		deleteVolume := "DeleteVolume"
		// publishVolume := "NodePublishVolume"
		// unpublishVolume := "NodeUnpublishVolume"
		// stageVolume := "NodeStageVolume"
		// unstageVolume := "NodeUnstageVolume"

		// These calls are assumed to occur in this order for
		// each test run. NodeStageVolume and
		// NodePublishVolume should also be deterministic and
		// only get called once, but sometimes kubelet calls
		// both multiple times, which breaks this test
		// (https://github.com/kubernetes/kubernetes/issues/90250).
		// Therefore they are temporarily commented out until
		// that issue is resolved.
		//
		// NodeUnpublishVolume and NodeUnstageVolume are racing
		// with DeleteVolume, so we cannot assume a deterministic
		// order and have to ignore them
		// (https://github.com/kubernetes/kubernetes/issues/94108).
		deterministicCalls := []string{
			createVolume,
			// stageVolume,
			// publishVolume,
			// unpublishVolume,
			// unstageVolume,
			deleteVolume,
		}

		for _, t := range tests {
			test := t
			ginkgo.It(test.name, func() {
				var err error
				params := testParameters{
					lateBinding:    test.lateBinding,
					enableTopology: test.topology,

					// Not strictly necessary, but runs a bit faster this way
					// and for a while there also was a problem with a two minute delay
					// due to a bug (https://github.com/kubernetes-csi/csi-test/pull/250).
					disableAttach:  true,
					registerDriver: true,
				}

				if test.resourceExhausted {
					params.hooks = createPreHook("CreateVolume", func(counter int64) error {
						if counter%2 != 0 {
							return status.Error(codes.ResourceExhausted, "fake error")
						}
						return nil
					})
				}
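				// With this hook, every odd-numbered CreateVolume call (the 1st,
				// 3rd, ...) fails with ResourceExhausted and every even-numbered
				// call succeeds, so a provisioning attempt always sees one failure
				// followed by one success. This is why the "expected" sequence
				// constructed below contains exactly two CreateVolume entries in
				// the resourceExhausted case.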

				init(params)
				defer cleanup()

				ctx, cancel := context.WithTimeout(context.Background(), csiPodRunningTimeout)
				defer cancel()

				// In contrast to the raw watch, RetryWatcher is expected to deliver all events even
				// when the underlying raw watch gets closed prematurely
				// (https://github.com/kubernetes/kubernetes/pull/93777#discussion_r467932080).
				// This is important because below the test is going to make assertions about the
				// PVC state changes.
				initResource, err := f.ClientSet.CoreV1().PersistentVolumeClaims(f.Namespace.Name).List(ctx, metav1.ListOptions{})
				framework.ExpectNoError(err, "Failed to fetch initial PVC resource")
				listWatcher := &cachetools.ListWatch{
					WatchFunc: func(listOptions metav1.ListOptions) (watch.Interface, error) {
						return f.ClientSet.CoreV1().PersistentVolumeClaims(f.Namespace.Name).Watch(ctx, listOptions)
					},
				}
				pvcWatch, err := watchtools.NewRetryWatcher(initResource.GetResourceVersion(), listWatcher)
				framework.ExpectNoError(err, "create PVC watch")
				defer pvcWatch.Stop()

				sc, claim, pod := createPod(false)
				gomega.Expect(pod).NotTo(gomega.BeNil(), "while creating pod")
				bindingMode := storagev1.VolumeBindingImmediate
				if test.lateBinding {
					bindingMode = storagev1.VolumeBindingWaitForFirstConsumer
				}
				framework.ExpectEqual(*sc.VolumeBindingMode, bindingMode, "volume binding mode")

				err = e2epod.WaitForPodNameRunningInNamespace(m.cs, pod.Name, pod.Namespace)
				framework.ExpectNoError(err, "failed to start pod")
				err = e2epod.DeletePodWithWait(m.cs, pod)
				framework.ExpectNoError(err, "failed to delete pod")
				err = m.cs.CoreV1().PersistentVolumeClaims(claim.Namespace).Delete(ctx, claim.Name, metav1.DeleteOptions{})
				framework.ExpectNoError(err, "failed to delete claim")

				normal := []csiCall{}
				for _, method := range deterministicCalls {
					normal = append(normal, csiCall{expectedMethod: method})
				}
				expected := normal
				// When simulating limited capacity,
				// we expect exactly two CreateVolume
				// calls because the first one should
				// have failed.
				if test.resourceExhausted {
					expected = []csiCall{
						{expectedMethod: createVolume, expectedError: codes.ResourceExhausted},
					}
					expected = append(expected, normal...)
				}

				var calls []drivers.MockCSICall
				err = wait.PollImmediateUntil(time.Second, func() (done bool, err error) {
					c, index, err := compareCSICalls(deterministicCalls, expected, m.driver.GetCalls)
					if err != nil {
						return true, fmt.Errorf("error waiting for expected CSI calls: %s", err)
					}
					calls = c
					if index == 0 {
						// No CSI call received yet
						return false, nil
					}
					if len(expected) == index {
						// all calls received
						return true, nil
					}
					return false, nil
				}, ctx.Done())
				framework.ExpectNoError(err, "while waiting for all CSI calls")

				// The capacity error is dealt with in two different ways.
				//
				// For delayed binding, the external-provisioner should unset the node annotation
				// to give the scheduler the opportunity to reschedule the pod onto a different
				// node.
				//
				// For immediate binding, the external-provisioner must keep retrying.
				//
				// Unfortunately, the call log is the same in both cases. We have to collect
				// additional evidence that rescheduling really happened. What we have observed
				// above is how the PVC changed over time. Now we can analyze that.
				ginkgo.By("Checking PVC events")
				nodeAnnotationSet := false
				nodeAnnotationReset := false
				watchFailed := false
			loop:
				for {
					select {
					case event, ok := <-pvcWatch.ResultChan():
						if !ok {
							watchFailed = true
							break loop
						}

						framework.Logf("PVC event %s: %#v", event.Type, event.Object)
						switch event.Type {
						case watch.Modified:
							pvc, ok := event.Object.(*v1.PersistentVolumeClaim)
							if !ok {
								framework.Failf("PVC watch sent %#v instead of a PVC", event.Object)
							}
							_, set := pvc.Annotations["volume.kubernetes.io/selected-node"]
							if set {
								nodeAnnotationSet = true
							} else if nodeAnnotationSet {
								nodeAnnotationReset = true
							}
						case watch.Deleted:
							break loop
						case watch.Error:
							watchFailed = true
						}
					case <-ctx.Done():
						framework.Failf("Timeout while waiting to observe PVC list")
					}
				}

				// More tests when capacity is limited.
				if test.resourceExhausted {
					for _, call := range calls {
						if call.Method == createVolume {
							gomega.Expect(call.Error).To(gomega.ContainSubstring("code = ResourceExhausted"), "first CreateVolume error in\n%s", calls)
							break
						}
					}

					switch {
					case watchFailed:
						// If the watch failed or stopped prematurely (which can happen at any time), then we cannot
						// verify whether the annotation was set as expected. This is still considered a successful
						// test.
						framework.Logf("PVC watch delivered incomplete data, cannot check annotation")
					case test.lateBinding:
						gomega.Expect(nodeAnnotationSet).To(gomega.BeTrue(), "selected-node should have been set")
						// Whether it gets reset depends on whether we have topology enabled. Without
						// it, rescheduling is unnecessary.
						if test.topology {
							gomega.Expect(nodeAnnotationReset).To(gomega.BeTrue(), "selected-node should have been reset")
						} else {
							gomega.Expect(nodeAnnotationReset).To(gomega.BeFalse(), "selected-node should not have been reset")
						}
					default:
						gomega.Expect(nodeAnnotationSet).To(gomega.BeFalse(), "selected-node should not have been set")
						gomega.Expect(nodeAnnotationReset).To(gomega.BeFalse(), "selected-node should not have been reset")
					}
				}
			})
		}
	})
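	// The next Context exercises scheduling against explicit CSIStorageCapacity
	// objects instead of simulated CreateVolume failures. For reference, the
	// objects created below are roughly equivalent to this manifest (an
	// illustrative sketch; the test generates the name, namespace, and
	// storage class suffix):
	//
	//	apiVersion: storage.k8s.io/v1beta1
	//	kind: CSIStorageCapacity
	//	metadata:
	//	  generateName: fake-capacity-
	//	storageClassName: mock-csi-storage-capacity-<unique>
	//	nodeTopology: {}        # empty selector, usable by any node
	//	capacity: 100Gi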

	// These tests *only* work on a cluster which has the CSIStorageCapacity feature enabled.
	ginkgo.Context("CSIStorageCapacity", func() {
		var (
			err error
			yes = true
			no  = false
		)
		// Tests that expect a failure are slow because we have to wait for a while
		// to be sure that the volume isn't getting created.
		tests := []struct {
			name            string
			storageCapacity *bool
			capacities      []string
			expectFailure   bool
		}{
			{
				name: "CSIStorageCapacity unused",
			},
			{
				name:            "CSIStorageCapacity disabled",
				storageCapacity: &no,
			},
			{
				name:            "CSIStorageCapacity used, no capacity",
				storageCapacity: &yes,
				expectFailure:   true,
			},
			{
				name:            "CSIStorageCapacity used, insufficient capacity",
				storageCapacity: &yes,
				expectFailure:   true,
				capacities:      []string{"1Mi"},
			},
			{
				name:            "CSIStorageCapacity used, have capacity",
				storageCapacity: &yes,
				capacities:      []string{"100Gi"},
			},
			// We could add more test cases here for
			// various situations, but covering those via
			// the scheduler binder unit tests is faster.
		}
		for _, t := range tests {
			test := t
			ginkgo.It(t.name, func() {
				scName := "mock-csi-storage-capacity-" + f.UniqueName
				init(testParameters{
					registerDriver:  true,
					scName:          scName,
					storageCapacity: test.storageCapacity,
					lateBinding:     true,
				})
				defer cleanup()

				// The storage class uses a random name, therefore we have to create it first
				// before adding CSIStorageCapacity objects for it.
				for _, capacityStr := range test.capacities {
					capacityQuantity := resource.MustParse(capacityStr)
					capacity := &storagev1beta1.CSIStorageCapacity{
						ObjectMeta: metav1.ObjectMeta{
							GenerateName: "fake-capacity-",
						},
						// Empty topology, usable by any node.
						StorageClassName: scName,
						NodeTopology:     &metav1.LabelSelector{},
						Capacity:         &capacityQuantity,
					}
					createdCapacity, err := f.ClientSet.StorageV1beta1().CSIStorageCapacities(f.Namespace.Name).Create(context.Background(), capacity, metav1.CreateOptions{})
					framework.ExpectNoError(err, "create CSIStorageCapacity %+v", *capacity)
					m.testCleanups = append(m.testCleanups, func() {
						f.ClientSet.StorageV1beta1().CSIStorageCapacities(f.Namespace.Name).Delete(context.Background(), createdCapacity.Name, metav1.DeleteOptions{})
					})
				}

				// kube-scheduler may need some time before it gets the CSIDriver and CSIStorageCapacity objects.
				// Without them, scheduling doesn't run as expected by the test.
				syncDelay := 5 * time.Second
				time.Sleep(syncDelay)

				sc, _, pod := createPod(false /* persistent volume, late binding as specified above */)
				framework.ExpectEqual(sc.Name, scName, "pre-selected storage class name not used")

				waitCtx, cancel := context.WithTimeout(context.Background(), f.Timeouts.PodStart)
				defer cancel()
				condition := anyOf(
					podRunning(waitCtx, f.ClientSet, pod.Name, pod.Namespace),
					// We only just created the CSIStorageCapacity objects, therefore
					// we have to ignore all older events, plus the syncDelay as our
					// safety margin.
					podHasStorage(waitCtx, f.ClientSet, pod.Name, pod.Namespace, time.Now().Add(syncDelay)),
				)
				err = wait.PollImmediateUntil(poll, condition, waitCtx.Done())
				if test.expectFailure {
					switch {
					case errors.Is(err, context.DeadlineExceeded),
						errors.Is(err, wait.ErrWaitTimeout),
						errors.Is(err, errNotEnoughSpace):
						// Okay, we expected that.
					case err == nil:
						framework.Fail("pod unexpectedly started to run")
					default:
						framework.Failf("unexpected error while waiting for pod: %v", err)
					}
				} else {
					framework.ExpectNoError(err, "failed to start pod")
				}

				ginkgo.By("Deleting the previously created pod")
				err = e2epod.DeletePodWithWait(m.cs, pod)
				framework.ExpectNoError(err, "while deleting")
			})
		}
	})

	ginkgo.Context("CSI Volume Snapshots [Feature:VolumeSnapshotDataSource]", func() {
		tests := []struct {
			name               string
			createSnapshotHook func(counter int64) error
		}{
			{
				name: "volumesnapshotcontent and pvc in Bound state with deletion timestamp set should not get deleted while snapshot finalizer exists",
				createSnapshotHook: func(counter int64) error {
					if counter < 8 {
						return status.Error(codes.DeadlineExceeded, "fake error")
					}
					return nil
				},
			},
		}
		for _, test := range tests {
			test := test
			ginkgo.It(test.name, func() {
				var hooks *drivers.Hooks
				if test.createSnapshotHook != nil {
					hooks = createPreHook("CreateSnapshot", test.createSnapshotHook)
				}
				init(testParameters{
					disableAttach:  true,
					registerDriver: true,
					enableSnapshot: true,
					hooks:          hooks,
				})
				sDriver, ok := m.driver.(storageframework.SnapshottableTestDriver)
				if !ok {
					e2eskipper.Skipf("mock driver %s does not support snapshots -- skipping", m.driver.GetDriverInfo().Name)

				}
				ctx, cancel := context.WithTimeout(context.Background(), csiPodRunningTimeout)
				defer cancel()
				defer cleanup()

				sc := m.driver.GetDynamicProvisionStorageClass(m.config, "")
				ginkgo.By("Creating storage class")
				class, err := m.cs.StorageV1().StorageClasses().Create(context.TODO(), sc, metav1.CreateOptions{})
				framework.ExpectNoError(err, "Failed to create class: %v", err)
				m.sc[class.Name] = class
				claim := e2epv.MakePersistentVolumeClaim(e2epv.PersistentVolumeClaimConfig{
					// Use static name so that the volumesnapshot can be created before the pvc.
					Name:             "snapshot-test-pvc",
					StorageClassName: &(class.Name),
				}, f.Namespace.Name)

				ginkgo.By("Creating snapshot")
				// TODO: Test VolumeSnapshots with Retain policy
				parameters := map[string]string{}
				snapshotClass, snapshot := storageframework.CreateSnapshot(sDriver, m.config, storageframework.DynamicSnapshotDelete, claim.Name, claim.Namespace, f.Timeouts, parameters)
				framework.ExpectNoError(err, "failed to create snapshot")
				m.vsc[snapshotClass.GetName()] = snapshotClass
				volumeSnapshotName := snapshot.GetName()

				ginkgo.By(fmt.Sprintf("Creating PVC %s/%s", claim.Namespace, claim.Name))
				claim, err = m.cs.CoreV1().PersistentVolumeClaims(f.Namespace.Name).Create(context.TODO(), claim, metav1.CreateOptions{})
				framework.ExpectNoError(err, "Failed to create claim: %v", err)

				ginkgo.By(fmt.Sprintf("Wait for finalizer to be added to claim %s/%s", claim.Namespace, claim.Name))
				err = e2epv.WaitForPVCFinalizer(ctx, m.cs, claim.Name, claim.Namespace, pvcAsSourceProtectionFinalizer, 1*time.Millisecond, 1*time.Minute)
				framework.ExpectNoError(err)

				ginkgo.By("Wait for PVC to be Bound")
				_, err = e2epv.WaitForPVClaimBoundPhase(m.cs, []*v1.PersistentVolumeClaim{claim}, 1*time.Minute)
				framework.ExpectNoError(err, "Failed to wait for PVC to be bound: %v", err)

				ginkgo.By(fmt.Sprintf("Delete PVC %s", claim.Name))
				err = e2epv.DeletePersistentVolumeClaim(m.cs, claim.Name, claim.Namespace)
				framework.ExpectNoError(err, "failed to delete pvc")

				ginkgo.By("Get PVC from API server and verify deletion timestamp is set")
				claim, err = m.cs.CoreV1().PersistentVolumeClaims(f.Namespace.Name).Get(context.TODO(), claim.Name, metav1.GetOptions{})
				if err != nil {
					if !apierrors.IsNotFound(err) {
						framework.ExpectNoError(err, "Failed to get claim: %v", err)
					}
					framework.Logf("PVC not found. Continuing to test VolumeSnapshotContent finalizer")
				} else if claim.DeletionTimestamp == nil {
					framework.Failf("Expected deletion timestamp to be set on PVC: %v", claim)
				}

				ginkgo.By(fmt.Sprintf("Get VolumeSnapshotContent bound to VolumeSnapshot %s", snapshot.GetName()))
				snapshotContent := utils.GetSnapshotContentFromSnapshot(m.config.Framework.DynamicClient, snapshot)
				volumeSnapshotContentName := snapshotContent.GetName()

				ginkgo.By(fmt.Sprintf("Verify VolumeSnapshotContent %s contains finalizer %s", snapshot.GetName(), volumeSnapshotContentFinalizer))
				err = utils.WaitForGVRFinalizer(ctx, m.config.Framework.DynamicClient, utils.SnapshotContentGVR, volumeSnapshotContentName, "", volumeSnapshotContentFinalizer, 1*time.Millisecond, 1*time.Minute)
				framework.ExpectNoError(err)

				ginkgo.By(fmt.Sprintf("Delete VolumeSnapshotContent %s", snapshotContent.GetName()))
				err = m.config.Framework.DynamicClient.Resource(utils.SnapshotContentGVR).Delete(ctx, snapshotContent.GetName(), metav1.DeleteOptions{})
				framework.ExpectNoError(err, "Failed to delete snapshotcontent: %v", err)

				ginkgo.By("Get VolumeSnapshotContent from API server and verify deletion timestamp is set")
				snapshotContent, err = m.config.Framework.DynamicClient.Resource(utils.SnapshotContentGVR).Get(context.TODO(), snapshotContent.GetName(), metav1.GetOptions{})
				framework.ExpectNoError(err)

				if snapshotContent.GetDeletionTimestamp() == nil {
					framework.Failf("Expected deletion timestamp to be set on snapshotcontent")
				}

				// If the claim is nonexistent, the Get() call on the API server returns
				// a non-nil claim object with all fields unset.
				// Refer https://github.com/kubernetes/kubernetes/pull/99167#issuecomment-781670012
				if claim != nil && claim.Spec.VolumeName != "" {
					ginkgo.By(fmt.Sprintf("Wait for PV %s to be deleted", claim.Spec.VolumeName))
					err = e2epv.WaitForPersistentVolumeDeleted(m.cs, claim.Spec.VolumeName, framework.Poll, 3*time.Minute)
					framework.ExpectNoError(err, fmt.Sprintf("failed to delete PV %s", claim.Spec.VolumeName))
				}

				ginkgo.By(fmt.Sprintf("Verify VolumeSnapshot %s contains finalizer %s", snapshot.GetName(), volumeSnapshotBoundFinalizer))
				err = utils.WaitForGVRFinalizer(ctx, m.config.Framework.DynamicClient, utils.SnapshotGVR, volumeSnapshotName, f.Namespace.Name, volumeSnapshotBoundFinalizer, 1*time.Millisecond, 1*time.Minute)
				framework.ExpectNoError(err)

				ginkgo.By("Delete VolumeSnapshot")
				err = utils.DeleteAndWaitSnapshot(m.config.Framework.DynamicClient, f.Namespace.Name, volumeSnapshotName, framework.Poll, framework.SnapshotDeleteTimeout)
				framework.ExpectNoError(err, fmt.Sprintf("failed to delete VolumeSnapshot %s", volumeSnapshotName))

				ginkgo.By(fmt.Sprintf("Wait for VolumeSnapshotContent %s to be deleted", volumeSnapshotContentName))
				err = utils.WaitForGVRDeletion(m.config.Framework.DynamicClient, utils.SnapshotContentGVR, volumeSnapshotContentName, framework.Poll, framework.SnapshotDeleteTimeout)
				framework.ExpectNoError(err, fmt.Sprintf("failed to delete VolumeSnapshotContent %s", volumeSnapshotContentName))
			})
		}
	})

	ginkgo.Context("CSIServiceAccountToken", func() {
		var (
			err error
		)
		tests := []struct {
			desc                  string
			deployCSIDriverObject bool
			tokenRequests         []storagev1.TokenRequest
		}{
			{
				desc:                  "token should not be plumbed down when csiServiceAccountTokenEnabled=false",
csiServiceAccountTokenEnabled=false", 1508 deployCSIDriverObject: true, 1509 tokenRequests: nil, 1510 }, 1511 { 1512 desc: "token should not be plumbed down when CSIDriver is not deployed", 1513 deployCSIDriverObject: false, 1514 tokenRequests: []storagev1.TokenRequest{{}}, 1515 }, 1516 { 1517 desc: "token should be plumbed down when csiServiceAccountTokenEnabled=true", 1518 deployCSIDriverObject: true, 1519 tokenRequests: []storagev1.TokenRequest{{ExpirationSeconds: utilptr.Int64Ptr(60 * 10)}}, 1520 }, 1521 } 1522 for _, test := range tests { 1523 test := test 1524 csiServiceAccountTokenEnabled := test.tokenRequests != nil 1525 ginkgo.It(test.desc, func() { 1526 init(testParameters{ 1527 registerDriver: test.deployCSIDriverObject, 1528 tokenRequests: test.tokenRequests, 1529 requiresRepublish: &csiServiceAccountTokenEnabled, 1530 }) 1531 1532 defer cleanup() 1533 1534 _, _, pod := createPod(false) 1535 if pod == nil { 1536 return 1537 } 1538 err = e2epod.WaitForPodNameRunningInNamespace(m.cs, pod.Name, pod.Namespace) 1539 framework.ExpectNoError(err, "Failed to start pod: %v", err) 1540 1541 // sleep to make sure RequiresRepublish triggers more than 1 NodePublishVolume 1542 numNodePublishVolume := 1 1543 if test.deployCSIDriverObject && csiServiceAccountTokenEnabled { 1544 time.Sleep(time.Second) 1545 numNodePublishVolume = 2 1546 } 1547 1548 ginkgo.By("Deleting the previously created pod") 1549 err = e2epod.DeletePodWithWait(m.cs, pod) 1550 framework.ExpectNoError(err, "while deleting") 1551 1552 ginkgo.By("Checking CSI driver logs") 1553 err = checkPodLogs(m.driver.GetCalls, pod, false, false, false, test.deployCSIDriverObject && csiServiceAccountTokenEnabled, numNodePublishVolume) 1554 framework.ExpectNoError(err) 1555 }) 1556 } 1557 }) 1558 // These tests *only* work on a cluster which has the CSIVolumeFSGroupPolicy feature enabled. 1559 ginkgo.Context("CSI FSGroupPolicy [LinuxOnly]", func() { 1560 tests := []struct { 1561 name string 1562 fsGroupPolicy storagev1.FSGroupPolicy 1563 modified bool 1564 }{ 1565 { 1566 name: "should modify fsGroup if fsGroupPolicy=default", 1567 fsGroupPolicy: storagev1.ReadWriteOnceWithFSTypeFSGroupPolicy, 1568 modified: true, 1569 }, 1570 { 1571 name: "should modify fsGroup if fsGroupPolicy=File", 1572 fsGroupPolicy: storagev1.FileFSGroupPolicy, 1573 modified: true, 1574 }, 1575 { 1576 name: "should not modify fsGroup if fsGroupPolicy=None", 1577 fsGroupPolicy: storagev1.NoneFSGroupPolicy, 1578 modified: false, 1579 }, 1580 } 1581 for _, t := range tests { 1582 test := t 1583 ginkgo.It(test.name, func() { 1584 if framework.NodeOSDistroIs("windows") { 1585 e2eskipper.Skipf("FSGroupPolicy is only applied on linux nodes -- skipping") 1586 } 1587 init(testParameters{ 1588 disableAttach: true, 1589 registerDriver: true, 1590 fsGroupPolicy: &test.fsGroupPolicy, 1591 }) 1592 defer cleanup() 1593 1594 // kube-scheduler may need some time before it gets the CSIDriver object. 1595 // Without them, scheduling doesn't run as expected by the test. 
	// These tests *only* work on a cluster which has the CSIVolumeFSGroupPolicy feature enabled.
	ginkgo.Context("CSI FSGroupPolicy [LinuxOnly]", func() {
		tests := []struct {
			name          string
			fsGroupPolicy storagev1.FSGroupPolicy
			modified      bool
		}{
			{
				name:          "should modify fsGroup if fsGroupPolicy=default",
				fsGroupPolicy: storagev1.ReadWriteOnceWithFSTypeFSGroupPolicy,
				modified:      true,
			},
			{
				name:          "should modify fsGroup if fsGroupPolicy=File",
				fsGroupPolicy: storagev1.FileFSGroupPolicy,
				modified:      true,
			},
			{
				name:          "should not modify fsGroup if fsGroupPolicy=None",
				fsGroupPolicy: storagev1.NoneFSGroupPolicy,
				modified:      false,
			},
		}
		for _, t := range tests {
			test := t
			ginkgo.It(test.name, func() {
				if framework.NodeOSDistroIs("windows") {
					e2eskipper.Skipf("FSGroupPolicy is only applied on Linux nodes -- skipping")
				}
				init(testParameters{
					disableAttach:  true,
					registerDriver: true,
					fsGroupPolicy:  &test.fsGroupPolicy,
				})
				defer cleanup()

				// kube-scheduler may need some time before it gets the CSIDriver object.
				// Without it, scheduling doesn't run as expected by the test.
				syncDelay := 5 * time.Second
				time.Sleep(syncDelay)

				fsGroupVal := int64(rand.Int63n(20000) + 1024)
				fsGroup := &fsGroupVal

				_, _, pod := createPodWithFSGroup(fsGroup) /* persistent volume */

				mountPath := pod.Spec.Containers[0].VolumeMounts[0].MountPath
				dirName := mountPath + "/" + f.UniqueName
				fileName := dirName + "/" + f.UniqueName

				err := e2epod.WaitForPodNameRunningInNamespace(m.cs, pod.Name, pod.Namespace)
				framework.ExpectNoError(err, "failed to start pod")

				// Create the subdirectory to ensure that fsGroup propagates to it.
				createDirectory := fmt.Sprintf("mkdir %s", dirName)
				_, _, err = e2evolume.PodExec(f, pod, createDirectory)
				framework.ExpectNoError(err, "failed: creating the directory: %s", err)

				// Inject the contents onto the mount.
				createFile := fmt.Sprintf("echo '%s' > '%s'; sync", "filecontents", fileName)
				_, _, err = e2evolume.PodExec(f, pod, createFile)
				framework.ExpectNoError(err, "failed: writing the contents: %s", err)

				// Delete the created file. This step is mandatory, as the mock driver
				// won't clean up the contents automatically.
				defer func() {
					deleteDir := fmt.Sprintf("rm -fr %s", dirName)
					_, _, err = e2evolume.PodExec(f, pod, deleteDir)
					framework.ExpectNoError(err, "failed: deleting the directory: %s", err)
				}()

				// Ensure that the fsGroup matches what we expect.
				if test.modified {
					utils.VerifyFSGroupInPod(f, fileName, strconv.FormatInt(*fsGroup, 10), pod)
				} else {
					utils.VerifyFSGroupInPod(f, fileName, "root", pod)
				}

				// The created resources will be removed by the cleanup() function,
				// so there is no need to delete anything here.
			})
		}
	})
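	// A minimal sketch of what the fsGroup check amounts to, assuming a shell in
	// the pod image (utils.VerifyFSGroupInPod encapsulates the real check; the
	// exact command it runs may differ). Illustrative only, never invoked:
	var _ = func(pod *v1.Pod, path, expectedGID string) {
		// stat -c %g prints the numeric group id of the file.
		stdout, _, err := e2evolume.PodExec(f, pod, fmt.Sprintf("stat -c %%g %s", path))
		framework.ExpectNoError(err)
		gomega.Expect(strings.TrimSpace(stdout)).To(gomega.Equal(expectedGID))
	}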
	ginkgo.Context("CSI Volume Snapshots secrets [Feature:VolumeSnapshotDataSource]", func() {

		var (
			// CSISnapshotterSecretName is the name of the secret to be created.
			CSISnapshotterSecretName string = "snapshot-secret"

			// CSISnapshotterSecretNameAnnotation is the annotation key for the CSI snapshotter secret name in VolumeSnapshotClass.parameters.
			CSISnapshotterSecretNameAnnotation string = "csi.storage.k8s.io/snapshotter-secret-name"

			// CSISnapshotterSecretNamespaceAnnotation is the annotation key for the CSI snapshotter secret namespace in VolumeSnapshotClass.parameters.
			CSISnapshotterSecretNamespaceAnnotation string = "csi.storage.k8s.io/snapshotter-secret-namespace"

			// annotations holds the annotations object.
			annotations interface{}
		)

		tests := []struct {
			name               string
			createSnapshotHook func(counter int64) error
		}{
			{
				// The volume snapshot should be created with secrets successfully, even if the first few attempts fail.
				name: "volume snapshot create/delete with secrets",
				// Fail the first seven calls to create snapshot and succeed the eighth call.
				createSnapshotHook: func(counter int64) error {
					if counter < 8 {
						return status.Error(codes.DeadlineExceeded, "fake error")
					}
					return nil
				},
			},
		}
		for _, test := range tests {
			ginkgo.It(test.name, func() {
				hooks := createPreHook("CreateSnapshot", test.createSnapshotHook)
				init(testParameters{
					disableAttach:  true,
					registerDriver: true,
					enableSnapshot: true,
					hooks:          hooks,
				})

				sDriver, ok := m.driver.(storageframework.SnapshottableTestDriver)
				if !ok {
					e2eskipper.Skipf("mock driver does not support snapshots -- skipping")
				}
				defer cleanup()

				var sc *storagev1.StorageClass
				if dDriver, ok := m.driver.(storageframework.DynamicPVTestDriver); ok {
					sc = dDriver.GetDynamicProvisionStorageClass(m.config, "")
				}
				ginkgo.By("Creating storage class")
				class, err := m.cs.StorageV1().StorageClasses().Create(context.TODO(), sc, metav1.CreateOptions{})
				framework.ExpectNoError(err, "Failed to create storage class: %v", err)
				m.sc[class.Name] = class
				pvc := e2epv.MakePersistentVolumeClaim(e2epv.PersistentVolumeClaimConfig{
					Name:             "snapshot-test-pvc",
					StorageClassName: &(class.Name),
				}, f.Namespace.Name)

				ginkgo.By(fmt.Sprintf("Creating PVC %s/%s", pvc.Namespace, pvc.Name))
				pvc, err = m.cs.CoreV1().PersistentVolumeClaims(f.Namespace.Name).Create(context.TODO(), pvc, metav1.CreateOptions{})
				framework.ExpectNoError(err, "Failed to create claim: %v", err)

				ginkgo.By("Wait for PVC to be Bound")
				_, err = e2epv.WaitForPVClaimBoundPhase(m.cs, []*v1.PersistentVolumeClaim{pvc}, 1*time.Minute)
				framework.ExpectNoError(err, "Failed waiting for PVC to be bound: %v", err)

				m.pvcs = append(m.pvcs, pvc)

				ginkgo.By("Creating Secret")
				secret := &v1.Secret{
					ObjectMeta: metav1.ObjectMeta{
						Namespace: f.Namespace.Name,
						Name:      CSISnapshotterSecretName,
					},
					Data: map[string][]byte{
						"secret-data": []byte("secret-value-1"),
					},
				}

				if secret, err := m.cs.CoreV1().Secrets(f.Namespace.Name).Create(context.TODO(), secret, metav1.CreateOptions{}); err != nil {
					framework.Failf("unable to create test secret %s: %v", secret.Name, err)
				}

				ginkgo.By("Creating snapshot with secrets")
				parameters := map[string]string{
					CSISnapshotterSecretNameAnnotation:      CSISnapshotterSecretName,
					CSISnapshotterSecretNamespaceAnnotation: f.Namespace.Name,
				}

				_, snapshot := storageframework.CreateSnapshot(sDriver, m.config, storageframework.DynamicSnapshotDelete, pvc.Name, pvc.Namespace, f.Timeouts, parameters)
				snapshotcontent := utils.GetSnapshotContentFromSnapshot(m.config.Framework.DynamicClient, snapshot)
				if annotations, ok = snapshotcontent.Object["metadata"].(map[string]interface{})["annotations"]; !ok {
					framework.Failf("Unable to get volume snapshot content annotations")
				}

				// Check that the delete-snapshot-secret annotations were applied to the VolumeSnapshotContent.
				checkDeleteSnapshotSecrets(m.cs, annotations)

				// Delete the snapshot and check that it is gone.
				deleteSnapshot(m.cs, m.config, snapshot)
			})
		}
	})
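	// For reference, an illustrative VolumeSnapshotClass (not created by this
	// test; driver and class names are placeholders) whose parameters carry the
	// snapshotter secret name and namespace used above:
	var _ = &unstructured.Unstructured{
		Object: map[string]interface{}{
			"apiVersion":     "snapshot.storage.k8s.io/v1",
			"kind":           "VolumeSnapshotClass",
			"metadata":       map[string]interface{}{"name": "csi-mock-vsclass-example"},
			"driver":         "csi-mock.example.com",
			"deletionPolicy": "Delete",
			"parameters": map[string]interface{}{
				"csi.storage.k8s.io/snapshotter-secret-name":      "snapshot-secret",
				"csi.storage.k8s.io/snapshotter-secret-namespace": "default",
			},
		},
	}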
	ginkgo.Context("CSI Snapshot Controller metrics [Feature:VolumeSnapshotDataSource]", func() {
		tests := []struct {
			name    string
			pattern storageframework.TestPattern
		}{
			{
				name:    "snapshot controller should emit dynamic CreateSnapshot, CreateSnapshotAndReady, and DeleteSnapshot metrics",
				pattern: storageframework.DynamicSnapshotDelete,
			},
			{
				name:    "snapshot controller should emit pre-provisioned CreateSnapshot, CreateSnapshotAndReady, and DeleteSnapshot metrics",
				pattern: storageframework.PreprovisionedSnapshotDelete,
			},
		}
		for _, test := range tests {
			ginkgo.It(test.name, func() {
				init(testParameters{
					disableAttach:  true,
					registerDriver: true,
					enableSnapshot: true,
				})

				sDriver, ok := m.driver.(storageframework.SnapshottableTestDriver)
				if !ok {
					e2eskipper.Skipf("mock driver does not support snapshots -- skipping")
				}
				defer cleanup()

				metricsGrabber, err := e2emetrics.NewMetricsGrabber(m.config.Framework.ClientSet, nil, f.ClientConfig(), false, false, false, false, false, true)
				if err != nil {
					framework.Failf("Error creating metrics grabber: %v", err)
				}

				// Grab initial metrics. If this fails, the snapshot controller metrics are not set up; skip in that case.
				_, err = metricsGrabber.GrabFromSnapshotController(framework.TestContext.SnapshotControllerPodName, framework.TestContext.SnapshotControllerHTTPPort)
				if err != nil {
					e2eskipper.Skipf("Snapshot controller metrics not found -- skipping")
				}

				ginkgo.By("getting all initial metric values")
				metricsTestConfig := newSnapshotMetricsTestConfig("snapshot_controller_operation_total_seconds_count",
					"count",
					m.config.GetUniqueDriverName(),
					"CreateSnapshot",
					"success",
					"",
					test.pattern)
				createSnapshotMetrics := newSnapshotControllerMetrics(metricsTestConfig, metricsGrabber)
				originalCreateSnapshotCount, _ := createSnapshotMetrics.getSnapshotControllerMetricValue()

				metricsTestConfig.operationName = "CreateSnapshotAndReady"
				createSnapshotAndReadyMetrics := newSnapshotControllerMetrics(metricsTestConfig, metricsGrabber)
				originalCreateSnapshotAndReadyCount, _ := createSnapshotAndReadyMetrics.getSnapshotControllerMetricValue()

				metricsTestConfig.operationName = "DeleteSnapshot"
				deleteSnapshotMetrics := newSnapshotControllerMetrics(metricsTestConfig, metricsGrabber)
				originalDeleteSnapshotCount, _ := deleteSnapshotMetrics.getSnapshotControllerMetricValue()

				ginkgo.By("Creating storage class")
				var sc *storagev1.StorageClass
				if dDriver, ok := m.driver.(storageframework.DynamicPVTestDriver); ok {
					sc = dDriver.GetDynamicProvisionStorageClass(m.config, "")
				}
				class, err := m.cs.StorageV1().StorageClasses().Create(context.TODO(), sc, metav1.CreateOptions{})
				framework.ExpectNoError(err, "Failed to create storage class: %v", err)
				m.sc[class.Name] = class
				pvc := e2epv.MakePersistentVolumeClaim(e2epv.PersistentVolumeClaimConfig{
					Name:             "snapshot-test-pvc",
					StorageClassName: &(class.Name),
				}, f.Namespace.Name)

				ginkgo.By(fmt.Sprintf("Creating PVC %s/%s", pvc.Namespace, pvc.Name))
				pvc, err = m.cs.CoreV1().PersistentVolumeClaims(f.Namespace.Name).Create(context.TODO(), pvc, metav1.CreateOptions{})
				framework.ExpectNoError(err, "Failed to create claim: %v", err)

				ginkgo.By("Wait for PVC to be Bound")
				_, err = e2epv.WaitForPVClaimBoundPhase(m.cs, []*v1.PersistentVolumeClaim{pvc}, 1*time.Minute)
				framework.ExpectNoError(err, "Failed waiting for PVC to be bound: %v", err)

				ginkgo.By("Creating snapshot")
				parameters := map[string]string{}
				sr := storageframework.CreateSnapshotResource(sDriver, m.config, test.pattern, pvc.Name, pvc.Namespace, f.Timeouts, parameters)

				ginkgo.By("Checking for CreateSnapshot metrics")
				createSnapshotMetrics.waitForSnapshotControllerMetric(originalCreateSnapshotCount+1.0, f.Timeouts.SnapshotControllerMetrics)

				ginkgo.By("Checking for CreateSnapshotAndReady metrics")
				err = utils.WaitForSnapshotReady(m.config.Framework.DynamicClient, pvc.Namespace, sr.Vs.GetName(), framework.Poll, f.Timeouts.SnapshotCreate)
				framework.ExpectNoError(err, "failed to wait for snapshot ready")
				createSnapshotAndReadyMetrics.waitForSnapshotControllerMetric(originalCreateSnapshotAndReadyCount+1.0, f.Timeouts.SnapshotControllerMetrics)

				// Delete the snapshot and check that it is gone.
				deleteSnapshot(m.cs, m.config, sr.Vs)

				ginkgo.By("Checking for DeleteSnapshot metrics")
				deleteSnapshotMetrics.waitForSnapshotControllerMetric(originalDeleteSnapshotCount+1.0, f.Timeouts.SnapshotControllerMetrics)
			})
		}
	})
})

func deleteSnapshot(cs clientset.Interface, config *storageframework.PerTestConfig, snapshot *unstructured.Unstructured) {
	// Delete the given snapshot.
	dc := config.Framework.DynamicClient
	err := dc.Resource(utils.SnapshotGVR).Namespace(snapshot.GetNamespace()).Delete(context.TODO(), snapshot.GetName(), metav1.DeleteOptions{})
	framework.ExpectNoError(err)

	// Check that the snapshot is deleted.
	_, err = dc.Resource(utils.SnapshotGVR).Get(context.TODO(), snapshot.GetName(), metav1.GetOptions{})
	framework.ExpectError(err)
}

// A lot of this code was copied from e2e/framework. It would be nicer
// if it could be reused - see https://github.com/kubernetes/kubernetes/issues/92754
func podRunning(ctx context.Context, c clientset.Interface, podName, namespace string) wait.ConditionFunc {
	return func() (bool, error) {
		pod, err := c.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{})
		if err != nil {
			return false, err
		}
		switch pod.Status.Phase {
		case v1.PodRunning:
			return true, nil
		case v1.PodFailed, v1.PodSucceeded:
			return false, errPodCompleted
		}
		return false, nil
	}
}

const (
	poll                           = 2 * time.Second
	pvcAsSourceProtectionFinalizer = "snapshot.storage.kubernetes.io/pvc-as-source-protection"
	volumeSnapshotContentFinalizer = "snapshot.storage.kubernetes.io/volumesnapshotcontent-bound-protection"
	volumeSnapshotBoundFinalizer   = "snapshot.storage.kubernetes.io/volumesnapshot-bound-protection"
)

var (
	errPodCompleted   = fmt.Errorf("pod ran to completion")
	errNotEnoughSpace = errors.New(scheduling.ErrReasonNotEnoughSpace)
)
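// Callers polling with the condition funcs in this file can distinguish the
// terminal sentinel errors above from transient API errors via errors.Is; a
// brief sketch (not a helper used elsewhere in this file):
var _ = func(err error) bool {
	return errors.Is(err, errPodCompleted) || errors.Is(err, errNotEnoughSpace)
}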
func podHasStorage(ctx context.Context, c clientset.Interface, podName, namespace string, when time.Time) wait.ConditionFunc {
	// Check for events of this pod. Copied from test/e2e/common/container_probe.go.
	expectedEvent := fields.Set{
		"involvedObject.kind":      "Pod",
		"involvedObject.name":      podName,
		"involvedObject.namespace": namespace,
		"reason":                   "FailedScheduling",
	}.AsSelector().String()
	options := metav1.ListOptions{
		FieldSelector: expectedEvent,
	}
	// Copied from test/e2e/framework/events/events.go.
	return func() (bool, error) {
		// We cannot be sure here whether it has enough storage, only when
		// it hasn't. In that case we abort waiting with a special error.
		events, err := c.CoreV1().Events(namespace).List(ctx, options)
		if err != nil {
			return false, fmt.Errorf("got error while getting events: %w", err)
		}
		for _, event := range events.Items {
			if /* event.CreationTimestamp.After(when) && */ strings.Contains(event.Message, scheduling.ErrReasonNotEnoughSpace) {
				return false, errNotEnoughSpace
			}
		}
		return false, nil
	}
}

func anyOf(conditions ...wait.ConditionFunc) wait.ConditionFunc {
	return func() (bool, error) {
		for _, condition := range conditions {
			done, err := condition()
			if err != nil {
				return false, err
			}
			if done {
				return true, nil
			}
		}
		return false, nil
	}
}
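// A usage sketch for anyOf (assumed, not one of the actual call sites): wait
// until the pod either runs or is known to be unschedulable because storage
// capacity is insufficient, whichever is observed first.
var _ = func(ctx context.Context, c clientset.Interface, pod *v1.Pod) error {
	return wait.PollImmediate(poll, csiPodRunningTimeout, anyOf(
		podRunning(ctx, c, pod.Name, pod.Namespace),
		podHasStorage(ctx, c, pod.Name, pod.Namespace, time.Now()),
	))
}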
func waitForMaxVolumeCondition(pod *v1.Pod, cs clientset.Interface) error {
	waitErr := wait.PollImmediate(10*time.Second, csiPodUnschedulableTimeout, func() (bool, error) {
		pod, err := cs.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{})
		if err != nil {
			return false, err
		}
		for _, c := range pod.Status.Conditions {
			// Conformance tests cannot rely on specific output of optional fields (e.g., Reason
			// and Message) because these fields are not subject to the deprecation policy.
			if c.Type == v1.PodScheduled && c.Status == v1.ConditionFalse && c.Reason != "" && c.Message != "" {
				return true, nil
			}
		}
		return false, nil
	})
	if waitErr != nil {
		return fmt.Errorf("error waiting for pod %s/%s to have max volume condition: %v", pod.Namespace, pod.Name, waitErr)
	}
	return nil
}

func waitForVolumeAttachmentTerminated(attachmentName string, cs clientset.Interface) error {
	waitErr := wait.PollImmediate(10*time.Second, csiVolumeAttachmentTimeout, func() (bool, error) {
		_, err := cs.StorageV1().VolumeAttachments().Get(context.TODO(), attachmentName, metav1.GetOptions{})
		if err != nil {
			// If the VolumeAttachment object is not found, it means it has been terminated.
			if apierrors.IsNotFound(err) {
				return true, nil
			}
			return false, err
		}
		return false, nil
	})
	if waitErr != nil {
		return fmt.Errorf("error waiting for volume attachment %v to terminate: %v", attachmentName, waitErr)
	}
	return nil
}

func checkCSINodeForLimits(nodeName string, driverName string, cs clientset.Interface) (int32, error) {
	var attachLimit int32

	waitErr := wait.PollImmediate(10*time.Second, csiNodeLimitUpdateTimeout, func() (bool, error) {
		csiNode, err := cs.StorageV1().CSINodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
		if err != nil && !apierrors.IsNotFound(err) {
			return false, err
		}
		attachLimit = getVolumeLimitFromCSINode(csiNode, driverName)
		if attachLimit > 0 {
			return true, nil
		}
		return false, nil
	})
	if waitErr != nil {
		return 0, fmt.Errorf("error waiting for non-zero volume limit of driver %s on node %s: %v", driverName, nodeName, waitErr)
	}
	return attachLimit, nil
}

func createClaim(cs clientset.Interface, t testsuites.StorageClassTest, node e2epod.NodeSelection, scName, ns string) (*storagev1.StorageClass, *v1.PersistentVolumeClaim) {
	class := newStorageClass(t, ns, "")
	if scName != "" {
		class.Name = scName
	}
	var err error
	_, err = cs.StorageV1().StorageClasses().Get(context.TODO(), class.Name, metav1.GetOptions{})
	if err != nil {
		class, err = cs.StorageV1().StorageClasses().Create(context.TODO(), class, metav1.CreateOptions{})
		framework.ExpectNoError(err, "Failed to create class: %v", err)
	}

	claim := e2epv.MakePersistentVolumeClaim(e2epv.PersistentVolumeClaimConfig{
		ClaimSize:        t.ClaimSize,
		StorageClassName: &(class.Name),
		VolumeMode:       &t.VolumeMode,
	}, ns)
	claim, err = cs.CoreV1().PersistentVolumeClaims(ns).Create(context.TODO(), claim, metav1.CreateOptions{})
	framework.ExpectNoError(err, "Failed to create claim: %v", err)

	if !t.DelayBinding {
		pvcClaims := []*v1.PersistentVolumeClaim{claim}
		_, err = e2epv.WaitForPVClaimBoundPhase(cs, pvcClaims, framework.ClaimProvisionTimeout)
		framework.ExpectNoError(err, "Failed waiting for PVC to be bound: %v", err)
	}
	return class, claim
}

func startPausePod(cs clientset.Interface, t testsuites.StorageClassTest, node e2epod.NodeSelection, scName, ns string) (*storagev1.StorageClass, *v1.PersistentVolumeClaim, *v1.Pod) {
	class, claim := createClaim(cs, t, node, scName, ns)

	pod, err := startPausePodWithClaim(cs, claim, node, ns)
	framework.ExpectNoError(err, "Failed to create pause pod: %v", err)
	return class, claim, pod
}

func startBusyBoxPod(cs clientset.Interface, t testsuites.StorageClassTest, node e2epod.NodeSelection, scName, ns string, fsGroup *int64) (*storagev1.StorageClass, *v1.PersistentVolumeClaim, *v1.Pod) {
	class, claim := createClaim(cs, t, node, scName, ns)
	pod, err := startBusyBoxPodWithClaim(cs, claim, node, ns, fsGroup)
	framework.ExpectNoError(err, "Failed to create busybox pod: %v", err)
	return class, claim, pod
}

func startPausePodInline(cs clientset.Interface, t testsuites.StorageClassTest, node e2epod.NodeSelection, ns string) *v1.Pod {
	pod, err := startPausePodWithInlineVolume(cs,
		&v1.CSIVolumeSource{
			Driver: t.Provisioner,
		},
		node, ns)
	framework.ExpectNoError(err, "Failed to create pod: %v", err)
	return pod
}

func startPausePodWithClaim(cs clientset.Interface, pvc *v1.PersistentVolumeClaim, node e2epod.NodeSelection, ns string) (*v1.Pod, error) {
	return startPausePodWithVolumeSource(cs,
		v1.VolumeSource{
			PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
				ClaimName: pvc.Name,
				ReadOnly:  false,
			},
		},
		node, ns)
}

func startBusyBoxPodWithClaim(cs clientset.Interface, pvc *v1.PersistentVolumeClaim, node e2epod.NodeSelection, ns string, fsGroup *int64) (*v1.Pod, error) {
	return startBusyBoxPodWithVolumeSource(cs,
		v1.VolumeSource{
			PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
				ClaimName: pvc.Name,
				ReadOnly:  false,
			},
		},
		node, ns, fsGroup)
}

func startPausePodWithInlineVolume(cs clientset.Interface, inlineVolume *v1.CSIVolumeSource, node e2epod.NodeSelection, ns string) (*v1.Pod, error) {
	return startPausePodWithVolumeSource(cs,
		v1.VolumeSource{
			CSI: inlineVolume,
		},
		node, ns)
}

func startPausePodWithVolumeSource(cs clientset.Interface, volumeSource v1.VolumeSource, node e2epod.NodeSelection, ns string) (*v1.Pod, error) {
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			GenerateName: "pvc-volume-tester-",
		},
		Spec: v1.PodSpec{
			Containers: []v1.Container{
				{
					Name:  "volume-tester",
					Image: imageutils.GetE2EImage(imageutils.Pause),
					VolumeMounts: []v1.VolumeMount{
						{
							Name:      "my-volume",
							MountPath: "/mnt/test",
						},
					},
				},
			},
			RestartPolicy: v1.RestartPolicyNever,
			Volumes: []v1.Volume{
				{
					Name:         "my-volume",
					VolumeSource: volumeSource,
				},
			},
		},
	}
	e2epod.SetNodeSelection(&pod.Spec, node)
	return cs.CoreV1().Pods(ns).Create(context.TODO(), pod, metav1.CreateOptions{})
}

func startBusyBoxPodWithVolumeSource(cs clientset.Interface, volumeSource v1.VolumeSource, node e2epod.NodeSelection, ns string, fsGroup *int64) (*v1.Pod, error) {
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			GenerateName: "pvc-volume-tester-",
		},
		Spec: v1.PodSpec{
			Containers: []v1.Container{
				{
					Name:  "volume-tester",
					Image: framework.BusyBoxImage,
					VolumeMounts: []v1.VolumeMount{
						{
							Name:      "my-volume",
							MountPath: "/mnt/test",
						},
					},
					Command: e2epod.GenerateScriptCmd("while true ; do sleep 2; done"),
				},
			},
			SecurityContext: &v1.PodSecurityContext{
				FSGroup: fsGroup,
			},
			RestartPolicy: v1.RestartPolicyNever,
			Volumes: []v1.Volume{
				{
					Name:         "my-volume",
					VolumeSource: volumeSource,
				},
			},
		},
	}
	e2epod.SetNodeSelection(&pod.Spec, node)
	return cs.CoreV1().Pods(ns).Create(context.TODO(), pod, metav1.CreateOptions{})
}
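// Usage sketch for the inline variant (the driver name is a placeholder): an
// ephemeral inline CSI volume needs only the driver name in its volume source,
// in contrast to the claim-backed helpers above.
var _ = func(cs clientset.Interface, ns string) (*v1.Pod, error) {
	return startPausePodWithInlineVolume(cs,
		&v1.CSIVolumeSource{Driver: "csi-mock.example.com"},
		e2epod.NodeSelection{}, ns)
}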
expectedAttributes["csi.storage.k8s.io/serviceAccount.name"] = "default" 2154 2155 } 2156 if csiInlineVolumesEnabled { 2157 // This is only passed in 1.15 when the CSIInlineVolume feature gate is set. 2158 expectedAttributes["csi.storage.k8s.io/ephemeral"] = strconv.FormatBool(ephemeralVolume) 2159 } 2160 2161 if csiServiceAccountTokenEnabled { 2162 expectedAttributes["csi.storage.k8s.io/serviceAccount.tokens"] = "<nonempty>" 2163 } 2164 2165 // Find NodePublish in the GRPC calls. 2166 foundAttributes := sets.NewString() 2167 numNodePublishVolume := 0 2168 numNodeUnpublishVolume := 0 2169 calls, err := getCalls() 2170 if err != nil { 2171 return err 2172 } 2173 2174 for _, call := range calls { 2175 switch call.Method { 2176 case "NodePublishVolume": 2177 numNodePublishVolume++ 2178 if numNodePublishVolume == 1 { 2179 // Check that NodePublish had expected attributes for first volume 2180 for k, v := range expectedAttributes { 2181 vv, found := call.Request.VolumeContext[k] 2182 if found && (v == vv || (v == "<nonempty>" && len(vv) != 0)) { 2183 foundAttributes.Insert(k) 2184 framework.Logf("Found volume attribute %s: %s", k, vv) 2185 } 2186 } 2187 } 2188 case "NodeUnpublishVolume": 2189 framework.Logf("Found NodeUnpublishVolume: %+v", call) 2190 numNodeUnpublishVolume++ 2191 } 2192 } 2193 if numNodePublishVolume < expectedNumNodePublish { 2194 return fmt.Errorf("NodePublish should be called at least %d", expectedNumNodePublish) 2195 } 2196 2197 if numNodeUnpublishVolume == 0 { 2198 return fmt.Errorf("NodeUnpublish was never called") 2199 } 2200 if foundAttributes.Len() != len(expectedAttributes) { 2201 return fmt.Errorf("number of found volume attributes does not match, expected %d, got %d", len(expectedAttributes), foundAttributes.Len()) 2202 } 2203 return nil 2204} 2205 2206// compareCSICalls compares expectedCalls with logs of the mock driver. 2207// It returns index of the first expectedCall that was *not* received 2208// yet or error when calls do not match. 2209// All repeated calls to the CSI mock driver (e.g. due to exponential backoff) 2210// are squashed and checked against single expectedCallSequence item. 2211// 2212// Only permanent errors are returned. Other errors are logged and no 2213// calls are returned. The caller is expected to retry. 2214func compareCSICalls(trackedCalls []string, expectedCallSequence []csiCall, getCalls func() ([]drivers.MockCSICall, error)) ([]drivers.MockCSICall, int, error) { 2215 allCalls, err := getCalls() 2216 if err != nil { 2217 framework.Logf("intermittent (?) log retrieval error, proceeding without output: %v", err) 2218 return nil, 0, nil 2219 } 2220 2221 // Remove all repeated and ignored calls 2222 tracked := sets.NewString(trackedCalls...) 2223 var calls []drivers.MockCSICall 2224 var last drivers.MockCSICall 2225 for _, c := range allCalls { 2226 if !tracked.Has(c.Method) { 2227 continue 2228 } 2229 if c.Method != last.Method || c.FullError.Code != last.FullError.Code { 2230 last = c 2231 calls = append(calls, c) 2232 } 2233 // This call is the same as the last one, ignore it. 2234 } 2235 2236 for i, c := range calls { 2237 if i >= len(expectedCallSequence) { 2238 // Log all unexpected calls first, return error below outside the loop. 
// compareCSICalls compares expectedCallSequence with the logs of the mock driver.
// It returns the index of the first expected call that was *not* received yet,
// or an error when the calls do not match.
// All repeated calls to the CSI mock driver (e.g. due to exponential backoff)
// are squashed and checked against a single expectedCallSequence item.
//
// Only permanent errors are returned. Other errors are logged and no
// calls are returned. The caller is expected to retry.
func compareCSICalls(trackedCalls []string, expectedCallSequence []csiCall, getCalls func() ([]drivers.MockCSICall, error)) ([]drivers.MockCSICall, int, error) {
	allCalls, err := getCalls()
	if err != nil {
		framework.Logf("intermittent (?) log retrieval error, proceeding without output: %v", err)
		return nil, 0, nil
	}

	// Remove all repeated and ignored calls.
	tracked := sets.NewString(trackedCalls...)
	var calls []drivers.MockCSICall
	var last drivers.MockCSICall
	for _, c := range allCalls {
		if !tracked.Has(c.Method) {
			continue
		}
		if c.Method != last.Method || c.FullError.Code != last.FullError.Code {
			last = c
			calls = append(calls, c)
		}
		// Otherwise this call is the same as the previous one: squash it.
	}

	for i, c := range calls {
		if i >= len(expectedCallSequence) {
			// Log all unexpected calls first, return an error below outside the loop.
			framework.Logf("Unexpected CSI driver call: %s (%d)", c.Method, c.FullError.Code)
			continue
		}

		// Compare the current call with the expected call.
		expectedCall := expectedCallSequence[i]
		if c.Method != expectedCall.expectedMethod || c.FullError.Code != expectedCall.expectedError {
			return allCalls, i, fmt.Errorf("unexpected CSI call %d: expected %s (%d), got %s (%d)", i, expectedCall.expectedMethod, expectedCall.expectedError, c.Method, c.FullError.Code)
		}
	}
	if len(calls) > len(expectedCallSequence) {
		return allCalls, len(expectedCallSequence), fmt.Errorf("received %d unexpected CSI driver calls", len(calls)-len(expectedCallSequence))
	}
	// All calls were correct.
	return allCalls, len(calls), nil
}

func waitForCSIDriver(cs clientset.Interface, driverName string) error {
	timeout := 4 * time.Minute

	framework.Logf("waiting up to %v for CSIDriver %q", timeout, driverName)
	for start := time.Now(); time.Since(start) < timeout; time.Sleep(framework.Poll) {
		_, err := cs.StorageV1().CSIDrivers().Get(context.TODO(), driverName, metav1.GetOptions{})
		if !apierrors.IsNotFound(err) {
			// Either the CSIDriver was found (err == nil) or the Get failed
			// with an unexpected error; both end the wait.
			return err
		}
	}
	return fmt.Errorf("gave up after waiting %v for CSIDriver %q", timeout, driverName)
}

func destroyCSIDriver(cs clientset.Interface, driverName string) {
	driverGet, err := cs.StorageV1().CSIDrivers().Get(context.TODO(), driverName, metav1.GetOptions{})
	if err == nil {
		framework.Logf("deleting %s.%s: %s", driverGet.TypeMeta.APIVersion, driverGet.TypeMeta.Kind, driverGet.ObjectMeta.Name)
		// Uncomment the following line to get a full dump of the CSIDriver object.
		// framework.Logf("%s", framework.PrettyPrint(driverGet))
		cs.StorageV1().CSIDrivers().Delete(context.TODO(), driverName, metav1.DeleteOptions{})
	}
}

func getVolumeHandle(cs clientset.Interface, claim *v1.PersistentVolumeClaim) string {
	// Re-get the claim to get its latest state, including the bound volume name.
	claim, err := cs.CoreV1().PersistentVolumeClaims(claim.Namespace).Get(context.TODO(), claim.Name, metav1.GetOptions{})
	if err != nil {
		framework.ExpectNoError(err, "Cannot get PVC")
		return ""
	}
	pvName := claim.Spec.VolumeName
	pv, err := cs.CoreV1().PersistentVolumes().Get(context.TODO(), pvName, metav1.GetOptions{})
	if err != nil {
		framework.ExpectNoError(err, "Cannot get PV")
		return ""
	}
	if pv.Spec.CSI == nil {
		gomega.Expect(pv.Spec.CSI).NotTo(gomega.BeNil())
		return ""
	}
	return pv.Spec.CSI.VolumeHandle
}

func getVolumeLimitFromCSINode(csiNode *storagev1.CSINode, driverName string) int32 {
	for _, d := range csiNode.Spec.Drivers {
		if d.Name != driverName {
			continue
		}
		if d.Allocatable != nil && d.Allocatable.Count != nil {
			return *d.Allocatable.Count
		}
	}
	return 0
}
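// For orientation (all values are illustrative): getVolumeLimitFromCSINode
// reads the per-driver allocatable volume count from a CSINode shaped like this.
var _ = storagev1.CSINode{
	Spec: storagev1.CSINodeSpec{
		Drivers: []storagev1.CSINodeDriver{{
			Name:        "csi-mock.example.com",
			NodeID:      "node-1",
			Allocatable: &storagev1.VolumeNodeResources{Count: utilptr.Int32Ptr(2)},
		}},
	},
}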
// checkDeleteSnapshotSecrets checks that the delete-snapshot secrets annotations are applied to the VolumeSnapshotContent.
func checkDeleteSnapshotSecrets(cs clientset.Interface, annotations interface{}) error {
	ginkgo.By("checking if delete snapshot secrets annotation is applied to the VolumeSnapshotContent")

	var (
		annDeletionSecretName      string
		annDeletionSecretNamespace string
		ok                         bool
		err                        error

		// CSISnapshotterDeleteSecretNameAnnotation is the annotation key for the CSI snapshotter delete secret name in VolumeSnapshotClass.parameters.
		CSISnapshotterDeleteSecretNameAnnotation string = "snapshot.storage.kubernetes.io/deletion-secret-name"

		// CSISnapshotterDeleteSecretNamespaceAnnotation is the annotation key for the CSI snapshotter delete secret namespace in VolumeSnapshotClass.parameters.
		CSISnapshotterDeleteSecretNamespaceAnnotation string = "snapshot.storage.kubernetes.io/deletion-secret-namespace"
	)

	annotationsObj, ok := annotations.(map[string]interface{})
	if !ok {
		framework.Failf("failed to get annotations from annotations object")
	}

	if annDeletionSecretName, ok = annotationsObj[CSISnapshotterDeleteSecretNameAnnotation].(string); !ok {
		framework.Failf("unable to get secret annotation name")
	}
	if annDeletionSecretNamespace, ok = annotationsObj[CSISnapshotterDeleteSecretNamespaceAnnotation].(string); !ok {
		framework.Failf("unable to get secret annotation namespace")
	}

	// Verify that the secret exists.
	if _, err = cs.CoreV1().Secrets(annDeletionSecretNamespace).Get(context.TODO(), annDeletionSecretName, metav1.GetOptions{}); err != nil {
		framework.Failf("unable to get test secret %s: %v", annDeletionSecretName, err)
	}

	return err
}

// createPreHook counts invocations of a certain method (identified by a substring in the full gRPC method name).
func createPreHook(method string, callback func(counter int64) error) *drivers.Hooks {
	var counter int64

	return &drivers.Hooks{
		Pre: func(ctx context.Context, fullMethod string, request interface{}) (reply interface{}, err error) {
			if strings.Contains(fullMethod, method) {
				count := atomic.AddInt64(&counter, 1)
				return nil, callback(count)
			}
			return nil, nil
		},
	}
}
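// Usage sketch (mirrors the snapshot secrets test above): fail the first two
// NodeStageVolume calls with a retriable gRPC error, then let calls succeed.
var _ = createPreHook("NodeStageVolume", func(counter int64) error {
	if counter < 3 {
		return status.Error(codes.DeadlineExceeded, "fake error")
	}
	return nil
})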
type snapshotMetricsTestConfig struct {
	// expected values
	metricName      string
	metricType      string
	driverName      string
	operationName   string
	operationStatus string
	snapshotType    string
	le              string
}

type snapshotControllerMetrics struct {
	// configuration for the metric
	cfg            snapshotMetricsTestConfig
	metricsGrabber *e2emetrics.Grabber

	// results
	countMetrics  map[string]float64
	sumMetrics    map[string]float64
	bucketMetrics map[string]float64
}

func newSnapshotMetricsTestConfig(metricName, metricType, driverName, operationName, operationStatus, le string, pattern storageframework.TestPattern) snapshotMetricsTestConfig {
	var snapshotType string
	switch pattern.SnapshotType {
	case storageframework.DynamicCreatedSnapshot:
		snapshotType = "dynamic"

	case storageframework.PreprovisionedCreatedSnapshot:
		snapshotType = "pre-provisioned"

	default:
		framework.Failf("invalid snapshotType: %v", pattern.SnapshotType)
	}

	return snapshotMetricsTestConfig{
		metricName:      metricName,
		metricType:      metricType,
		driverName:      driverName,
		operationName:   operationName,
		operationStatus: operationStatus,
		snapshotType:    snapshotType,
		le:              le,
	}
}

func newSnapshotControllerMetrics(cfg snapshotMetricsTestConfig, metricsGrabber *e2emetrics.Grabber) *snapshotControllerMetrics {
	return &snapshotControllerMetrics{
		cfg:            cfg,
		metricsGrabber: metricsGrabber,

		countMetrics:  make(map[string]float64),
		sumMetrics:    make(map[string]float64),
		bucketMetrics: make(map[string]float64),
	}
}

func (scm *snapshotControllerMetrics) waitForSnapshotControllerMetric(expectedValue float64, timeout time.Duration) {
	metricKey := scm.getMetricKey()
	if successful := utils.WaitUntil(10*time.Second, timeout, func() bool {
		// Get the current metric value.
		actualValue, err := scm.getSnapshotControllerMetricValue()
		if err != nil {
			return false
		}

		// Another operation could have finished from a previous test,
		// so we check whether we have at least the expected value.
		return actualValue >= expectedValue
	}); successful {
		return
	}

	scm.showMetricsFailure(metricKey)
	framework.Failf("Unable to get valid snapshot controller metrics after %v", timeout)
}

func (scm *snapshotControllerMetrics) getSnapshotControllerMetricValue() (float64, error) {
	metricKey := scm.getMetricKey()

	// Grab the metrics and parse them into a readable format.
	err := scm.grabSnapshotControllerMetrics()
	if err != nil {
		return 0, err
	}

	metrics := scm.getMetricsTable()
	actual, ok := metrics[metricKey]
	if !ok {
		return 0, fmt.Errorf("did not find metric for key %s", metricKey)
	}

	return actual, nil
}

func (scm *snapshotControllerMetrics) getMetricsTable() map[string]float64 {
	var metrics map[string]float64
	switch scm.cfg.metricType {
	case "count":
		metrics = scm.countMetrics

	case "sum":
		metrics = scm.sumMetrics

	case "bucket":
		metrics = scm.bucketMetrics
	}

	return metrics
}

func (scm *snapshotControllerMetrics) showMetricsFailure(metricKey string) {
	framework.Logf("failed to find metric key %s inside of the following metrics:", metricKey)

	metrics := scm.getMetricsTable()
	for k, v := range metrics {
		framework.Logf("%s: %v", k, v)
	}
}

func (scm *snapshotControllerMetrics) grabSnapshotControllerMetrics() error {
	// Pull all metrics from the snapshot controller.
	metrics, err := scm.metricsGrabber.GrabFromSnapshotController(framework.TestContext.SnapshotControllerPodName, framework.TestContext.SnapshotControllerHTTPPort)
	if err != nil {
		return err
	}

	for method, samples := range metrics {
		for _, sample := range samples {
			operationName := string(sample.Metric["operation_name"])
			driverName := string(sample.Metric["driver_name"])
			operationStatus := string(sample.Metric["operation_status"])
			snapshotType := string(sample.Metric["snapshot_type"])
			le := string(sample.Metric["le"])
			key := snapshotMetricKey(scm.cfg.metricName, driverName, operationName, operationStatus, snapshotType, le)

			// Store each sample under the key derived from its own labels.
			switch method {
			case "snapshot_controller_operation_total_seconds_count":
				scm.countMetrics[key] = float64(sample.Value)

			case "snapshot_controller_operation_total_seconds_sum":
				scm.sumMetrics[key] = float64(sample.Value)

			case "snapshot_controller_operation_total_seconds_bucket":
				scm.bucketMetrics[key] = float64(sample.Value)
			}
		}
	}

	return nil
}

func (scm *snapshotControllerMetrics) getMetricKey() string {
	return snapshotMetricKey(scm.cfg.metricName, scm.cfg.driverName, scm.cfg.operationName, scm.cfg.operationStatus, scm.cfg.snapshotType, scm.cfg.le)
}

func snapshotMetricKey(metricName, driverName, operationName, operationStatus, snapshotType, le string) string {
	key := driverName

	// Build the key for shorthand metrics storage.
	for _, s := range []string{metricName, operationName, operationStatus, snapshotType, le} {
		if s != "" {
			key = fmt.Sprintf("%s_%s", key, s)
		}
	}

	return key
}
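// Example (the driver name is a placeholder): the shorthand key for a
// successful dynamic CreateSnapshot count metric comes out as
// "csi-mock-example_snapshot_controller_operation_total_seconds_count_CreateSnapshot_success_dynamic".
var _ = snapshotMetricKey("snapshot_controller_operation_total_seconds_count", "csi-mock-example", "CreateSnapshot", "success", "dynamic", "")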