1/*
2Copyright 2019 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package storage
18
19import (
20	"context"
21	"crypto/sha256"
22	"errors"
23	"fmt"
24	"math/rand"
25	"strconv"
26	"strings"
27	"sync/atomic"
28	"time"
29
30	"google.golang.org/grpc/codes"
31	"google.golang.org/grpc/status"
32	v1 "k8s.io/api/core/v1"
33	storagev1 "k8s.io/api/storage/v1"
34	storagev1beta1 "k8s.io/api/storage/v1beta1"
35	apierrors "k8s.io/apimachinery/pkg/api/errors"
36	"k8s.io/apimachinery/pkg/api/resource"
37	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
38	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
39	"k8s.io/apimachinery/pkg/fields"
40	utilerrors "k8s.io/apimachinery/pkg/util/errors"
41	"k8s.io/apimachinery/pkg/util/sets"
42	"k8s.io/apimachinery/pkg/util/wait"
43	"k8s.io/apimachinery/pkg/watch"
44	clientset "k8s.io/client-go/kubernetes"
45	cachetools "k8s.io/client-go/tools/cache"
46	watchtools "k8s.io/client-go/tools/watch"
47	"k8s.io/kubernetes/pkg/controller/volume/scheduling"
48	"k8s.io/kubernetes/pkg/kubelet/events"
49	"k8s.io/kubernetes/test/e2e/framework"
50	e2eevents "k8s.io/kubernetes/test/e2e/framework/events"
51	e2emetrics "k8s.io/kubernetes/test/e2e/framework/metrics"
52	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
53	e2epv "k8s.io/kubernetes/test/e2e/framework/pv"
54	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
55	e2evolume "k8s.io/kubernetes/test/e2e/framework/volume"
56	"k8s.io/kubernetes/test/e2e/storage/drivers"
57	storageframework "k8s.io/kubernetes/test/e2e/storage/framework"
58	"k8s.io/kubernetes/test/e2e/storage/testsuites"
59	"k8s.io/kubernetes/test/e2e/storage/utils"
60	imageutils "k8s.io/kubernetes/test/utils/image"
61	utilptr "k8s.io/utils/pointer"
62
63	"github.com/onsi/ginkgo"
64	"github.com/onsi/gomega"
65)
66
67const (
68	csiNodeLimitUpdateTimeout  = 5 * time.Minute
69	csiPodUnschedulableTimeout = 5 * time.Minute
70	csiResizeWaitPeriod        = 5 * time.Minute
71	csiVolumeAttachmentTimeout = 7 * time.Minute
72	// how long to wait for Resizing Condition on PVC to appear
73	csiResizingConditionWait = 2 * time.Minute
74
75	// Time for starting a pod with a volume.
76	csiPodRunningTimeout = 5 * time.Minute
77
78	// How log to wait for kubelet to unstage a volume after a pod is deleted
79	csiUnstageWaitTimeout = 1 * time.Minute
80)
81
82// csiCall represents an expected call from Kubernetes to CSI mock driver and
83// expected return value.
84// When matching expected csiCall with a real CSI mock driver output, one csiCall
85// matches *one or more* calls with the same method and error code.
86// This is due to exponential backoff in Kubernetes, where the test cannot expect
87// exact number of call repetitions.
88type csiCall struct {
89	expectedMethod string
90	expectedError  codes.Code
91	// This is a mark for the test itself to delete the tested pod *after*
92	// this csiCall is received.
93	deletePod bool
94}
95
96var _ = utils.SIGDescribe("CSI mock volume", func() {
97	type testParameters struct {
98		disableAttach       bool
99		attachLimit         int
100		registerDriver      bool
101		lateBinding         bool
102		enableTopology      bool
103		podInfo             *bool
104		storageCapacity     *bool
105		scName              string // pre-selected storage class name; must be unique in the cluster
106		enableResizing      bool   // enable resizing for both CSI mock driver and storageClass.
107		enableNodeExpansion bool   // enable node expansion for CSI mock driver
108		// just disable resizing on driver it overrides enableResizing flag for CSI mock driver
109		disableResizingOnDriver bool
110		enableSnapshot          bool
111		hooks                   *drivers.Hooks
112		tokenRequests           []storagev1.TokenRequest
113		requiresRepublish       *bool
114		fsGroupPolicy           *storagev1.FSGroupPolicy
115	}
116
117	type mockDriverSetup struct {
118		cs           clientset.Interface
119		config       *storageframework.PerTestConfig
120		testCleanups []func()
121		pods         []*v1.Pod
122		pvcs         []*v1.PersistentVolumeClaim
123		sc           map[string]*storagev1.StorageClass
124		vsc          map[string]*unstructured.Unstructured
125		driver       drivers.MockCSITestDriver
126		provisioner  string
127		tp           testParameters
128	}
129
130	var m mockDriverSetup
131
132	f := framework.NewDefaultFramework("csi-mock-volumes")
133
134	init := func(tp testParameters) {
135		m = mockDriverSetup{
136			cs:  f.ClientSet,
137			sc:  make(map[string]*storagev1.StorageClass),
138			vsc: make(map[string]*unstructured.Unstructured),
139			tp:  tp,
140		}
141		cs := f.ClientSet
142		var err error
143		driverOpts := drivers.CSIMockDriverOpts{
144			RegisterDriver:      tp.registerDriver,
145			PodInfo:             tp.podInfo,
146			StorageCapacity:     tp.storageCapacity,
147			EnableTopology:      tp.enableTopology,
148			AttachLimit:         tp.attachLimit,
149			DisableAttach:       tp.disableAttach,
150			EnableResizing:      tp.enableResizing,
151			EnableNodeExpansion: tp.enableNodeExpansion,
152			EnableSnapshot:      tp.enableSnapshot,
153			TokenRequests:       tp.tokenRequests,
154			RequiresRepublish:   tp.requiresRepublish,
155			FSGroupPolicy:       tp.fsGroupPolicy,
156		}
157
158		// At the moment, only tests which need hooks are
159		// using the embedded CSI mock driver. The rest run
160		// the driver inside the cluster although they could
161		// changed to use embedding merely by setting
162		// driverOpts.embedded to true.
163		//
164		// Not enabling it for all tests minimizes
165		// the risk that the introduction of embedded breaks
166		// some existings tests and avoids a dependency
167		// on port forwarding, which is important if some of
168		// these tests are supposed to become part of
169		// conformance testing (port forwarding isn't
170		// currently required).
171		if tp.hooks != nil {
172			driverOpts.Embedded = true
173			driverOpts.Hooks = *tp.hooks
174		}
175
176		// this just disable resizing on driver, keeping resizing on SC enabled.
177		if tp.disableResizingOnDriver {
178			driverOpts.EnableResizing = false
179		}
180
181		m.driver = drivers.InitMockCSIDriver(driverOpts)
182		config, testCleanup := m.driver.PrepareTest(f)
183		m.testCleanups = append(m.testCleanups, testCleanup)
184		m.config = config
185		m.provisioner = config.GetUniqueDriverName()
186
187		if tp.registerDriver {
188			err = waitForCSIDriver(cs, m.config.GetUniqueDriverName())
189			framework.ExpectNoError(err, "Failed to get CSIDriver %v", m.config.GetUniqueDriverName())
190			m.testCleanups = append(m.testCleanups, func() {
191				destroyCSIDriver(cs, m.config.GetUniqueDriverName())
192			})
193		}
194
195		// Wait for the CSIDriver actually get deployed and CSINode object to be generated.
196		// This indicates the mock CSI driver pod is up and running healthy.
197		err = drivers.WaitForCSIDriverRegistrationOnNode(m.config.ClientNodeSelection.Name, m.config.GetUniqueDriverName(), cs)
198		framework.ExpectNoError(err, "Failed to register CSIDriver %v", m.config.GetUniqueDriverName())
199	}
200
201	createPod := func(ephemeral bool) (class *storagev1.StorageClass, claim *v1.PersistentVolumeClaim, pod *v1.Pod) {
202		ginkgo.By("Creating pod")
203		sc := m.driver.GetDynamicProvisionStorageClass(m.config, "")
204		scTest := testsuites.StorageClassTest{
205			Name:                 m.driver.GetDriverInfo().Name,
206			Timeouts:             f.Timeouts,
207			Provisioner:          sc.Provisioner,
208			Parameters:           sc.Parameters,
209			ClaimSize:            "1Gi",
210			ExpectedSize:         "1Gi",
211			DelayBinding:         m.tp.lateBinding,
212			AllowVolumeExpansion: m.tp.enableResizing,
213		}
214
215		// The mock driver only works when everything runs on a single node.
216		nodeSelection := m.config.ClientNodeSelection
217		if ephemeral {
218			pod = startPausePodInline(f.ClientSet, scTest, nodeSelection, f.Namespace.Name)
219			if pod != nil {
220				m.pods = append(m.pods, pod)
221			}
222		} else {
223			class, claim, pod = startPausePod(f.ClientSet, scTest, nodeSelection, m.tp.scName, f.Namespace.Name)
224			if class != nil {
225				m.sc[class.Name] = class
226			}
227			if claim != nil {
228				m.pvcs = append(m.pvcs, claim)
229			}
230			if pod != nil {
231				m.pods = append(m.pods, pod)
232			}
233		}
234		return // result variables set above
235	}
236
237	createPodWithPVC := func(pvc *v1.PersistentVolumeClaim) (*v1.Pod, error) {
238		nodeSelection := m.config.ClientNodeSelection
239		pod, err := startPausePodWithClaim(m.cs, pvc, nodeSelection, f.Namespace.Name)
240		if pod != nil {
241			m.pods = append(m.pods, pod)
242		}
243		return pod, err
244	}
245
246	createPodWithFSGroup := func(fsGroup *int64) (*storagev1.StorageClass, *v1.PersistentVolumeClaim, *v1.Pod) {
247		ginkgo.By("Creating pod with fsGroup")
248		nodeSelection := m.config.ClientNodeSelection
249		sc := m.driver.GetDynamicProvisionStorageClass(m.config, "")
250		scTest := testsuites.StorageClassTest{
251			Name:                 m.driver.GetDriverInfo().Name,
252			Provisioner:          sc.Provisioner,
253			Parameters:           sc.Parameters,
254			ClaimSize:            "1Gi",
255			ExpectedSize:         "1Gi",
256			DelayBinding:         m.tp.lateBinding,
257			AllowVolumeExpansion: m.tp.enableResizing,
258		}
259
260		class, claim, pod := startBusyBoxPod(f.ClientSet, scTest, nodeSelection, m.tp.scName, f.Namespace.Name, fsGroup)
261
262		if class != nil {
263			m.sc[class.Name] = class
264		}
265		if claim != nil {
266			m.pvcs = append(m.pvcs, claim)
267		}
268
269		if pod != nil {
270			m.pods = append(m.pods, pod)
271		}
272
273		return class, claim, pod
274	}
275
276	cleanup := func() {
277		cs := f.ClientSet
278		var errs []error
279
280		for _, pod := range m.pods {
281			ginkgo.By(fmt.Sprintf("Deleting pod %s", pod.Name))
282			errs = append(errs, e2epod.DeletePodWithWait(cs, pod))
283		}
284
285		for _, claim := range m.pvcs {
286			ginkgo.By(fmt.Sprintf("Deleting claim %s", claim.Name))
287			claim, err := cs.CoreV1().PersistentVolumeClaims(claim.Namespace).Get(context.TODO(), claim.Name, metav1.GetOptions{})
288			if err == nil {
289				if err := cs.CoreV1().PersistentVolumeClaims(claim.Namespace).Delete(context.TODO(), claim.Name, metav1.DeleteOptions{}); err != nil {
290					errs = append(errs, err)
291				}
292				if claim.Spec.VolumeName != "" {
293					errs = append(errs, e2epv.WaitForPersistentVolumeDeleted(cs, claim.Spec.VolumeName, framework.Poll, 2*time.Minute))
294				}
295			}
296		}
297
298		for _, sc := range m.sc {
299			ginkgo.By(fmt.Sprintf("Deleting storageclass %s", sc.Name))
300			cs.StorageV1().StorageClasses().Delete(context.TODO(), sc.Name, metav1.DeleteOptions{})
301		}
302
303		for _, vsc := range m.vsc {
304			ginkgo.By(fmt.Sprintf("Deleting volumesnapshotclass %s", vsc.GetName()))
305			m.config.Framework.DynamicClient.Resource(utils.SnapshotClassGVR).Delete(context.TODO(), vsc.GetName(), metav1.DeleteOptions{})
306		}
307		ginkgo.By("Cleaning up resources")
308		for _, cleanupFunc := range m.testCleanups {
309			cleanupFunc()
310		}
311
312		err := utilerrors.NewAggregate(errs)
313		framework.ExpectNoError(err, "while cleaning up after test")
314	}
315
316	// The CSIDriverRegistry feature gate is needed for this test in Kubernetes 1.12.
317	ginkgo.Context("CSI attach test using mock driver", func() {
318		tests := []struct {
319			name                   string
320			disableAttach          bool
321			deployClusterRegistrar bool
322		}{
323			{
324				name:                   "should not require VolumeAttach for drivers without attachment",
325				disableAttach:          true,
326				deployClusterRegistrar: true,
327			},
328			{
329				name:                   "should require VolumeAttach for drivers with attachment",
330				deployClusterRegistrar: true,
331			},
332			{
333				name:                   "should preserve attachment policy when no CSIDriver present",
334				deployClusterRegistrar: false,
335			},
336		}
337		for _, t := range tests {
338			test := t
339			ginkgo.It(t.name, func() {
340				var err error
341				init(testParameters{registerDriver: test.deployClusterRegistrar, disableAttach: test.disableAttach})
342				defer cleanup()
343
344				_, claim, pod := createPod(false)
345				if pod == nil {
346					return
347				}
348				err = e2epod.WaitForPodNameRunningInNamespace(m.cs, pod.Name, pod.Namespace)
349				framework.ExpectNoError(err, "Failed to start pod: %v", err)
350
351				ginkgo.By("Checking if VolumeAttachment was created for the pod")
352				handle := getVolumeHandle(m.cs, claim)
353				attachmentHash := sha256.Sum256([]byte(fmt.Sprintf("%s%s%s", handle, m.provisioner, m.config.ClientNodeSelection.Name)))
354				attachmentName := fmt.Sprintf("csi-%x", attachmentHash)
355				_, err = m.cs.StorageV1().VolumeAttachments().Get(context.TODO(), attachmentName, metav1.GetOptions{})
356				if err != nil {
357					if apierrors.IsNotFound(err) {
358						if !test.disableAttach {
359							framework.ExpectNoError(err, "Expected VolumeAttachment but none was found")
360						}
361					} else {
362						framework.ExpectNoError(err, "Failed to find VolumeAttachment")
363					}
364				}
365				if test.disableAttach {
366					framework.ExpectError(err, "Unexpected VolumeAttachment found")
367				}
368			})
369
370		}
371	})
372
373	ginkgo.Context("CSI CSIDriver deployment after pod creation using non-attachable mock driver", func() {
374		ginkgo.It("should bringup pod after deploying CSIDriver attach=false [Slow]", func() {
375			var err error
376			init(testParameters{registerDriver: false, disableAttach: true})
377			defer cleanup()
378
379			_, claim, pod := createPod(false /* persistent volume, late binding as specified above */)
380			if pod == nil {
381				return
382			}
383
384			ginkgo.By("Checking if attaching failed and pod cannot start")
385			eventSelector := fields.Set{
386				"involvedObject.kind":      "Pod",
387				"involvedObject.name":      pod.Name,
388				"involvedObject.namespace": pod.Namespace,
389				"reason":                   events.FailedAttachVolume,
390			}.AsSelector().String()
391			msg := "AttachVolume.Attach failed for volume"
392
393			err = e2eevents.WaitTimeoutForEvent(m.cs, pod.Namespace, eventSelector, msg, f.Timeouts.PodStart)
394			if err != nil {
395				podErr := e2epod.WaitTimeoutForPodRunningInNamespace(m.cs, pod.Name, pod.Namespace, 10*time.Second)
396				framework.ExpectError(podErr, "Pod should not be in running status because attaching should failed")
397				// Events are unreliable, don't depend on the event. It's used only to speed up the test.
398				framework.Logf("Attach should fail and the corresponding event should show up, error: %v", err)
399			}
400
401			// VolumeAttachment should be created because the default value for CSI attachable is true
402			ginkgo.By("Checking if VolumeAttachment was created for the pod")
403			handle := getVolumeHandle(m.cs, claim)
404			attachmentHash := sha256.Sum256([]byte(fmt.Sprintf("%s%s%s", handle, m.provisioner, m.config.ClientNodeSelection.Name)))
405			attachmentName := fmt.Sprintf("csi-%x", attachmentHash)
406			_, err = m.cs.StorageV1().VolumeAttachments().Get(context.TODO(), attachmentName, metav1.GetOptions{})
407			if err != nil {
408				if apierrors.IsNotFound(err) {
409					framework.ExpectNoError(err, "Expected VolumeAttachment but none was found")
410				} else {
411					framework.ExpectNoError(err, "Failed to find VolumeAttachment")
412				}
413			}
414
415			ginkgo.By("Deploy CSIDriver object with attachRequired=false")
416			driverNamespace := m.config.DriverNamespace
417
418			canAttach := false
419			o := utils.PatchCSIOptions{
420				OldDriverName: "csi-mock",
421				NewDriverName: "csi-mock-" + f.UniqueName,
422				CanAttach:     &canAttach,
423			}
424			cleanupCSIDriver, err := utils.CreateFromManifests(f, driverNamespace, func(item interface{}) error {
425				return utils.PatchCSIDeployment(f, o, item)
426			}, "test/e2e/testing-manifests/storage-csi/mock/csi-mock-driverinfo.yaml")
427			if err != nil {
428				framework.Failf("fail to deploy CSIDriver object: %v", err)
429			}
430			m.testCleanups = append(m.testCleanups, cleanupCSIDriver)
431
432			ginkgo.By("Wait for the pod in running status")
433			err = e2epod.WaitForPodNameRunningInNamespace(m.cs, pod.Name, pod.Namespace)
434			framework.ExpectNoError(err, "Failed to start pod: %v", err)
435
436			ginkgo.By(fmt.Sprintf("Wait for the volumeattachment to be deleted up to %v", csiVolumeAttachmentTimeout))
437			// This step can be slow because we have to wait either a NodeUpdate event happens or
438			// the detachment for this volume timeout so that we can do a force detach.
439			err = waitForVolumeAttachmentTerminated(attachmentName, m.cs)
440			framework.ExpectNoError(err, "Failed to delete VolumeAttachment: %v", err)
441		})
442	})
443
444	ginkgo.Context("CSI workload information using mock driver", func() {
445		var (
446			err          error
447			podInfoTrue  = true
448			podInfoFalse = false
449		)
450		tests := []struct {
451			name                   string
452			podInfoOnMount         *bool
453			deployClusterRegistrar bool
454			expectPodInfo          bool
455			expectEphemeral        bool
456		}{
457			{
458				name:                   "should not be passed when podInfoOnMount=nil",
459				podInfoOnMount:         nil,
460				deployClusterRegistrar: true,
461				expectPodInfo:          false,
462				expectEphemeral:        false,
463			},
464			{
465				name:                   "should be passed when podInfoOnMount=true",
466				podInfoOnMount:         &podInfoTrue,
467				deployClusterRegistrar: true,
468				expectPodInfo:          true,
469				expectEphemeral:        false,
470			},
471			{
472				name:                   "contain ephemeral=true when using inline volume",
473				podInfoOnMount:         &podInfoTrue,
474				deployClusterRegistrar: true,
475				expectPodInfo:          true,
476				expectEphemeral:        true,
477			},
478			{
479				name:                   "should not be passed when podInfoOnMount=false",
480				podInfoOnMount:         &podInfoFalse,
481				deployClusterRegistrar: true,
482				expectPodInfo:          false,
483				expectEphemeral:        false,
484			},
485			{
486				name:                   "should not be passed when CSIDriver does not exist",
487				deployClusterRegistrar: false,
488				expectPodInfo:          false,
489				expectEphemeral:        false,
490			},
491		}
492		for _, t := range tests {
493			test := t
494			ginkgo.It(t.name, func() {
495				init(testParameters{
496					registerDriver: test.deployClusterRegistrar,
497					podInfo:        test.podInfoOnMount})
498
499				defer cleanup()
500
501				_, _, pod := createPod(test.expectEphemeral)
502				if pod == nil {
503					return
504				}
505				err = e2epod.WaitForPodNameRunningInNamespace(m.cs, pod.Name, pod.Namespace)
506				framework.ExpectNoError(err, "Failed to start pod: %v", err)
507
508				// If we expect an ephemeral volume, the feature has to be enabled.
509				// Otherwise need to check if we expect pod info, because the content
510				// of that depends on whether the feature is enabled or not.
511				csiInlineVolumesEnabled := test.expectEphemeral
512				if test.expectPodInfo {
513					ginkgo.By("checking for CSIInlineVolumes feature")
514					csiInlineVolumesEnabled, err = testsuites.CSIInlineVolumesEnabled(m.cs, f.Timeouts, f.Namespace.Name)
515					framework.ExpectNoError(err, "failed to test for CSIInlineVolumes")
516				}
517
518				ginkgo.By("Deleting the previously created pod")
519				err = e2epod.DeletePodWithWait(m.cs, pod)
520				framework.ExpectNoError(err, "while deleting")
521
522				ginkgo.By("Checking CSI driver logs")
523				err = checkPodLogs(m.driver.GetCalls, pod, test.expectPodInfo, test.expectEphemeral, csiInlineVolumesEnabled, false, 1)
524				framework.ExpectNoError(err)
525			})
526		}
527	})
528
529	ginkgo.Context("CSI volume limit information using mock driver", func() {
530		ginkgo.It("should report attach limit when limit is bigger than 0 [Slow]", func() {
531			// define volume limit to be 2 for this test
532			var err error
533			init(testParameters{attachLimit: 2})
534			defer cleanup()
535			nodeName := m.config.ClientNodeSelection.Name
536			driverName := m.config.GetUniqueDriverName()
537
538			csiNodeAttachLimit, err := checkCSINodeForLimits(nodeName, driverName, m.cs)
539			framework.ExpectNoError(err, "while checking limits in CSINode: %v", err)
540
541			gomega.Expect(csiNodeAttachLimit).To(gomega.BeNumerically("==", 2))
542
543			_, _, pod1 := createPod(false)
544			gomega.Expect(pod1).NotTo(gomega.BeNil(), "while creating first pod")
545
546			err = e2epod.WaitForPodNameRunningInNamespace(m.cs, pod1.Name, pod1.Namespace)
547			framework.ExpectNoError(err, "Failed to start pod1: %v", err)
548
549			_, _, pod2 := createPod(false)
550			gomega.Expect(pod2).NotTo(gomega.BeNil(), "while creating second pod")
551
552			err = e2epod.WaitForPodNameRunningInNamespace(m.cs, pod2.Name, pod2.Namespace)
553			framework.ExpectNoError(err, "Failed to start pod2: %v", err)
554
555			_, _, pod3 := createPod(false)
556			gomega.Expect(pod3).NotTo(gomega.BeNil(), "while creating third pod")
557			err = waitForMaxVolumeCondition(pod3, m.cs)
558			framework.ExpectNoError(err, "while waiting for max volume condition on pod : %+v", pod3)
559		})
560	})
561
562	ginkgo.Context("CSI Volume expansion", func() {
563		tests := []struct {
564			name                    string
565			nodeExpansionRequired   bool
566			disableAttach           bool
567			disableResizingOnDriver bool
568			expectFailure           bool
569		}{
570			{
571				name:                  "should expand volume without restarting pod if nodeExpansion=off",
572				nodeExpansionRequired: false,
573			},
574			{
575				name:                  "should expand volume by restarting pod if attach=on, nodeExpansion=on",
576				nodeExpansionRequired: true,
577			},
578			{
579				name:                  "should expand volume by restarting pod if attach=off, nodeExpansion=on",
580				disableAttach:         true,
581				nodeExpansionRequired: true,
582			},
583			{
584				name:                    "should not expand volume if resizingOnDriver=off, resizingOnSC=on",
585				disableResizingOnDriver: true,
586				expectFailure:           true,
587			},
588		}
589		for _, t := range tests {
590			test := t
591			ginkgo.It(t.name, func() {
592				var err error
593				tp := testParameters{
594					enableResizing:          true,
595					enableNodeExpansion:     test.nodeExpansionRequired,
596					disableResizingOnDriver: test.disableResizingOnDriver,
597				}
598				// disabling attach requires drive registration feature
599				if test.disableAttach {
600					tp.disableAttach = true
601					tp.registerDriver = true
602				}
603
604				init(tp)
605				defer cleanup()
606
607				sc, pvc, pod := createPod(false)
608				gomega.Expect(pod).NotTo(gomega.BeNil(), "while creating pod for resizing")
609
610				framework.ExpectEqual(*sc.AllowVolumeExpansion, true, "failed creating sc with allowed expansion")
611
612				err = e2epod.WaitForPodNameRunningInNamespace(m.cs, pod.Name, pod.Namespace)
613				framework.ExpectNoError(err, "Failed to start pod1: %v", err)
614
615				ginkgo.By("Expanding current pvc")
616				newSize := resource.MustParse("6Gi")
617				newPVC, err := testsuites.ExpandPVCSize(pvc, newSize, m.cs)
618				framework.ExpectNoError(err, "While updating pvc for more size")
619				pvc = newPVC
620				gomega.Expect(pvc).NotTo(gomega.BeNil())
621
622				pvcSize := pvc.Spec.Resources.Requests[v1.ResourceStorage]
623				if pvcSize.Cmp(newSize) != 0 {
624					framework.Failf("error updating pvc size %q", pvc.Name)
625				}
626				if test.expectFailure {
627					err = testsuites.WaitForResizingCondition(pvc, m.cs, csiResizingConditionWait)
628					framework.ExpectError(err, "unexpected resizing condition on PVC")
629					return
630				}
631
632				ginkgo.By("Waiting for persistent volume resize to finish")
633				err = testsuites.WaitForControllerVolumeResize(pvc, m.cs, csiResizeWaitPeriod)
634				framework.ExpectNoError(err, "While waiting for CSI PV resize to finish")
635
636				checkPVCSize := func() {
637					ginkgo.By("Waiting for PVC resize to finish")
638					pvc, err = testsuites.WaitForFSResize(pvc, m.cs)
639					framework.ExpectNoError(err, "while waiting for PVC resize to finish")
640
641					pvcConditions := pvc.Status.Conditions
642					framework.ExpectEqual(len(pvcConditions), 0, "pvc should not have conditions")
643				}
644
645				// if node expansion is not required PVC should be resized as well
646				if !test.nodeExpansionRequired {
647					checkPVCSize()
648				} else {
649					ginkgo.By("Checking for conditions on pvc")
650					npvc, err := testsuites.WaitForPendingFSResizeCondition(pvc, m.cs)
651					framework.ExpectNoError(err, "While waiting for pvc to have fs resizing condition")
652					pvc = npvc
653
654					inProgressConditions := pvc.Status.Conditions
655					if len(inProgressConditions) > 0 {
656						framework.ExpectEqual(inProgressConditions[0].Type, v1.PersistentVolumeClaimFileSystemResizePending, "pvc must have fs resizing condition")
657					}
658
659					ginkgo.By("Deleting the previously created pod")
660					err = e2epod.DeletePodWithWait(m.cs, pod)
661					framework.ExpectNoError(err, "while deleting pod for resizing")
662
663					ginkgo.By("Creating a new pod with same volume")
664					pod2, err := createPodWithPVC(pvc)
665					gomega.Expect(pod2).NotTo(gomega.BeNil(), "while creating pod for csi resizing")
666					framework.ExpectNoError(err, "while recreating pod for resizing")
667
668					checkPVCSize()
669				}
670			})
671		}
672	})
673	ginkgo.Context("CSI online volume expansion", func() {
674		tests := []struct {
675			name          string
676			disableAttach bool
677		}{
678			{
679				name: "should expand volume without restarting pod if attach=on, nodeExpansion=on",
680			},
681			{
682				name:          "should expand volume without restarting pod if attach=off, nodeExpansion=on",
683				disableAttach: true,
684			},
685		}
686		for _, t := range tests {
687			test := t
688			ginkgo.It(test.name, func() {
689				var err error
690				params := testParameters{enableResizing: true, enableNodeExpansion: true}
691				if test.disableAttach {
692					params.disableAttach = true
693					params.registerDriver = true
694				}
695
696				init(params)
697
698				defer cleanup()
699
700				sc, pvc, pod := createPod(false)
701				gomega.Expect(pod).NotTo(gomega.BeNil(), "while creating pod for resizing")
702
703				framework.ExpectEqual(*sc.AllowVolumeExpansion, true, "failed creating sc with allowed expansion")
704
705				err = e2epod.WaitForPodNameRunningInNamespace(m.cs, pod.Name, pod.Namespace)
706				framework.ExpectNoError(err, "Failed to start pod1: %v", err)
707
708				ginkgo.By("Expanding current pvc")
709				newSize := resource.MustParse("6Gi")
710				newPVC, err := testsuites.ExpandPVCSize(pvc, newSize, m.cs)
711				framework.ExpectNoError(err, "While updating pvc for more size")
712				pvc = newPVC
713				gomega.Expect(pvc).NotTo(gomega.BeNil())
714
715				pvcSize := pvc.Spec.Resources.Requests[v1.ResourceStorage]
716				if pvcSize.Cmp(newSize) != 0 {
717					framework.Failf("error updating pvc size %q", pvc.Name)
718				}
719
720				ginkgo.By("Waiting for persistent volume resize to finish")
721				err = testsuites.WaitForControllerVolumeResize(pvc, m.cs, csiResizeWaitPeriod)
722				framework.ExpectNoError(err, "While waiting for PV resize to finish")
723
724				ginkgo.By("Waiting for PVC resize to finish")
725				pvc, err = testsuites.WaitForFSResize(pvc, m.cs)
726				framework.ExpectNoError(err, "while waiting for PVC to finish")
727
728				pvcConditions := pvc.Status.Conditions
729				framework.ExpectEqual(len(pvcConditions), 0, "pvc should not have conditions")
730
731			})
732		}
733	})
734
735	ginkgo.Context("CSI NodeStage error cases [Slow]", func() {
736		trackedCalls := []string{
737			"NodeStageVolume",
738			"NodeUnstageVolume",
739		}
740
741		tests := []struct {
742			name             string
743			expectPodRunning bool
744			expectedCalls    []csiCall
745
746			// Called for each NodeStateVolume calls, with counter incremented atomically before
747			// the invocation (i.e. first value will be 1).
748			nodeStageHook func(counter int64) error
749		}{
750			{
751				// This is already tested elsewhere, adding simple good case here to test the test framework.
752				name:             "should call NodeUnstage after NodeStage success",
753				expectPodRunning: true,
754				expectedCalls: []csiCall{
755					{expectedMethod: "NodeStageVolume", expectedError: codes.OK, deletePod: true},
756					{expectedMethod: "NodeUnstageVolume", expectedError: codes.OK},
757				},
758			},
759			{
760				// Kubelet should repeat NodeStage as long as the pod exists
761				name:             "should retry NodeStage after NodeStage final error",
762				expectPodRunning: true,
763				expectedCalls: []csiCall{
764					// This matches all 3 NodeStage calls with InvalidArgument error
765					{expectedMethod: "NodeStageVolume", expectedError: codes.InvalidArgument},
766					{expectedMethod: "NodeStageVolume", expectedError: codes.OK, deletePod: true},
767					{expectedMethod: "NodeUnstageVolume", expectedError: codes.OK},
768				},
769				// Fail first 3 NodeStage requests, 4th succeeds
770				nodeStageHook: func(counter int64) error {
771					if counter < 4 {
772						return status.Error(codes.InvalidArgument, "fake error")
773					}
774					return nil
775				},
776			},
777			{
778				// Kubelet should repeat NodeStage as long as the pod exists
779				name:             "should retry NodeStage after NodeStage ephemeral error",
780				expectPodRunning: true,
781				expectedCalls: []csiCall{
782					// This matches all 3 NodeStage calls with DeadlineExceeded error
783					{expectedMethod: "NodeStageVolume", expectedError: codes.DeadlineExceeded},
784					{expectedMethod: "NodeStageVolume", expectedError: codes.OK, deletePod: true},
785					{expectedMethod: "NodeUnstageVolume", expectedError: codes.OK},
786				},
787				// Fail first 3 NodeStage requests, 4th succeeds
788				nodeStageHook: func(counter int64) error {
789					if counter < 4 {
790						return status.Error(codes.DeadlineExceeded, "fake error")
791					}
792					return nil
793				},
794			},
795			{
796				// After NodeUnstage with ephemeral error, the driver may continue staging the volume.
797				// Kubelet should call NodeUnstage to make sure the volume is really unstaged after
798				// the pod is deleted.
799				name:             "should call NodeUnstage after NodeStage ephemeral error",
800				expectPodRunning: false,
801				expectedCalls: []csiCall{
802					// Delete the pod before NodeStage succeeds - it should get "uncertain" because of ephemeral error
803					// This matches all repeated NodeStage calls with DeadlineExceeded error (due to exp. backoff).
804					{expectedMethod: "NodeStageVolume", expectedError: codes.DeadlineExceeded, deletePod: true},
805					{expectedMethod: "NodeUnstageVolume", expectedError: codes.OK},
806				},
807				nodeStageHook: func(counter int64) error {
808					return status.Error(codes.DeadlineExceeded, "fake error")
809				},
810			},
811			{
812				// After NodeUnstage with final error, kubelet can be sure the volume is not staged.
813				// The test checks that NodeUnstage is *not* called.
814				name:             "should not call NodeUnstage after NodeStage final error",
815				expectPodRunning: false,
816				expectedCalls: []csiCall{
817					// Delete the pod before NodeStage succeeds - it should get "globally unmounted" because of final error.
818					// This matches all repeated NodeStage calls with InvalidArgument error (due to exp. backoff).
819					{expectedMethod: "NodeStageVolume", expectedError: codes.InvalidArgument, deletePod: true},
820				},
821				// nodeStageScript: `INVALIDARGUMENT;`,
822				nodeStageHook: func(counter int64) error {
823					return status.Error(codes.InvalidArgument, "fake error")
824				},
825			},
826		}
827		for _, t := range tests {
828			test := t
829			ginkgo.It(test.name, func() {
830				var hooks *drivers.Hooks
831				if test.nodeStageHook != nil {
832					hooks = createPreHook("NodeStageVolume", test.nodeStageHook)
833				}
834				init(testParameters{
835					disableAttach:  true,
836					registerDriver: true,
837					hooks:          hooks,
838				})
839				defer cleanup()
840
841				_, claim, pod := createPod(false)
842				if pod == nil {
843					return
844				}
845				// Wait for PVC to get bound to make sure the CSI driver is fully started.
846				err := e2epv.WaitForPersistentVolumeClaimPhase(v1.ClaimBound, f.ClientSet, f.Namespace.Name, claim.Name, time.Second, framework.ClaimProvisionTimeout)
847				framework.ExpectNoError(err, "while waiting for PVC to get provisioned")
848
849				ginkgo.By("Waiting for expected CSI calls")
850				// Watch for all calls up to deletePod = true
851				ctx, cancel := context.WithTimeout(context.Background(), csiPodRunningTimeout)
852				defer cancel()
853				for {
854					if ctx.Err() != nil {
855						framework.Failf("timed out waiting for the CSI call that indicates that the pod can be deleted: %v", test.expectedCalls)
856					}
857					time.Sleep(1 * time.Second)
858					_, index, err := compareCSICalls(trackedCalls, test.expectedCalls, m.driver.GetCalls)
859					framework.ExpectNoError(err, "while waiting for initial CSI calls")
860					if index == 0 {
861						// No CSI call received yet
862						continue
863					}
864					// Check the last *received* call wanted the pod to be deleted
865					if test.expectedCalls[index-1].deletePod {
866						break
867					}
868				}
869
870				if test.expectPodRunning {
871					ginkgo.By("Waiting for pod to be running")
872					err := e2epod.WaitForPodNameRunningInNamespace(m.cs, pod.Name, pod.Namespace)
873					framework.ExpectNoError(err, "Failed to start pod: %v", err)
874				}
875
876				ginkgo.By("Deleting the previously created pod")
877				err = e2epod.DeletePodWithWait(m.cs, pod)
878				framework.ExpectNoError(err, "while deleting")
879
880				ginkgo.By("Waiting for all remaining expected CSI calls")
881				err = wait.Poll(time.Second, csiUnstageWaitTimeout, func() (done bool, err error) {
882					_, index, err := compareCSICalls(trackedCalls, test.expectedCalls, m.driver.GetCalls)
883					if err != nil {
884						return true, err
885					}
886					if index == 0 {
887						// No CSI call received yet
888						return false, nil
889					}
890					if len(test.expectedCalls) == index {
891						// all calls received
892						return true, nil
893					}
894					return false, nil
895				})
896				framework.ExpectNoError(err, "while waiting for all CSI calls")
897			})
898		}
899	})
900
901	ginkgo.Context("CSI NodeUnstage error cases [Slow]", func() {
902		trackedCalls := []string{
903			"NodeStageVolume",
904			"NodeUnstageVolume",
905		}
906
907		// Each test starts two pods in sequence.
908		// The first pod always runs successfully, but NodeUnstage hook can set various error conditions.
909		// The test then checks how NodeStage of the second pod is called.
910		tests := []struct {
911			name          string
912			expectedCalls []csiCall
913
914			// Called for each NodeStageVolume calls, with counter incremented atomically before
915			// the invocation (i.e. first value will be 1) and index of deleted pod (the first pod
916			// has index 1)
917			nodeUnstageHook func(counter, pod int64) error
918		}{
919			{
920				// This is already tested elsewhere, adding simple good case here to test the test framework.
921				name: "should call NodeStage after NodeUnstage success",
922				expectedCalls: []csiCall{
923					{expectedMethod: "NodeStageVolume", expectedError: codes.OK},
924					{expectedMethod: "NodeUnstageVolume", expectedError: codes.OK},
925					{expectedMethod: "NodeStageVolume", expectedError: codes.OK},
926					{expectedMethod: "NodeUnstageVolume", expectedError: codes.OK},
927				},
928			},
929			{
930				name: "two pods: should call NodeStage after previous NodeUnstage final error",
931				expectedCalls: []csiCall{
932					{expectedMethod: "NodeStageVolume", expectedError: codes.OK},
933					{expectedMethod: "NodeUnstageVolume", expectedError: codes.InvalidArgument},
934					{expectedMethod: "NodeStageVolume", expectedError: codes.OK},
935					{expectedMethod: "NodeUnstageVolume", expectedError: codes.OK},
936				},
937				nodeUnstageHook: func(counter, pod int64) error {
938					if pod == 1 {
939						return status.Error(codes.InvalidArgument, "fake final error")
940					}
941					return nil
942				},
943			},
944			{
945				name: "two pods: should call NodeStage after previous NodeUnstage transient error",
946				expectedCalls: []csiCall{
947					{expectedMethod: "NodeStageVolume", expectedError: codes.OK},
948					{expectedMethod: "NodeUnstageVolume", expectedError: codes.DeadlineExceeded},
949					{expectedMethod: "NodeStageVolume", expectedError: codes.OK},
950					{expectedMethod: "NodeUnstageVolume", expectedError: codes.OK},
951				},
952				nodeUnstageHook: func(counter, pod int64) error {
953					if pod == 1 {
954						return status.Error(codes.DeadlineExceeded, "fake transient error")
955					}
956					return nil
957				},
958			},
959		}
960		for _, t := range tests {
961			test := t
962			ginkgo.It(test.name, func() {
963				// Index of the last deleted pod. NodeUnstage calls are then related to this pod.
964				var deletedPodNumber int64 = 1
965				var hooks *drivers.Hooks
966				if test.nodeUnstageHook != nil {
967					hooks = createPreHook("NodeUnstageVolume", func(counter int64) error {
968						pod := atomic.LoadInt64(&deletedPodNumber)
969						return test.nodeUnstageHook(counter, pod)
970					})
971				}
972				init(testParameters{
973					disableAttach:  true,
974					registerDriver: true,
975					hooks:          hooks,
976				})
977				defer cleanup()
978
979				_, claim, pod := createPod(false)
980				if pod == nil {
981					return
982				}
983				// Wait for PVC to get bound to make sure the CSI driver is fully started.
984				err := e2epv.WaitForPersistentVolumeClaimPhase(v1.ClaimBound, f.ClientSet, f.Namespace.Name, claim.Name, time.Second, framework.ClaimProvisionTimeout)
985				framework.ExpectNoError(err, "while waiting for PVC to get provisioned")
986				err = e2epod.WaitForPodNameRunningInNamespace(m.cs, pod.Name, pod.Namespace)
987				framework.ExpectNoError(err, "while waiting for the first pod to start")
988				err = e2epod.DeletePodWithWait(m.cs, pod)
989				framework.ExpectNoError(err, "while deleting the first pod")
990
991				// Create the second pod
992				pod, err = createPodWithPVC(claim)
993				framework.ExpectNoError(err, "while creating the second pod")
994				err = e2epod.WaitForPodNameRunningInNamespace(m.cs, pod.Name, pod.Namespace)
995				framework.ExpectNoError(err, "while waiting for the second pod to start")
996				// The second pod is running and kubelet can't call NodeUnstage of the first one.
997				// Therefore incrementing the pod counter is safe here.
998				atomic.AddInt64(&deletedPodNumber, 1)
999				err = e2epod.DeletePodWithWait(m.cs, pod)
1000				framework.ExpectNoError(err, "while deleting the second pod")
1001
1002				ginkgo.By("Waiting for all remaining expected CSI calls")
1003				err = wait.Poll(time.Second, csiUnstageWaitTimeout, func() (done bool, err error) {
1004					_, index, err := compareCSICalls(trackedCalls, test.expectedCalls, m.driver.GetCalls)
1005					if err != nil {
1006						return true, err
1007					}
1008					if index == 0 {
1009						// No CSI call received yet
1010						return false, nil
1011					}
1012					if len(test.expectedCalls) == index {
1013						// all calls received
1014						return true, nil
1015					}
1016					return false, nil
1017				})
1018				framework.ExpectNoError(err, "while waiting for all CSI calls")
1019			})
1020		}
1021	})
1022
1023	ginkgo.Context("storage capacity", func() {
1024		tests := []struct {
1025			name              string
1026			resourceExhausted bool
1027			lateBinding       bool
1028			topology          bool
1029		}{
1030			{
1031				name: "unlimited",
1032			},
1033			{
1034				name:              "exhausted, immediate binding",
1035				resourceExhausted: true,
1036			},
1037			{
1038				name:              "exhausted, late binding, no topology",
1039				resourceExhausted: true,
1040				lateBinding:       true,
1041			},
1042			{
1043				name:              "exhausted, late binding, with topology",
1044				resourceExhausted: true,
1045				lateBinding:       true,
1046				topology:          true,
1047			},
1048		}
1049
1050		createVolume := "CreateVolume"
1051		deleteVolume := "DeleteVolume"
1052		// publishVolume := "NodePublishVolume"
1053		// unpublishVolume := "NodeUnpublishVolume"
1054		// stageVolume := "NodeStageVolume"
1055		// unstageVolume := "NodeUnstageVolume"
1056
1057		// These calls are assumed to occur in this order for
1058		// each test run. NodeStageVolume and
1059		// NodePublishVolume should also be deterministic and
1060		// only get called once, but sometimes kubelet calls
1061		// both multiple times, which breaks this test
1062		// (https://github.com/kubernetes/kubernetes/issues/90250).
1063		// Therefore they are temporarily commented out until
1064		// that issue is resolved.
1065		//
1066		// NodeUnpublishVolume and NodeUnstageVolume are racing
1067		// with DeleteVolume, so we cannot assume a deterministic
1068		// order and have to ignore them
1069		// (https://github.com/kubernetes/kubernetes/issues/94108).
1070		deterministicCalls := []string{
1071			createVolume,
1072			// stageVolume,
1073			// publishVolume,
1074			// unpublishVolume,
1075			// unstageVolume,
1076			deleteVolume,
1077		}
1078
1079		for _, t := range tests {
1080			test := t
1081			ginkgo.It(test.name, func() {
1082				var err error
1083				params := testParameters{
1084					lateBinding:    test.lateBinding,
1085					enableTopology: test.topology,
1086
1087					// Not strictly necessary, but runs a bit faster this way
1088					// and for a while there also was a problem with a two minuted delay
1089					// due to a bug (https://github.com/kubernetes-csi/csi-test/pull/250).
1090					disableAttach:  true,
1091					registerDriver: true,
1092				}
1093
1094				if test.resourceExhausted {
1095					params.hooks = createPreHook("CreateVolume", func(counter int64) error {
1096						if counter%2 != 0 {
1097							return status.Error(codes.ResourceExhausted, "fake error")
1098						}
1099						return nil
1100					})
1101				}
1102
1103				init(params)
1104				defer cleanup()
1105
1106				ctx, cancel := context.WithTimeout(context.Background(), csiPodRunningTimeout)
1107				defer cancel()
1108
1109				// In contrast to the raw watch, RetryWatcher is expected to deliver all events even
1110				// when the underlying raw watch gets closed prematurely
1111				// (https://github.com/kubernetes/kubernetes/pull/93777#discussion_r467932080).
1112				// This is important because below the test is going to make assertions about the
1113				// PVC state changes.
1114				initResource, err := f.ClientSet.CoreV1().PersistentVolumeClaims(f.Namespace.Name).List(ctx, metav1.ListOptions{})
1115				framework.ExpectNoError(err, "Failed to fetch initial PVC resource")
1116				listWatcher := &cachetools.ListWatch{
1117					WatchFunc: func(listOptions metav1.ListOptions) (watch.Interface, error) {
1118						return f.ClientSet.CoreV1().PersistentVolumeClaims(f.Namespace.Name).Watch(ctx, listOptions)
1119					},
1120				}
1121				pvcWatch, err := watchtools.NewRetryWatcher(initResource.GetResourceVersion(), listWatcher)
1122				framework.ExpectNoError(err, "create PVC watch")
1123				defer pvcWatch.Stop()
1124
1125				sc, claim, pod := createPod(false)
1126				gomega.Expect(pod).NotTo(gomega.BeNil(), "while creating pod")
1127				bindingMode := storagev1.VolumeBindingImmediate
1128				if test.lateBinding {
1129					bindingMode = storagev1.VolumeBindingWaitForFirstConsumer
1130				}
1131				framework.ExpectEqual(*sc.VolumeBindingMode, bindingMode, "volume binding mode")
1132
1133				err = e2epod.WaitForPodNameRunningInNamespace(m.cs, pod.Name, pod.Namespace)
1134				framework.ExpectNoError(err, "failed to start pod")
1135				err = e2epod.DeletePodWithWait(m.cs, pod)
1136				framework.ExpectNoError(err, "failed to delete pod")
1137				err = m.cs.CoreV1().PersistentVolumeClaims(claim.Namespace).Delete(ctx, claim.Name, metav1.DeleteOptions{})
1138				framework.ExpectNoError(err, "failed to delete claim")
1139
1140				normal := []csiCall{}
1141				for _, method := range deterministicCalls {
1142					normal = append(normal, csiCall{expectedMethod: method})
1143				}
1144				expected := normal
1145				// When simulating limited capacity,
1146				// we expect exactly two CreateVolume
1147				// calls because the first one should
1148				// have failed.
1149				if test.resourceExhausted {
1150					expected = []csiCall{
1151						{expectedMethod: createVolume, expectedError: codes.ResourceExhausted},
1152					}
1153					expected = append(expected, normal...)
1154				}
1155
1156				var calls []drivers.MockCSICall
1157				err = wait.PollImmediateUntil(time.Second, func() (done bool, err error) {
1158					c, index, err := compareCSICalls(deterministicCalls, expected, m.driver.GetCalls)
1159					if err != nil {
1160						return true, fmt.Errorf("error waiting for expected CSI calls: %s", err)
1161					}
1162					calls = c
1163					if index == 0 {
1164						// No CSI call received yet
1165						return false, nil
1166					}
1167					if len(expected) == index {
1168						// all calls received
1169						return true, nil
1170					}
1171					return false, nil
1172				}, ctx.Done())
1173				framework.ExpectNoError(err, "while waiting for all CSI calls")
1174
1175				// The capacity error is dealt with in two different ways.
1176				//
1177				// For delayed binding, the external-provisioner should unset the node annotation
1178				// to give the scheduler the opportunity to reschedule the pod onto a different
1179				// node.
1180				//
1181				// For immediate binding, the external-scheduler must keep retrying.
1182				//
1183				// Unfortunately, the call log is the same in both cases. We have to collect
1184				// additional evidence that rescheduling really happened. What we have observed
1185				// above is how the PVC changed over time. Now we can analyze that.
1186				ginkgo.By("Checking PVC events")
1187				nodeAnnotationSet := false
1188				nodeAnnotationReset := false
1189				watchFailed := false
1190			loop:
1191				for {
1192					select {
1193					case event, ok := <-pvcWatch.ResultChan():
1194						if !ok {
1195							watchFailed = true
1196							break loop
1197						}
1198
1199						framework.Logf("PVC event %s: %#v", event.Type, event.Object)
1200						switch event.Type {
1201						case watch.Modified:
1202							pvc, ok := event.Object.(*v1.PersistentVolumeClaim)
1203							if !ok {
1204								framework.Failf("PVC watch sent %#v instead of a PVC", event.Object)
1205							}
1206							_, set := pvc.Annotations["volume.kubernetes.io/selected-node"]
1207							if set {
1208								nodeAnnotationSet = true
1209							} else if nodeAnnotationSet {
1210								nodeAnnotationReset = true
1211							}
1212						case watch.Deleted:
1213							break loop
1214						case watch.Error:
1215							watchFailed = true
1216							break
1217						}
1218					case <-ctx.Done():
1219						framework.Failf("Timeout while waiting to observe PVC list")
1220					}
1221				}
1222
1223				// More tests when capacity is limited.
1224				if test.resourceExhausted {
1225					for _, call := range calls {
1226						if call.Method == createVolume {
1227							gomega.Expect(call.Error).To(gomega.ContainSubstring("code = ResourceExhausted"), "first CreateVolume error in\n%s", calls)
1228							break
1229						}
1230					}
1231
1232					switch {
1233					case watchFailed:
1234						// If the watch failed or stopped prematurely (which can happen at any time), then we cannot
1235						// verify whether the annotation was set as expected. This is still considered a successful
1236						// test.
1237						framework.Logf("PVC watch delivered incomplete data, cannot check annotation")
1238					case test.lateBinding:
1239						gomega.Expect(nodeAnnotationSet).To(gomega.BeTrue(), "selected-node should have been set")
1240						// Whether it gets reset depends on whether we have topology enabled. Without
1241						// it, rescheduling is unnecessary.
1242						if test.topology {
1243							gomega.Expect(nodeAnnotationReset).To(gomega.BeTrue(), "selected-node should have been set")
1244						} else {
1245							gomega.Expect(nodeAnnotationReset).To(gomega.BeFalse(), "selected-node should not have been reset")
1246						}
1247					default:
1248						gomega.Expect(nodeAnnotationSet).To(gomega.BeFalse(), "selected-node should not have been set")
1249						gomega.Expect(nodeAnnotationReset).To(gomega.BeFalse(), "selected-node should not have been reset")
1250					}
1251				}
1252			})
1253		}
1254	})
1255
1256	// These tests *only* work on a cluster which has the CSIStorageCapacity feature enabled.
1257	ginkgo.Context("CSIStorageCapacity", func() {
1258		var (
1259			err error
1260			yes = true
1261			no  = false
1262		)
1263		// Tests that expect a failure are slow because we have to wait for a while
1264		// to be sure that the volume isn't getting created.
1265		tests := []struct {
1266			name            string
1267			storageCapacity *bool
1268			capacities      []string
1269			expectFailure   bool
1270		}{
1271			{
1272				name: "CSIStorageCapacity unused",
1273			},
1274			{
1275				name:            "CSIStorageCapacity disabled",
1276				storageCapacity: &no,
1277			},
1278			{
1279				name:            "CSIStorageCapacity used, no capacity",
1280				storageCapacity: &yes,
1281				expectFailure:   true,
1282			},
1283			{
1284				name:            "CSIStorageCapacity used, insufficient capacity",
1285				storageCapacity: &yes,
1286				expectFailure:   true,
1287				capacities:      []string{"1Mi"},
1288			},
1289			{
1290				name:            "CSIStorageCapacity used, have capacity",
1291				storageCapacity: &yes,
1292				capacities:      []string{"100Gi"},
1293			},
1294			// We could add more test cases here for
1295			// various situations, but covering those via
1296			// the scheduler binder unit tests is faster.
1297		}
1298		for _, t := range tests {
1299			test := t
1300			ginkgo.It(t.name, func() {
1301				scName := "mock-csi-storage-capacity-" + f.UniqueName
1302				init(testParameters{
1303					registerDriver:  true,
1304					scName:          scName,
1305					storageCapacity: test.storageCapacity,
1306					lateBinding:     true,
1307				})
1308				defer cleanup()
1309
1310				// The storage class uses a random name, therefore we have to create it first
1311				// before adding CSIStorageCapacity objects for it.
1312				for _, capacityStr := range test.capacities {
1313					capacityQuantity := resource.MustParse(capacityStr)
1314					capacity := &storagev1beta1.CSIStorageCapacity{
1315						ObjectMeta: metav1.ObjectMeta{
1316							GenerateName: "fake-capacity-",
1317						},
1318						// Empty topology, usable by any node.
1319						StorageClassName: scName,
1320						NodeTopology:     &metav1.LabelSelector{},
1321						Capacity:         &capacityQuantity,
1322					}
1323					createdCapacity, err := f.ClientSet.StorageV1beta1().CSIStorageCapacities(f.Namespace.Name).Create(context.Background(), capacity, metav1.CreateOptions{})
1324					framework.ExpectNoError(err, "create CSIStorageCapacity %+v", *capacity)
1325					m.testCleanups = append(m.testCleanups, func() {
1326						f.ClientSet.StorageV1beta1().CSIStorageCapacities(f.Namespace.Name).Delete(context.Background(), createdCapacity.Name, metav1.DeleteOptions{})
1327					})
1328				}
1329
1330				// kube-scheduler may need some time before it gets the CSIDriver and CSIStorageCapacity objects.
1331				// Without them, scheduling doesn't run as expected by the test.
1332				syncDelay := 5 * time.Second
1333				time.Sleep(syncDelay)
1334
1335				sc, _, pod := createPod(false /* persistent volume, late binding as specified above */)
1336				framework.ExpectEqual(sc.Name, scName, "pre-selected storage class name not used")
1337
1338				waitCtx, cancel := context.WithTimeout(context.Background(), f.Timeouts.PodStart)
1339				defer cancel()
1340				condition := anyOf(
1341					podRunning(waitCtx, f.ClientSet, pod.Name, pod.Namespace),
1342					// We only just created the CSIStorageCapacity objects, therefore
1343					// we have to ignore all older events, plus the syncDelay as our
1344					// safety margin.
1345					podHasStorage(waitCtx, f.ClientSet, pod.Name, pod.Namespace, time.Now().Add(syncDelay)),
1346				)
1347				err = wait.PollImmediateUntil(poll, condition, waitCtx.Done())
1348				if test.expectFailure {
1349					switch {
1350					case errors.Is(err, context.DeadlineExceeded),
1351						errors.Is(err, wait.ErrWaitTimeout),
1352						errors.Is(err, errNotEnoughSpace):
1353						// Okay, we expected that.
1354					case err == nil:
1355						framework.Fail("pod unexpectedly started to run")
1356					default:
1357						framework.Failf("unexpected error while waiting for pod: %v", err)
1358					}
1359				} else {
1360					framework.ExpectNoError(err, "failed to start pod")
1361				}
1362
1363				ginkgo.By("Deleting the previously created pod")
1364				err = e2epod.DeletePodWithWait(m.cs, pod)
1365				framework.ExpectNoError(err, "while deleting")
1366			})
1367		}
1368	})
1369
1370	ginkgo.Context("CSI Volume Snapshots [Feature:VolumeSnapshotDataSource]", func() {
1371		tests := []struct {
1372			name               string
1373			createSnapshotHook func(counter int64) error
1374		}{
1375			{
1376				name: "volumesnapshotcontent and pvc in Bound state with deletion timestamp set should not get deleted while snapshot finalizer exists",
1377				createSnapshotHook: func(counter int64) error {
1378					if counter < 8 {
1379						return status.Error(codes.DeadlineExceeded, "fake error")
1380					}
1381					return nil
1382				},
1383			},
1384		}
1385		for _, test := range tests {
1386			test := test
1387			ginkgo.It(test.name, func() {
1388				var hooks *drivers.Hooks
1389				if test.createSnapshotHook != nil {
1390					hooks = createPreHook("CreateSnapshot", test.createSnapshotHook)
1391				}
1392				init(testParameters{
1393					disableAttach:  true,
1394					registerDriver: true,
1395					enableSnapshot: true,
1396					hooks:          hooks,
1397				})
1398				sDriver, ok := m.driver.(storageframework.SnapshottableTestDriver)
1399				if !ok {
1400					e2eskipper.Skipf("mock driver %s does not support snapshots -- skipping", m.driver.GetDriverInfo().Name)
1401
1402				}
1403				ctx, cancel := context.WithTimeout(context.Background(), csiPodRunningTimeout)
1404				defer cancel()
1405				defer cleanup()
1406
1407				sc := m.driver.GetDynamicProvisionStorageClass(m.config, "")
1408				ginkgo.By("Creating storage class")
1409				class, err := m.cs.StorageV1().StorageClasses().Create(context.TODO(), sc, metav1.CreateOptions{})
1410				framework.ExpectNoError(err, "Failed to create class: %v", err)
1411				m.sc[class.Name] = class
1412				claim := e2epv.MakePersistentVolumeClaim(e2epv.PersistentVolumeClaimConfig{
1413					// Use static name so that the volumesnapshot can be created before the pvc.
1414					Name:             "snapshot-test-pvc",
1415					StorageClassName: &(class.Name),
1416				}, f.Namespace.Name)
1417
1418				ginkgo.By("Creating snapshot")
1419				// TODO: Test VolumeSnapshots with Retain policy
1420				parameters := map[string]string{}
1421				snapshotClass, snapshot := storageframework.CreateSnapshot(sDriver, m.config, storageframework.DynamicSnapshotDelete, claim.Name, claim.Namespace, f.Timeouts, parameters)
1422				framework.ExpectNoError(err, "failed to create snapshot")
1423				m.vsc[snapshotClass.GetName()] = snapshotClass
1424				volumeSnapshotName := snapshot.GetName()
1425
1426				ginkgo.By(fmt.Sprintf("Creating PVC %s/%s", claim.Namespace, claim.Name))
1427				claim, err = m.cs.CoreV1().PersistentVolumeClaims(f.Namespace.Name).Create(context.TODO(), claim, metav1.CreateOptions{})
1428				framework.ExpectNoError(err, "Failed to create claim: %v", err)
1429
1430				ginkgo.By(fmt.Sprintf("Wait for finalizer to be added to claim %s/%s", claim.Namespace, claim.Name))
1431				err = e2epv.WaitForPVCFinalizer(ctx, m.cs, claim.Name, claim.Namespace, pvcAsSourceProtectionFinalizer, 1*time.Millisecond, 1*time.Minute)
1432				framework.ExpectNoError(err)
1433
1434				ginkgo.By("Wait for PVC to be Bound")
1435				_, err = e2epv.WaitForPVClaimBoundPhase(m.cs, []*v1.PersistentVolumeClaim{claim}, 1*time.Minute)
				framework.ExpectNoError(err, "Failed waiting for PVC to be bound: %v", err)
1437
1438				ginkgo.By(fmt.Sprintf("Delete PVC %s", claim.Name))
1439				err = e2epv.DeletePersistentVolumeClaim(m.cs, claim.Name, claim.Namespace)
1440				framework.ExpectNoError(err, "failed to delete pvc")
1441
1442				ginkgo.By("Get PVC from API server and verify deletion timestamp is set")
1443				claim, err = m.cs.CoreV1().PersistentVolumeClaims(f.Namespace.Name).Get(context.TODO(), claim.Name, metav1.GetOptions{})
1444				if err != nil {
1445					if !apierrors.IsNotFound(err) {
1446						framework.ExpectNoError(err, "Failed to get claim: %v", err)
1447					}
1448					framework.Logf("PVC not found. Continuing to test VolumeSnapshotContent finalizer")
1449				} else if claim.DeletionTimestamp == nil {
1450					framework.Failf("Expected deletion timestamp to be set on PVC: %v", claim)
1451				}
1452
1453				ginkgo.By(fmt.Sprintf("Get VolumeSnapshotContent bound to VolumeSnapshot %s", snapshot.GetName()))
1454				snapshotContent := utils.GetSnapshotContentFromSnapshot(m.config.Framework.DynamicClient, snapshot)
1455				volumeSnapshotContentName := snapshotContent.GetName()
1456
				ginkgo.By(fmt.Sprintf("Verify VolumeSnapshotContent %s contains finalizer %s", volumeSnapshotContentName, volumeSnapshotContentFinalizer))
1458				err = utils.WaitForGVRFinalizer(ctx, m.config.Framework.DynamicClient, utils.SnapshotContentGVR, volumeSnapshotContentName, "", volumeSnapshotContentFinalizer, 1*time.Millisecond, 1*time.Minute)
1459				framework.ExpectNoError(err)
1460
1461				ginkgo.By(fmt.Sprintf("Delete VolumeSnapshotContent %s", snapshotContent.GetName()))
1462				err = m.config.Framework.DynamicClient.Resource(utils.SnapshotContentGVR).Delete(ctx, snapshotContent.GetName(), metav1.DeleteOptions{})
1463				framework.ExpectNoError(err, "Failed to delete snapshotcontent: %v", err)
1464
1465				ginkgo.By("Get VolumeSnapshotContent from API server and verify deletion timestamp is set")
1466				snapshotContent, err = m.config.Framework.DynamicClient.Resource(utils.SnapshotContentGVR).Get(context.TODO(), snapshotContent.GetName(), metav1.GetOptions{})
1467				framework.ExpectNoError(err)
1468
1469				if snapshotContent.GetDeletionTimestamp() == nil {
1470					framework.Failf("Expected deletion timestamp to be set on snapshotcontent")
1471				}
1472
				// If the claim is nonexistent, the Get() call on the API server returns
				// a non-nil claim object with all fields unset.
				// See https://github.com/kubernetes/kubernetes/pull/99167#issuecomment-781670012
1476				if claim != nil && claim.Spec.VolumeName != "" {
1477					ginkgo.By(fmt.Sprintf("Wait for PV %s to be deleted", claim.Spec.VolumeName))
1478					err = e2epv.WaitForPersistentVolumeDeleted(m.cs, claim.Spec.VolumeName, framework.Poll, 3*time.Minute)
1479					framework.ExpectNoError(err, fmt.Sprintf("failed to delete PV %s", claim.Spec.VolumeName))
1480				}
1481
1482				ginkgo.By(fmt.Sprintf("Verify VolumeSnapshot %s contains finalizer %s", snapshot.GetName(), volumeSnapshotBoundFinalizer))
1483				err = utils.WaitForGVRFinalizer(ctx, m.config.Framework.DynamicClient, utils.SnapshotGVR, volumeSnapshotName, f.Namespace.Name, volumeSnapshotBoundFinalizer, 1*time.Millisecond, 1*time.Minute)
1484				framework.ExpectNoError(err)
1485
1486				ginkgo.By("Delete VolumeSnapshot")
1487				err = utils.DeleteAndWaitSnapshot(m.config.Framework.DynamicClient, f.Namespace.Name, volumeSnapshotName, framework.Poll, framework.SnapshotDeleteTimeout)
1488				framework.ExpectNoError(err, fmt.Sprintf("failed to delete VolumeSnapshot %s", volumeSnapshotName))
1489
1490				ginkgo.By(fmt.Sprintf("Wait for VolumeSnapshotContent %s to be deleted", volumeSnapshotContentName))
1491				err = utils.WaitForGVRDeletion(m.config.Framework.DynamicClient, utils.SnapshotContentGVR, volumeSnapshotContentName, framework.Poll, framework.SnapshotDeleteTimeout)
1492				framework.ExpectNoError(err, fmt.Sprintf("failed to delete VolumeSnapshotContent %s", volumeSnapshotContentName))
1493			})
1494		}
1495	})
1496
1497	ginkgo.Context("CSIServiceAccountToken", func() {
1498		var (
1499			err error
1500		)
1501		tests := []struct {
1502			desc                  string
1503			deployCSIDriverObject bool
1504			tokenRequests         []storagev1.TokenRequest
1505		}{
1506			{
1507				desc:                  "token should not be plumbed down when csiServiceAccountTokenEnabled=false",
1508				deployCSIDriverObject: true,
1509				tokenRequests:         nil,
1510			},
1511			{
1512				desc:                  "token should not be plumbed down when CSIDriver is not deployed",
1513				deployCSIDriverObject: false,
1514				tokenRequests:         []storagev1.TokenRequest{{}},
1515			},
1516			{
1517				desc:                  "token should be plumbed down when csiServiceAccountTokenEnabled=true",
1518				deployCSIDriverObject: true,
1519				tokenRequests:         []storagev1.TokenRequest{{ExpirationSeconds: utilptr.Int64Ptr(60 * 10)}},
1520			},
1521		}
1522		for _, test := range tests {
1523			test := test
1524			csiServiceAccountTokenEnabled := test.tokenRequests != nil
1525			ginkgo.It(test.desc, func() {
1526				init(testParameters{
1527					registerDriver:    test.deployCSIDriverObject,
1528					tokenRequests:     test.tokenRequests,
1529					requiresRepublish: &csiServiceAccountTokenEnabled,
1530				})
1531
1532				defer cleanup()
1533
1534				_, _, pod := createPod(false)
1535				if pod == nil {
1536					return
1537				}
1538				err = e2epod.WaitForPodNameRunningInNamespace(m.cs, pod.Name, pod.Namespace)
1539				framework.ExpectNoError(err, "Failed to start pod: %v", err)
1540
1541				// sleep to make sure RequiresRepublish triggers more than 1 NodePublishVolume
1542				numNodePublishVolume := 1
1543				if test.deployCSIDriverObject && csiServiceAccountTokenEnabled {
1544					time.Sleep(time.Second)
1545					numNodePublishVolume = 2
1546				}
1547
1548				ginkgo.By("Deleting the previously created pod")
1549				err = e2epod.DeletePodWithWait(m.cs, pod)
1550				framework.ExpectNoError(err, "while deleting")
1551
1552				ginkgo.By("Checking CSI driver logs")
1553				err = checkPodLogs(m.driver.GetCalls, pod, false, false, false, test.deployCSIDriverObject && csiServiceAccountTokenEnabled, numNodePublishVolume)
1554				framework.ExpectNoError(err)
1555			})
1556		}
1557	})
1558	// These tests *only* work on a cluster which has the CSIVolumeFSGroupPolicy feature enabled.
1559	ginkgo.Context("CSI FSGroupPolicy [LinuxOnly]", func() {
1560		tests := []struct {
1561			name          string
1562			fsGroupPolicy storagev1.FSGroupPolicy
1563			modified      bool
1564		}{
1565			{
1566				name:          "should modify fsGroup if fsGroupPolicy=default",
1567				fsGroupPolicy: storagev1.ReadWriteOnceWithFSTypeFSGroupPolicy,
1568				modified:      true,
1569			},
1570			{
1571				name:          "should modify fsGroup if fsGroupPolicy=File",
1572				fsGroupPolicy: storagev1.FileFSGroupPolicy,
1573				modified:      true,
1574			},
1575			{
1576				name:          "should not modify fsGroup if fsGroupPolicy=None",
1577				fsGroupPolicy: storagev1.NoneFSGroupPolicy,
1578				modified:      false,
1579			},
1580		}
1581		for _, t := range tests {
1582			test := t
1583			ginkgo.It(test.name, func() {
1584				if framework.NodeOSDistroIs("windows") {
1585					e2eskipper.Skipf("FSGroupPolicy is only applied on linux nodes -- skipping")
1586				}
1587				init(testParameters{
1588					disableAttach:  true,
1589					registerDriver: true,
1590					fsGroupPolicy:  &test.fsGroupPolicy,
1591				})
1592				defer cleanup()
1593
				// kube-scheduler may need some time before it sees the CSIDriver object.
				// Without it, scheduling doesn't run as expected by the test.
1596				syncDelay := 5 * time.Second
1597				time.Sleep(syncDelay)
1598
1599				fsGroupVal := int64(rand.Int63n(20000) + 1024)
1600				fsGroup := &fsGroupVal
1601
1602				_, _, pod := createPodWithFSGroup(fsGroup) /* persistent volume */
1603
1604				mountPath := pod.Spec.Containers[0].VolumeMounts[0].MountPath
1605				dirName := mountPath + "/" + f.UniqueName
1606				fileName := dirName + "/" + f.UniqueName
1607
1608				err := e2epod.WaitForPodNameRunningInNamespace(m.cs, pod.Name, pod.Namespace)
1609				framework.ExpectNoError(err, "failed to start pod")
1610
1611				// Create the subdirectory to ensure that fsGroup propagates
1612				createDirectory := fmt.Sprintf("mkdir %s", dirName)
1613				_, _, err = e2evolume.PodExec(f, pod, createDirectory)
1614				framework.ExpectNoError(err, "failed: creating the directory: %s", err)
1615
1616				// Inject the contents onto the mount
1617				createFile := fmt.Sprintf("echo '%s' > '%s'; sync", "filecontents", fileName)
1618				_, _, err = e2evolume.PodExec(f, pod, createFile)
1619				framework.ExpectNoError(err, "failed: writing the contents: %s", err)
1620
				// Delete the created file and directory. This step is mandatory, as the
				// mock driver won't clean up the contents automatically.
				defer func() {
					deleteDir := fmt.Sprintf("rm -fr %s", dirName)
					_, _, err = e2evolume.PodExec(f, pod, deleteDir)
					framework.ExpectNoError(err, "failed: deleting the directory: %s", err)
				}()
1628
1629				// Ensure that the fsGroup matches what we expect
1630				if test.modified {
1631					utils.VerifyFSGroupInPod(f, fileName, strconv.FormatInt(*fsGroup, 10), pod)
1632				} else {
1633					utils.VerifyFSGroupInPod(f, fileName, "root", pod)
1634				}
1635
				// The created resources will be removed by the cleanup() function,
				// so there is no need to delete anything here.
1638			})
1639		}
1640	})
1641
1642	ginkgo.Context("CSI Volume Snapshots secrets [Feature:VolumeSnapshotDataSource]", func() {
1643
1644		var (
1645			// CSISnapshotterSecretName is the name of the secret to be created
1646			CSISnapshotterSecretName string = "snapshot-secret"
1647
1648			// CSISnapshotterSecretNameAnnotation is the annotation key for the CSI snapshotter secret name in VolumeSnapshotClass.parameters
1649			CSISnapshotterSecretNameAnnotation string = "csi.storage.k8s.io/snapshotter-secret-name"
1650
1651			// CSISnapshotterSecretNamespaceAnnotation is the annotation key for the CSI snapshotter secret namespace in VolumeSnapshotClass.parameters
1652			CSISnapshotterSecretNamespaceAnnotation string = "csi.storage.k8s.io/snapshotter-secret-namespace"
1653
			// annotations holds the annotations object
1655			annotations interface{}
1656		)
1657
1658		tests := []struct {
1659			name               string
1660			createSnapshotHook func(counter int64) error
1661		}{
1662			{
				// The volume snapshot should be created using secrets successfully even if there is a failure in the first few attempts.
				name: "volume snapshot create/delete with secrets",
				// Fail the first seven calls to create snapshot and succeed on the eighth call.
1666				createSnapshotHook: func(counter int64) error {
1667					if counter < 8 {
1668						return status.Error(codes.DeadlineExceeded, "fake error")
1669					}
1670					return nil
1671				},
1672			},
1673		}
		for _, test := range tests {
			test := test
			ginkgo.It(test.name, func() {
1676				hooks := createPreHook("CreateSnapshot", test.createSnapshotHook)
1677				init(testParameters{
1678					disableAttach:  true,
1679					registerDriver: true,
1680					enableSnapshot: true,
1681					hooks:          hooks,
1682				})
1683
1684				sDriver, ok := m.driver.(storageframework.SnapshottableTestDriver)
1685				if !ok {
1686					e2eskipper.Skipf("mock driver does not support snapshots -- skipping")
1687				}
1688				defer cleanup()
1689
1690				var sc *storagev1.StorageClass
1691				if dDriver, ok := m.driver.(storageframework.DynamicPVTestDriver); ok {
1692					sc = dDriver.GetDynamicProvisionStorageClass(m.config, "")
1693				}
1694				ginkgo.By("Creating storage class")
1695				class, err := m.cs.StorageV1().StorageClasses().Create(context.TODO(), sc, metav1.CreateOptions{})
1696				framework.ExpectNoError(err, "Failed to create storage class: %v", err)
1697				m.sc[class.Name] = class
1698				pvc := e2epv.MakePersistentVolumeClaim(e2epv.PersistentVolumeClaimConfig{
1699					Name:             "snapshot-test-pvc",
1700					StorageClassName: &(class.Name),
1701				}, f.Namespace.Name)
1702
1703				ginkgo.By(fmt.Sprintf("Creating PVC %s/%s", pvc.Namespace, pvc.Name))
1704				pvc, err = m.cs.CoreV1().PersistentVolumeClaims(f.Namespace.Name).Create(context.TODO(), pvc, metav1.CreateOptions{})
1705				framework.ExpectNoError(err, "Failed to create claim: %v", err)
1706
1707				ginkgo.By("Wait for PVC to be Bound")
1708				_, err = e2epv.WaitForPVClaimBoundPhase(m.cs, []*v1.PersistentVolumeClaim{pvc}, 1*time.Minute)
				framework.ExpectNoError(err, "Failed waiting for PVC to be bound: %v", err)
1710
1711				m.pvcs = append(m.pvcs, pvc)
1712
1713				ginkgo.By("Creating Secret")
1714				secret := &v1.Secret{
1715					ObjectMeta: metav1.ObjectMeta{
1716						Namespace: f.Namespace.Name,
1717						Name:      CSISnapshotterSecretName,
1718					},
1719					Data: map[string][]byte{
1720						"secret-data": []byte("secret-value-1"),
1721					},
1722				}
1723
				if _, err := m.cs.CoreV1().Secrets(f.Namespace.Name).Create(context.TODO(), secret, metav1.CreateOptions{}); err != nil {
					framework.Failf("unable to create test secret %s: %v", secret.Name, err)
				}
1727
1728				ginkgo.By("Creating snapshot with secrets")
1729				parameters := map[string]string{
1730					CSISnapshotterSecretNameAnnotation:      CSISnapshotterSecretName,
1731					CSISnapshotterSecretNamespaceAnnotation: f.Namespace.Name,
1732				}
1733
1734				_, snapshot := storageframework.CreateSnapshot(sDriver, m.config, storageframework.DynamicSnapshotDelete, pvc.Name, pvc.Namespace, f.Timeouts, parameters)
1736				snapshotcontent := utils.GetSnapshotContentFromSnapshot(m.config.Framework.DynamicClient, snapshot)
1737				if annotations, ok = snapshotcontent.Object["metadata"].(map[string]interface{})["annotations"]; !ok {
1738					framework.Failf("Unable to get volume snapshot content annotations")
1739				}
1740
1741				// checks if delete snapshot secrets annotation is applied to the VolumeSnapshotContent.
1742				checkDeleteSnapshotSecrets(m.cs, annotations)
1743
1744				// delete the snapshot and check if the snapshot is deleted.
1745				deleteSnapshot(m.cs, m.config, snapshot)
1746			})
1747		}
1748	})
1749
1750	ginkgo.Context("CSI Snapshot Controller metrics [Feature:VolumeSnapshotDataSource]", func() {
1751		tests := []struct {
1752			name    string
1753			pattern storageframework.TestPattern
1754		}{
1755			{
1756				name:    "snapshot controller should emit dynamic CreateSnapshot, CreateSnapshotAndReady, and DeleteSnapshot metrics",
1757				pattern: storageframework.DynamicSnapshotDelete,
1758			},
1759			{
1760				name:    "snapshot controller should emit pre-provisioned CreateSnapshot, CreateSnapshotAndReady, and DeleteSnapshot metrics",
1761				pattern: storageframework.PreprovisionedSnapshotDelete,
1762			},
1763		}
		for _, test := range tests {
			test := test
			ginkgo.It(test.name, func() {
1766				init(testParameters{
1767					disableAttach:  true,
1768					registerDriver: true,
1769					enableSnapshot: true,
1770				})
1771
1772				sDriver, ok := m.driver.(storageframework.SnapshottableTestDriver)
1773				if !ok {
1774					e2eskipper.Skipf("mock driver does not support snapshots -- skipping")
1775				}
1776				defer cleanup()
1777
1778				metricsGrabber, err := e2emetrics.NewMetricsGrabber(m.config.Framework.ClientSet, nil, f.ClientConfig(), false, false, false, false, false, true)
1779				if err != nil {
					framework.Failf("Error creating metrics grabber: %v", err)
1781				}
1782
1783				// Grab initial metrics - if this fails, snapshot controller metrics are not setup. Skip in this case.
1784				_, err = metricsGrabber.GrabFromSnapshotController(framework.TestContext.SnapshotControllerPodName, framework.TestContext.SnapshotControllerHTTPPort)
1785				if err != nil {
1786					e2eskipper.Skipf("Snapshot controller metrics not found -- skipping")
1787				}
1788
1789				ginkgo.By("getting all initial metric values")
1790				metricsTestConfig := newSnapshotMetricsTestConfig("snapshot_controller_operation_total_seconds_count",
1791					"count",
1792					m.config.GetUniqueDriverName(),
1793					"CreateSnapshot",
1794					"success",
1795					"",
1796					test.pattern)
1797				createSnapshotMetrics := newSnapshotControllerMetrics(metricsTestConfig, metricsGrabber)
1798				originalCreateSnapshotCount, _ := createSnapshotMetrics.getSnapshotControllerMetricValue()
1799				metricsTestConfig.operationName = "CreateSnapshotAndReady"
1800				createSnapshotAndReadyMetrics := newSnapshotControllerMetrics(metricsTestConfig, metricsGrabber)
1801				originalCreateSnapshotAndReadyCount, _ := createSnapshotAndReadyMetrics.getSnapshotControllerMetricValue()
1802
1803				metricsTestConfig.operationName = "DeleteSnapshot"
1804				deleteSnapshotMetrics := newSnapshotControllerMetrics(metricsTestConfig, metricsGrabber)
1805				originalDeleteSnapshotCount, _ := deleteSnapshotMetrics.getSnapshotControllerMetricValue()
1806
1807				ginkgo.By("Creating storage class")
1808				var sc *storagev1.StorageClass
1809				if dDriver, ok := m.driver.(storageframework.DynamicPVTestDriver); ok {
1810					sc = dDriver.GetDynamicProvisionStorageClass(m.config, "")
1811				}
1812				class, err := m.cs.StorageV1().StorageClasses().Create(context.TODO(), sc, metav1.CreateOptions{})
1813				framework.ExpectNoError(err, "Failed to create storage class: %v", err)
1814				m.sc[class.Name] = class
1815				pvc := e2epv.MakePersistentVolumeClaim(e2epv.PersistentVolumeClaimConfig{
1816					Name:             "snapshot-test-pvc",
1817					StorageClassName: &(class.Name),
1818				}, f.Namespace.Name)
1819
1820				ginkgo.By(fmt.Sprintf("Creating PVC %s/%s", pvc.Namespace, pvc.Name))
1821				pvc, err = m.cs.CoreV1().PersistentVolumeClaims(f.Namespace.Name).Create(context.TODO(), pvc, metav1.CreateOptions{})
1822				framework.ExpectNoError(err, "Failed to create claim: %v", err)
1823
1824				ginkgo.By("Wait for PVC to be Bound")
1825				_, err = e2epv.WaitForPVClaimBoundPhase(m.cs, []*v1.PersistentVolumeClaim{pvc}, 1*time.Minute)
				framework.ExpectNoError(err, "Failed waiting for PVC to be bound: %v", err)
1827
1828				ginkgo.By("Creating snapshot")
1829				parameters := map[string]string{}
1830				sr := storageframework.CreateSnapshotResource(sDriver, m.config, test.pattern, pvc.Name, pvc.Namespace, f.Timeouts, parameters)
1832
1833				ginkgo.By("Checking for CreateSnapshot metrics")
1834				createSnapshotMetrics.waitForSnapshotControllerMetric(originalCreateSnapshotCount+1.0, f.Timeouts.SnapshotControllerMetrics)
1835
1836				ginkgo.By("Checking for CreateSnapshotAndReady metrics")
1837				err = utils.WaitForSnapshotReady(m.config.Framework.DynamicClient, pvc.Namespace, sr.Vs.GetName(), framework.Poll, f.Timeouts.SnapshotCreate)
1838				framework.ExpectNoError(err, "failed to wait for snapshot ready")
1839				createSnapshotAndReadyMetrics.waitForSnapshotControllerMetric(originalCreateSnapshotAndReadyCount+1.0, f.Timeouts.SnapshotControllerMetrics)
1840
1841				// delete the snapshot and check if the snapshot is deleted
1842				deleteSnapshot(m.cs, m.config, sr.Vs)
1843
				ginkgo.By("Checking for DeleteSnapshot metrics")
				deleteSnapshotMetrics.waitForSnapshotControllerMetric(originalDeleteSnapshotCount+1.0, f.Timeouts.SnapshotControllerMetrics)
1847			})
1848		}
1849	})
1850})
1851
1852func deleteSnapshot(cs clientset.Interface, config *storageframework.PerTestConfig, snapshot *unstructured.Unstructured) {
1853	// delete the given snapshot
1854	dc := config.Framework.DynamicClient
1855	err := dc.Resource(utils.SnapshotGVR).Namespace(snapshot.GetNamespace()).Delete(context.TODO(), snapshot.GetName(), metav1.DeleteOptions{})
1856	framework.ExpectNoError(err)
1857
1858	// check if the snapshot is deleted
	_, err = dc.Resource(utils.SnapshotGVR).Namespace(snapshot.GetNamespace()).Get(context.TODO(), snapshot.GetName(), metav1.GetOptions{})
1860	framework.ExpectError(err)
1861}
1862
1863// A lot of this code was copied from e2e/framework. It would be nicer
1864// if it could be reused - see https://github.com/kubernetes/kubernetes/issues/92754
1865func podRunning(ctx context.Context, c clientset.Interface, podName, namespace string) wait.ConditionFunc {
1866	return func() (bool, error) {
1867		pod, err := c.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{})
1868		if err != nil {
1869			return false, err
1870		}
1871		switch pod.Status.Phase {
1872		case v1.PodRunning:
1873			return true, nil
1874		case v1.PodFailed, v1.PodSucceeded:
1875			return false, errPodCompleted
1876		}
1877		return false, nil
1878	}
1879}
1880
1881const (
1882	poll                           = 2 * time.Second
1883	pvcAsSourceProtectionFinalizer = "snapshot.storage.kubernetes.io/pvc-as-source-protection"
1884	volumeSnapshotContentFinalizer = "snapshot.storage.kubernetes.io/volumesnapshotcontent-bound-protection"
1885	volumeSnapshotBoundFinalizer   = "snapshot.storage.kubernetes.io/volumesnapshot-bound-protection"
1886)
1887
1888var (
1889	errPodCompleted   = fmt.Errorf("pod ran to completion")
1890	errNotEnoughSpace = errors.New(scheduling.ErrReasonNotEnoughSpace)
1891)
1892
1893func podHasStorage(ctx context.Context, c clientset.Interface, podName, namespace string, when time.Time) wait.ConditionFunc {
1894	// Check for events of this pod. Copied from test/e2e/common/container_probe.go.
1895	expectedEvent := fields.Set{
1896		"involvedObject.kind":      "Pod",
1897		"involvedObject.name":      podName,
1898		"involvedObject.namespace": namespace,
1899		"reason":                   "FailedScheduling",
1900	}.AsSelector().String()
1901	options := metav1.ListOptions{
1902		FieldSelector: expectedEvent,
1903	}
1904	// copied from test/e2e/framework/events/events.go
1905	return func() (bool, error) {
		// We cannot tell from the events whether the pod has enough storage,
		// only when it does not. In that case we abort waiting with a special error.
1908		events, err := c.CoreV1().Events(namespace).List(ctx, options)
1909		if err != nil {
1910			return false, fmt.Errorf("got error while getting events: %w", err)
1911		}
1912		for _, event := range events.Items {
1913			if /* event.CreationTimestamp.After(when) &&
1914			 */strings.Contains(event.Message, scheduling.ErrReasonNotEnoughSpace) {
1915				return false, errNotEnoughSpace
1916			}
1917		}
1918		return false, nil
1919	}
1920}
1921
1922func anyOf(conditions ...wait.ConditionFunc) wait.ConditionFunc {
1923	return func() (bool, error) {
1924		for _, condition := range conditions {
1925			done, err := condition()
1926			if err != nil {
1927				return false, err
1928			}
1929			if done {
1930				return true, nil
1931			}
1932		}
1933		return false, nil
1934	}
1935}
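
// exampleWaitForPodOrNoStorage is an illustrative sketch (a hypothetical helper,
// not used by the tests themselves) of how podRunning, podHasStorage and anyOf
// are meant to be combined: poll both conditions together and stop either when
// the pod runs or when scheduling reports insufficient storage capacity, which
// surfaces as errNotEnoughSpace.
func exampleWaitForPodOrNoStorage(ctx context.Context, c clientset.Interface, podName, namespace string, scheduledAfter time.Time) error {
	condition := anyOf(
		podRunning(ctx, c, podName, namespace),
		podHasStorage(ctx, c, podName, namespace, scheduledAfter),
	)
	// Returns nil once either condition is met, errNotEnoughSpace when a
	// FailedScheduling event indicates a capacity problem, or a timeout error
	// when the context is cancelled first.
	return wait.PollImmediateUntil(poll, condition, ctx.Done())
}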
1936
1937func waitForMaxVolumeCondition(pod *v1.Pod, cs clientset.Interface) error {
1938	waitErr := wait.PollImmediate(10*time.Second, csiPodUnschedulableTimeout, func() (bool, error) {
1939		pod, err := cs.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{})
1940		if err != nil {
1941			return false, err
1942		}
1943		for _, c := range pod.Status.Conditions {
			// Conformance tests cannot rely on specific output of optional fields (e.g., Reason
			// and Message) because these fields are not subject to the deprecation policy.
1946			if c.Type == v1.PodScheduled && c.Status == v1.ConditionFalse && c.Reason != "" && c.Message != "" {
1947				return true, nil
1948			}
1949		}
1950		return false, nil
1951	})
1952	if waitErr != nil {
1953		return fmt.Errorf("error waiting for pod %s/%s to have max volume condition: %v", pod.Namespace, pod.Name, waitErr)
1954	}
1955	return nil
1956}
1957
1958func waitForVolumeAttachmentTerminated(attachmentName string, cs clientset.Interface) error {
1959	waitErr := wait.PollImmediate(10*time.Second, csiVolumeAttachmentTimeout, func() (bool, error) {
1960		_, err := cs.StorageV1().VolumeAttachments().Get(context.TODO(), attachmentName, metav1.GetOptions{})
1961		if err != nil {
1962			// if the volumeattachment object is not found, it means it has been terminated.
1963			if apierrors.IsNotFound(err) {
1964				return true, nil
1965			}
1966			return false, err
1967		}
1968		return false, nil
1969	})
1970	if waitErr != nil {
		return fmt.Errorf("error waiting for volume attachment %s to terminate: %v", attachmentName, waitErr)
1972	}
1973	return nil
1974}
1975
1976func checkCSINodeForLimits(nodeName string, driverName string, cs clientset.Interface) (int32, error) {
1977	var attachLimit int32
1978
1979	waitErr := wait.PollImmediate(10*time.Second, csiNodeLimitUpdateTimeout, func() (bool, error) {
1980		csiNode, err := cs.StorageV1().CSINodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
1981		if err != nil && !apierrors.IsNotFound(err) {
1982			return false, err
1983		}
1984		attachLimit = getVolumeLimitFromCSINode(csiNode, driverName)
1985		if attachLimit > 0 {
1986			return true, nil
1987		}
1988		return false, nil
1989	})
1990	if waitErr != nil {
1991		return 0, fmt.Errorf("error waiting for non-zero volume limit of driver %s on node %s: %v", driverName, nodeName, waitErr)
1992	}
1993	return attachLimit, nil
1994}
1995
1996func createClaim(cs clientset.Interface, t testsuites.StorageClassTest, node e2epod.NodeSelection, scName, ns string) (*storagev1.StorageClass, *v1.PersistentVolumeClaim) {
1997	class := newStorageClass(t, ns, "")
1998	if scName != "" {
1999		class.Name = scName
2000	}
2001	var err error
2002	_, err = cs.StorageV1().StorageClasses().Get(context.TODO(), class.Name, metav1.GetOptions{})
2003	if err != nil {
2004		class, err = cs.StorageV1().StorageClasses().Create(context.TODO(), class, metav1.CreateOptions{})
2005		framework.ExpectNoError(err, "Failed to create class: %v", err)
2006	}
2007
2008	claim := e2epv.MakePersistentVolumeClaim(e2epv.PersistentVolumeClaimConfig{
2009		ClaimSize:        t.ClaimSize,
2010		StorageClassName: &(class.Name),
2011		VolumeMode:       &t.VolumeMode,
2012	}, ns)
2013	claim, err = cs.CoreV1().PersistentVolumeClaims(ns).Create(context.TODO(), claim, metav1.CreateOptions{})
2014	framework.ExpectNoError(err, "Failed to create claim: %v", err)
2015
2016	if !t.DelayBinding {
2017		pvcClaims := []*v1.PersistentVolumeClaim{claim}
2018		_, err = e2epv.WaitForPVClaimBoundPhase(cs, pvcClaims, framework.ClaimProvisionTimeout)
2019		framework.ExpectNoError(err, "Failed waiting for PVC to be bound: %v", err)
2020	}
2021	return class, claim
2022}
2023
2024func startPausePod(cs clientset.Interface, t testsuites.StorageClassTest, node e2epod.NodeSelection, scName, ns string) (*storagev1.StorageClass, *v1.PersistentVolumeClaim, *v1.Pod) {
2025	class, claim := createClaim(cs, t, node, scName, ns)
2026
2027	pod, err := startPausePodWithClaim(cs, claim, node, ns)
2028	framework.ExpectNoError(err, "Failed to create pause pod: %v", err)
2029	return class, claim, pod
2030}
2031
2032func startBusyBoxPod(cs clientset.Interface, t testsuites.StorageClassTest, node e2epod.NodeSelection, scName, ns string, fsGroup *int64) (*storagev1.StorageClass, *v1.PersistentVolumeClaim, *v1.Pod) {
2033	class, claim := createClaim(cs, t, node, scName, ns)
2034	pod, err := startBusyBoxPodWithClaim(cs, claim, node, ns, fsGroup)
2035	framework.ExpectNoError(err, "Failed to create busybox pod: %v", err)
2036	return class, claim, pod
2037}
2038
2039func startPausePodInline(cs clientset.Interface, t testsuites.StorageClassTest, node e2epod.NodeSelection, ns string) *v1.Pod {
2040	pod, err := startPausePodWithInlineVolume(cs,
2041		&v1.CSIVolumeSource{
2042			Driver: t.Provisioner,
2043		},
2044		node, ns)
2045	framework.ExpectNoError(err, "Failed to create pod: %v", err)
2046	return pod
2047}
2048
2049func startPausePodWithClaim(cs clientset.Interface, pvc *v1.PersistentVolumeClaim, node e2epod.NodeSelection, ns string) (*v1.Pod, error) {
2050	return startPausePodWithVolumeSource(cs,
2051		v1.VolumeSource{
2052			PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
2053				ClaimName: pvc.Name,
2054				ReadOnly:  false,
2055			},
2056		},
2057		node, ns)
2058}
2059
2060func startBusyBoxPodWithClaim(cs clientset.Interface, pvc *v1.PersistentVolumeClaim, node e2epod.NodeSelection, ns string, fsGroup *int64) (*v1.Pod, error) {
2061	return startBusyBoxPodWithVolumeSource(cs,
2062		v1.VolumeSource{
2063			PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
2064				ClaimName: pvc.Name,
2065				ReadOnly:  false,
2066			},
2067		},
2068		node, ns, fsGroup)
2069}
2070
2071func startPausePodWithInlineVolume(cs clientset.Interface, inlineVolume *v1.CSIVolumeSource, node e2epod.NodeSelection, ns string) (*v1.Pod, error) {
2072	return startPausePodWithVolumeSource(cs,
2073		v1.VolumeSource{
2074			CSI: inlineVolume,
2075		},
2076		node, ns)
2077}
2078
2079func startPausePodWithVolumeSource(cs clientset.Interface, volumeSource v1.VolumeSource, node e2epod.NodeSelection, ns string) (*v1.Pod, error) {
2080	pod := &v1.Pod{
2081		ObjectMeta: metav1.ObjectMeta{
2082			GenerateName: "pvc-volume-tester-",
2083		},
2084		Spec: v1.PodSpec{
2085			Containers: []v1.Container{
2086				{
2087					Name:  "volume-tester",
2088					Image: imageutils.GetE2EImage(imageutils.Pause),
2089					VolumeMounts: []v1.VolumeMount{
2090						{
2091							Name:      "my-volume",
2092							MountPath: "/mnt/test",
2093						},
2094					},
2095				},
2096			},
2097			RestartPolicy: v1.RestartPolicyNever,
2098			Volumes: []v1.Volume{
2099				{
2100					Name:         "my-volume",
2101					VolumeSource: volumeSource,
2102				},
2103			},
2104		},
2105	}
2106	e2epod.SetNodeSelection(&pod.Spec, node)
2107	return cs.CoreV1().Pods(ns).Create(context.TODO(), pod, metav1.CreateOptions{})
2108}
2109
2110func startBusyBoxPodWithVolumeSource(cs clientset.Interface, volumeSource v1.VolumeSource, node e2epod.NodeSelection, ns string, fsGroup *int64) (*v1.Pod, error) {
2111	pod := &v1.Pod{
2112		ObjectMeta: metav1.ObjectMeta{
2113			GenerateName: "pvc-volume-tester-",
2114		},
2115		Spec: v1.PodSpec{
2116			Containers: []v1.Container{
2117				{
2118					Name:  "volume-tester",
2119					Image: framework.BusyBoxImage,
2120					VolumeMounts: []v1.VolumeMount{
2121						{
2122							Name:      "my-volume",
2123							MountPath: "/mnt/test",
2124						},
2125					},
2126					Command: e2epod.GenerateScriptCmd("while true ; do sleep 2; done"),
2127				},
2128			},
2129			SecurityContext: &v1.PodSecurityContext{
2130				FSGroup: fsGroup,
2131			},
2132			RestartPolicy: v1.RestartPolicyNever,
2133			Volumes: []v1.Volume{
2134				{
2135					Name:         "my-volume",
2136					VolumeSource: volumeSource,
2137				},
2138			},
2139		},
2140	}
2141	e2epod.SetNodeSelection(&pod.Spec, node)
2142	return cs.CoreV1().Pods(ns).Create(context.TODO(), pod, metav1.CreateOptions{})
2143}
2144
// checkPodLogs tests that NodePublishVolume was called with the expected volume_context
// attributes and that a matching NodeUnpublishVolume call was made.
2147func checkPodLogs(getCalls func() ([]drivers.MockCSICall, error), pod *v1.Pod, expectPodInfo, ephemeralVolume, csiInlineVolumesEnabled, csiServiceAccountTokenEnabled bool, expectedNumNodePublish int) error {
2148	expectedAttributes := map[string]string{}
2149	if expectPodInfo {
2150		expectedAttributes["csi.storage.k8s.io/pod.name"] = pod.Name
2151		expectedAttributes["csi.storage.k8s.io/pod.namespace"] = pod.Namespace
2152		expectedAttributes["csi.storage.k8s.io/pod.uid"] = string(pod.UID)
2153		expectedAttributes["csi.storage.k8s.io/serviceAccount.name"] = "default"
2154
2155	}
2156	if csiInlineVolumesEnabled {
2157		// This is only passed in 1.15 when the CSIInlineVolume feature gate is set.
2158		expectedAttributes["csi.storage.k8s.io/ephemeral"] = strconv.FormatBool(ephemeralVolume)
2159	}
2160
2161	if csiServiceAccountTokenEnabled {
2162		expectedAttributes["csi.storage.k8s.io/serviceAccount.tokens"] = "<nonempty>"
2163	}
2164
2165	// Find NodePublish in the GRPC calls.
2166	foundAttributes := sets.NewString()
2167	numNodePublishVolume := 0
2168	numNodeUnpublishVolume := 0
2169	calls, err := getCalls()
2170	if err != nil {
2171		return err
2172	}
2173
2174	for _, call := range calls {
2175		switch call.Method {
2176		case "NodePublishVolume":
2177			numNodePublishVolume++
2178			if numNodePublishVolume == 1 {
2179				// Check that NodePublish had expected attributes for first volume
2180				for k, v := range expectedAttributes {
2181					vv, found := call.Request.VolumeContext[k]
2182					if found && (v == vv || (v == "<nonempty>" && len(vv) != 0)) {
2183						foundAttributes.Insert(k)
2184						framework.Logf("Found volume attribute %s: %s", k, vv)
2185					}
2186				}
2187			}
2188		case "NodeUnpublishVolume":
2189			framework.Logf("Found NodeUnpublishVolume: %+v", call)
2190			numNodeUnpublishVolume++
2191		}
2192	}
2193	if numNodePublishVolume < expectedNumNodePublish {
		return fmt.Errorf("NodePublishVolume should be called at least %d times, got %d", expectedNumNodePublish, numNodePublishVolume)
2195	}
2196
2197	if numNodeUnpublishVolume == 0 {
		return fmt.Errorf("NodeUnpublishVolume was never called")
2199	}
2200	if foundAttributes.Len() != len(expectedAttributes) {
2201		return fmt.Errorf("number of found volume attributes does not match, expected %d, got %d", len(expectedAttributes), foundAttributes.Len())
2202	}
2203	return nil
2204}
2205
2206// compareCSICalls compares expectedCalls with logs of the mock driver.
2207// It returns index of the first expectedCall that was *not* received
2208// yet or error when calls do not match.
2209// All repeated calls to the CSI mock driver (e.g. due to exponential backoff)
2210// are squashed and checked against single expectedCallSequence item.
2211//
2212// Only permanent errors are returned. Other errors are logged and no
2213// calls are returned. The caller is expected to retry.
2214func compareCSICalls(trackedCalls []string, expectedCallSequence []csiCall, getCalls func() ([]drivers.MockCSICall, error)) ([]drivers.MockCSICall, int, error) {
2215	allCalls, err := getCalls()
2216	if err != nil {
2217		framework.Logf("intermittent (?) log retrieval error, proceeding without output: %v", err)
2218		return nil, 0, nil
2219	}
2220
2221	// Remove all repeated and ignored calls
2222	tracked := sets.NewString(trackedCalls...)
2223	var calls []drivers.MockCSICall
2224	var last drivers.MockCSICall
2225	for _, c := range allCalls {
2226		if !tracked.Has(c.Method) {
2227			continue
2228		}
2229		if c.Method != last.Method || c.FullError.Code != last.FullError.Code {
2230			last = c
2231			calls = append(calls, c)
2232		}
2233		// This call is the same as the last one, ignore it.
2234	}
2235
2236	for i, c := range calls {
2237		if i >= len(expectedCallSequence) {
2238			// Log all unexpected calls first, return error below outside the loop.
			framework.Logf("Unexpected CSI driver call: %s (%d)", c.Method, c.FullError.Code)
2240			continue
2241		}
2242
2243		// Compare current call with expected call
2244		expectedCall := expectedCallSequence[i]
2245		if c.Method != expectedCall.expectedMethod || c.FullError.Code != expectedCall.expectedError {
2246			return allCalls, i, fmt.Errorf("Unexpected CSI call %d: expected %s (%d), got %s (%d)", i, expectedCall.expectedMethod, expectedCall.expectedError, c.Method, c.FullError.Code)
2247		}
2248	}
2249	if len(calls) > len(expectedCallSequence) {
2250		return allCalls, len(expectedCallSequence), fmt.Errorf("Received %d unexpected CSI driver calls", len(calls)-len(expectedCallSequence))
2251	}
2252	// All calls were correct
	return allCalls, len(calls), nil
}
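
// exampleWaitForCSICallSequence is an illustrative sketch (a hypothetical helper,
// not used by the tests) of the retry contract described above: compareCSICalls
// only returns permanent errors and otherwise reports how far into the expected
// sequence the driver has progressed, so a caller can poll until the returned
// index reaches the end of the expected sequence.
func exampleWaitForCSICallSequence(trackedCalls []string, expected []csiCall, getCalls func() ([]drivers.MockCSICall, error)) error {
	return wait.PollImmediate(poll, csiPodRunningTimeout, func() (bool, error) {
		_, index, err := compareCSICalls(trackedCalls, expected, getCalls)
		if err != nil {
			// Permanent mismatch between the expected and the observed calls.
			return false, err
		}
		// index is the first expected call that has not been observed yet.
		return index == len(expected), nil
	})
}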
2256
2257func waitForCSIDriver(cs clientset.Interface, driverName string) error {
2258	timeout := 4 * time.Minute
2259
2260	framework.Logf("waiting up to %v for CSIDriver %q", timeout, driverName)
2261	for start := time.Now(); time.Since(start) < timeout; time.Sleep(framework.Poll) {
2262		_, err := cs.StorageV1().CSIDrivers().Get(context.TODO(), driverName, metav1.GetOptions{})
2263		if !apierrors.IsNotFound(err) {
2264			return err
2265		}
2266	}
2267	return fmt.Errorf("gave up after waiting %v for CSIDriver %q", timeout, driverName)
2268}
2269
2270func destroyCSIDriver(cs clientset.Interface, driverName string) {
2271	driverGet, err := cs.StorageV1().CSIDrivers().Get(context.TODO(), driverName, metav1.GetOptions{})
2272	if err == nil {
2273		framework.Logf("deleting %s.%s: %s", driverGet.TypeMeta.APIVersion, driverGet.TypeMeta.Kind, driverGet.ObjectMeta.Name)
2274		// Uncomment the following line to get full dump of CSIDriver object
2275		// framework.Logf("%s", framework.PrettyPrint(driverGet))
2276		cs.StorageV1().CSIDrivers().Delete(context.TODO(), driverName, metav1.DeleteOptions{})
2277	}
2278}
2279
2280func getVolumeHandle(cs clientset.Interface, claim *v1.PersistentVolumeClaim) string {
2281	// re-get the claim to the latest state with bound volume
2282	claim, err := cs.CoreV1().PersistentVolumeClaims(claim.Namespace).Get(context.TODO(), claim.Name, metav1.GetOptions{})
2283	if err != nil {
2284		framework.ExpectNoError(err, "Cannot get PVC")
2285		return ""
2286	}
2287	pvName := claim.Spec.VolumeName
2288	pv, err := cs.CoreV1().PersistentVolumes().Get(context.TODO(), pvName, metav1.GetOptions{})
2289	if err != nil {
2290		framework.ExpectNoError(err, "Cannot get PV")
2291		return ""
2292	}
2293	if pv.Spec.CSI == nil {
2294		gomega.Expect(pv.Spec.CSI).NotTo(gomega.BeNil())
2295		return ""
2296	}
2297	return pv.Spec.CSI.VolumeHandle
2298}
2299
2300func getVolumeLimitFromCSINode(csiNode *storagev1.CSINode, driverName string) int32 {
2301	for _, d := range csiNode.Spec.Drivers {
2302		if d.Name != driverName {
2303			continue
2304		}
2305		if d.Allocatable != nil && d.Allocatable.Count != nil {
2306			return *d.Allocatable.Count
2307		}
2308	}
2309	return 0
2310}
2311
2312// checkDeleteSnapshotSecrets checks if delete snapshot secrets annotation is applied to the VolumeSnapshotContent.
2313func checkDeleteSnapshotSecrets(cs clientset.Interface, annotations interface{}) error {
2314	ginkgo.By("checking if delete snapshot secrets annotation is applied to the VolumeSnapshotContent")
2315
2316	var (
2317		annDeletionSecretName      string
2318		annDeletionSecretNamespace string
2319		ok                         bool
2320		err                        error
2321
2322		// CSISnapshotterDeleteSecretNameAnnotation is the annotation key for the CSI snapshotter delete secret name in VolumeSnapshotClass.parameters
2323		CSISnapshotterDeleteSecretNameAnnotation string = "snapshot.storage.kubernetes.io/deletion-secret-name"
2324
2325		// CSISnapshotterDeleteSecretNamespaceAnnotation is the annotation key for the CSI snapshotter delete secret namespace in VolumeSnapshotClass.parameters
2326		CSISnapshotterDeleteSecretNamespaceAnnotation string = "snapshot.storage.kubernetes.io/deletion-secret-namespace"
2327	)
2328
2329	annotationsObj, ok := annotations.(map[string]interface{})
2330	if !ok {
2331		framework.Failf("failed to get annotations from annotations object")
2332	}
2333
2334	if annDeletionSecretName, ok = annotationsObj[CSISnapshotterDeleteSecretNameAnnotation].(string); !ok {
2335		framework.Failf("unable to get secret annotation name")
2336	}
2337	if annDeletionSecretNamespace, ok = annotationsObj[CSISnapshotterDeleteSecretNamespaceAnnotation].(string); !ok {
2338		framework.Failf("unable to get secret annotation namespace")
2339	}
2340
	// verify that the secret exists
2342	if _, err = cs.CoreV1().Secrets(annDeletionSecretNamespace).Get(context.TODO(), annDeletionSecretName, metav1.GetOptions{}); err != nil {
2343		framework.Failf("unable to get test secret %s: %v", annDeletionSecretName, err)
2344	}
2345
2346	return err
2347}
2348
// createPreHook returns a hook that counts invocations of a certain CSI method
// (identified by a substring of the full gRPC method name) and passes the current
// count to the given callback; the callback's return value becomes the pre-hook error.
2350func createPreHook(method string, callback func(counter int64) error) *drivers.Hooks {
2351	var counter int64
2352
2353	return &drivers.Hooks{
2354		Pre: func() func(ctx context.Context, fullMethod string, request interface{}) (reply interface{}, err error) {
2355			return func(ctx context.Context, fullMethod string, request interface{}) (reply interface{}, err error) {
2356				if strings.Contains(fullMethod, method) {
					count := atomic.AddInt64(&counter, 1)
					return nil, callback(count)
2359				}
2360				return nil, nil
2361			}
2362		}(),
2363	}
2364}
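
// exampleFlakyHook is an illustrative sketch (hypothetical, not wired into any
// test) of the createPreHook pattern used by the snapshot tests above: reject
// the first few calls to a given CSI method with DeadlineExceeded so that the
// caller's retry/backoff path is exercised, then let the call succeed.
func exampleFlakyHook(method string, failures int64) *drivers.Hooks {
	return createPreHook(method, func(counter int64) error {
		if counter <= failures {
			// Simulate a transient error for the first `failures` calls.
			return status.Error(codes.DeadlineExceeded, "fake transient error")
		}
		return nil
	})
}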
2365
2366type snapshotMetricsTestConfig struct {
2367	// expected values
2368	metricName      string
2369	metricType      string
2370	driverName      string
2371	operationName   string
2372	operationStatus string
2373	snapshotType    string
2374	le              string
2375}
2376
2377type snapshotControllerMetrics struct {
2378	// configuration for metric
2379	cfg            snapshotMetricsTestConfig
2380	metricsGrabber *e2emetrics.Grabber
2381
2382	// results
2383	countMetrics  map[string]float64
2384	sumMetrics    map[string]float64
2385	bucketMetrics map[string]float64
2386}
2387
2388func newSnapshotMetricsTestConfig(metricName, metricType, driverName, operationName, operationStatus, le string, pattern storageframework.TestPattern) snapshotMetricsTestConfig {
2389	var snapshotType string
2390	switch pattern.SnapshotType {
2391	case storageframework.DynamicCreatedSnapshot:
2392		snapshotType = "dynamic"
2393
2394	case storageframework.PreprovisionedCreatedSnapshot:
2395		snapshotType = "pre-provisioned"
2396
2397	default:
2398		framework.Failf("invalid snapshotType: %v", pattern.SnapshotType)
2399	}
2400
2401	return snapshotMetricsTestConfig{
2402		metricName:      metricName,
2403		metricType:      metricType,
2404		driverName:      driverName,
2405		operationName:   operationName,
2406		operationStatus: operationStatus,
2407		snapshotType:    snapshotType,
2408		le:              le,
2409	}
2410}
2411
2412func newSnapshotControllerMetrics(cfg snapshotMetricsTestConfig, metricsGrabber *e2emetrics.Grabber) *snapshotControllerMetrics {
2413	return &snapshotControllerMetrics{
2414		cfg:            cfg,
2415		metricsGrabber: metricsGrabber,
2416
2417		countMetrics:  make(map[string]float64),
2418		sumMetrics:    make(map[string]float64),
2419		bucketMetrics: make(map[string]float64),
2420	}
2421}
2422
2423func (scm *snapshotControllerMetrics) waitForSnapshotControllerMetric(expectedValue float64, timeout time.Duration) {
2424	metricKey := scm.getMetricKey()
2425	if successful := utils.WaitUntil(10*time.Second, timeout, func() bool {
2426		// get metric value
2427		actualValue, err := scm.getSnapshotControllerMetricValue()
2428		if err != nil {
2429			return false
2430		}
2431
2432		// Another operation could have finished from a previous test,
2433		// so we check if we have at least the expected value.
2434		if actualValue < expectedValue {
2435			return false
2436		}
2437
2438		return true
2439	}); successful {
2440		return
2441	}
2442
2443	scm.showMetricsFailure(metricKey)
2444	framework.Failf("Unable to get valid snapshot controller metrics after %v", timeout)
2445}
2446
2447func (scm *snapshotControllerMetrics) getSnapshotControllerMetricValue() (float64, error) {
2448	metricKey := scm.getMetricKey()
2449
2450	// grab and parse into readable format
2451	err := scm.grabSnapshotControllerMetrics()
2452	if err != nil {
2453		return 0, err
2454	}
2455
2456	metrics := scm.getMetricsTable()
2457	actual, ok := metrics[metricKey]
2458	if !ok {
2459		return 0, fmt.Errorf("did not find metric for key %s", metricKey)
2460	}
2461
2462	return actual, nil
2463}
2464
2465func (scm *snapshotControllerMetrics) getMetricsTable() map[string]float64 {
2466	var metrics map[string]float64
2467	switch scm.cfg.metricType {
2468	case "count":
2469		metrics = scm.countMetrics
2470
2471	case "sum":
2472		metrics = scm.sumMetrics
2473
2474	case "bucket":
2475		metrics = scm.bucketMetrics
2476	}
2477
2478	return metrics
2479}
2480
2481func (scm *snapshotControllerMetrics) showMetricsFailure(metricKey string) {
2482	framework.Logf("failed to find metric key %s inside of the following metrics:", metricKey)
2483
2484	metrics := scm.getMetricsTable()
2485	for k, v := range metrics {
2486		framework.Logf("%s: %v", k, v)
2487	}
2488}
2489
2490func (scm *snapshotControllerMetrics) grabSnapshotControllerMetrics() error {
2491	// pull all metrics
2492	metrics, err := scm.metricsGrabber.GrabFromSnapshotController(framework.TestContext.SnapshotControllerPodName, framework.TestContext.SnapshotControllerHTTPPort)
2493	if err != nil {
2494		return err
2495	}
2496
	for method, samples := range metrics {
		for _, sample := range samples {
			operationName := string(sample.Metric["operation_name"])
			driverName := string(sample.Metric["driver_name"])
			operationStatus := string(sample.Metric["operation_status"])
			snapshotType := string(sample.Metric["snapshot_type"])
			le := string(sample.Metric["le"])
			key := snapshotMetricKey(scm.cfg.metricName, driverName, operationName, operationStatus, snapshotType, le)

			switch method {
			case "snapshot_controller_operation_total_seconds_count":
				scm.countMetrics[key] = float64(sample.Value)

			case "snapshot_controller_operation_total_seconds_sum":
				scm.sumMetrics[key] = float64(sample.Value)

			case "snapshot_controller_operation_total_seconds_bucket":
				scm.bucketMetrics[key] = float64(sample.Value)
			}
		}
	}
2525
2526	return nil
2527}
2528
2529func (scm *snapshotControllerMetrics) getMetricKey() string {
2530	return snapshotMetricKey(scm.cfg.metricName, scm.cfg.driverName, scm.cfg.operationName, scm.cfg.operationStatus, scm.cfg.snapshotType, scm.cfg.le)
2531}
2532
2533func snapshotMetricKey(metricName, driverName, operationName, operationStatus, snapshotType, le string) string {
2534	key := driverName
2535
2536	// build key for shorthand metrics storage
2537	for _, s := range []string{metricName, operationName, operationStatus, snapshotType, le} {
2538		if s != "" {
2539			key = fmt.Sprintf("%s_%s", key, s)
2540		}
2541	}
2542
2543	return key
2544}
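
// exampleSnapshotMetricKey is an illustrative sketch (hypothetical values, not
// used anywhere) showing the key shape produced by snapshotMetricKey: the driver
// name followed by every non-empty label, joined with underscores.
func exampleSnapshotMetricKey() string {
	// Yields:
	// "csi-mock_snapshot_controller_operation_total_seconds_count_CreateSnapshot_success_dynamic"
	return snapshotMetricKey(
		"snapshot_controller_operation_total_seconds_count", // metricName
		"csi-mock",       // driverName
		"CreateSnapshot", // operationName
		"success",        // operationStatus
		"dynamic",        // snapshotType
		"",               // le (empty labels are skipped)
	)
}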
2545