/*
Copyright 2018 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package storage

import (
	"context"
	"fmt"
	"math/rand"
	"path/filepath"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/onsi/ginkgo"
	"github.com/onsi/gomega"

	appsv1 "k8s.io/api/apps/v1"
	v1 "k8s.io/api/core/v1"
	storagev1 "k8s.io/api/storage/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	utilerrors "k8s.io/apimachinery/pkg/util/errors"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/apimachinery/pkg/watch"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/kubernetes/test/e2e/framework"
	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
	e2epv "k8s.io/kubernetes/test/e2e/framework/pv"
	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
	e2estatefulset "k8s.io/kubernetes/test/e2e/framework/statefulset"
	e2evolume "k8s.io/kubernetes/test/e2e/framework/volume"
	"k8s.io/kubernetes/test/e2e/storage/utils"
	imageutils "k8s.io/kubernetes/test/utils/image"
)

type localTestConfig struct {
	ns           string
	nodes        []v1.Node
	randomNode   *v1.Node
	client       clientset.Interface
	timeouts     *framework.TimeoutContext
	scName       string
	discoveryDir string
	hostExec     utils.HostExec
	ltrMgr       utils.LocalTestResourceManager
}

type localVolumeType string

const (
	// DirectoryLocalVolumeType is the default local volume type, aka a directory
	DirectoryLocalVolumeType localVolumeType = "dir"
	// DirectoryLinkLocalVolumeType is like DirectoryLocalVolumeType,
	// but it's a symbolic link to the directory.
	DirectoryLinkLocalVolumeType localVolumeType = "dir-link"
	// DirectoryBindMountedLocalVolumeType is like DirectoryLocalVolumeType,
	// but bind mounted.
	DirectoryBindMountedLocalVolumeType localVolumeType = "dir-bindmounted"
	// DirectoryLinkBindMountedLocalVolumeType is like DirectoryLocalVolumeType,
	// but it's a symbolic link to a self bind-mounted directory.
	// Note that bind mounting at a symbolic link actually mounts at the
	// directory it links to.
	DirectoryLinkBindMountedLocalVolumeType localVolumeType = "dir-link-bindmounted"
	// TmpfsLocalVolumeType creates a tmpfs and mounts it
	TmpfsLocalVolumeType localVolumeType = "tmpfs"
	// GCELocalSSDVolumeType tests based on local ssd at /mnt/disks/by-uuid/
	GCELocalSSDVolumeType localVolumeType = "gce-localssd-scsi-fs"
	// BlockLocalVolumeType creates a local file, formats it, and maps it as a block device.
	BlockLocalVolumeType localVolumeType = "block"
	// BlockFsWithFormatLocalVolumeType creates a local file serving as the backing for block device,
	// formats it, and mounts it to use as FS mode local volume.
	BlockFsWithFormatLocalVolumeType localVolumeType = "blockfswithformat"
	// BlockFsWithoutFormatLocalVolumeType creates a local file serving as the backing for block device,
	// does not format it manually, and mounts it to use as FS mode local volume.
	BlockFsWithoutFormatLocalVolumeType localVolumeType = "blockfswithoutformat"
)

// setupLocalVolumeMap maps each local volume type to the local test resource type used to provision it
var setupLocalVolumeMap = map[localVolumeType]utils.LocalVolumeType{
	GCELocalSSDVolumeType:                   utils.LocalVolumeGCELocalSSD,
	TmpfsLocalVolumeType:                    utils.LocalVolumeTmpfs,
	DirectoryLocalVolumeType:                utils.LocalVolumeDirectory,
	DirectoryLinkLocalVolumeType:            utils.LocalVolumeDirectoryLink,
	DirectoryBindMountedLocalVolumeType:     utils.LocalVolumeDirectoryBindMounted,
	DirectoryLinkBindMountedLocalVolumeType: utils.LocalVolumeDirectoryLinkBindMounted,
	BlockLocalVolumeType:                    utils.LocalVolumeBlock, // block device in Block mode
	BlockFsWithFormatLocalVolumeType:        utils.LocalVolumeBlockFS,
	BlockFsWithoutFormatLocalVolumeType:     utils.LocalVolumeBlock, // block device in Filesystem mode (default in this test suite)
}
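
// For illustration only (a sketch, not used directly by the tests): resolving a
// test volume type via the map above is how setupLocalVolumes picks the backing
// resource to create, e.g.
//
//	ltrType, ok := setupLocalVolumeMap[BlockFsWithFormatLocalVolumeType]
//	// ok == true, ltrType == utils.LocalVolumeBlockFS
//
// Note that BlockLocalVolumeType and BlockFsWithoutFormatLocalVolumeType share
// the same backing resource (utils.LocalVolumeBlock) and differ only in the
// volume mode set on the PV/PVC later on.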

type localTestVolume struct {
	// Local test resource
	ltr *utils.LocalTestResource
	// PVC for this volume
	pvc *v1.PersistentVolumeClaim
	// PV for this volume
	pv *v1.PersistentVolume
	// Type of local volume
	localVolumeType localVolumeType
}

const (
	// TODO: This may not be available/writable on all images.
	hostBase = "/tmp"
	// Path to the first volume in the test containers
	// created via createLocalPod or makeLocalPod
	// leveraging pv_util.MakePod
	volumeDir = "/mnt/volume1"
	// testFile created in setupLocalVolume
	testFile = "test-file"
	// testFileContent written into testFile
	testFileContent = "test-file-content"
	testSCPrefix    = "local-volume-test-storageclass"

	// A sample request size
	testRequestSize = "10Mi"

	// Max number of nodes to use for testing
	maxNodes = 5
)

var (
	// storage class volume binding modes
	waitMode      = storagev1.VolumeBindingWaitForFirstConsumer
	immediateMode = storagev1.VolumeBindingImmediate

	// Common selinux labels
	selinuxLabel = &v1.SELinuxOptions{
		Level: "s0:c0,c1"}
)

var _ = utils.SIGDescribe("PersistentVolumes-local ", func() {
	f := framework.NewDefaultFramework("persistent-local-volumes-test")

	var (
		config *localTestConfig
		scName string
	)

	ginkgo.BeforeEach(func() {
		nodes, err := e2enode.GetBoundedReadySchedulableNodes(f.ClientSet, maxNodes)
		framework.ExpectNoError(err)

		scName = fmt.Sprintf("%v-%v", testSCPrefix, f.Namespace.Name)
		// Choose a random node
		randomNode := &nodes.Items[rand.Intn(len(nodes.Items))]

		hostExec := utils.NewHostExec(f)
		ltrMgr := utils.NewLocalResourceManager("local-volume-test", hostExec, hostBase)
		config = &localTestConfig{
			ns:           f.Namespace.Name,
			client:       f.ClientSet,
			timeouts:     f.Timeouts,
			nodes:        nodes.Items,
			randomNode:   randomNode,
			scName:       scName,
			discoveryDir: filepath.Join(hostBase, f.Namespace.Name),
			hostExec:     hostExec,
			ltrMgr:       ltrMgr,
		}
	})

	for tempTestVolType := range setupLocalVolumeMap {

		// New variable required for ginkgo test closures
		testVolType := tempTestVolType
		serialStr := ""
		if testVolType == GCELocalSSDVolumeType {
			serialStr = " [Serial]"
		}
		ctxString := fmt.Sprintf("[Volume type: %s]%v", testVolType, serialStr)
		testMode := immediateMode

		ginkgo.Context(ctxString, func() {
			var testVol *localTestVolume

			ginkgo.BeforeEach(func() {
				if testVolType == GCELocalSSDVolumeType {
					SkipUnlessLocalSSDExists(config, "scsi", "fs", config.randomNode)
				}
				setupStorageClass(config, &testMode)
				testVols := setupLocalVolumesPVCsPVs(config, testVolType, config.randomNode, 1, testMode)
				testVol = testVols[0]
			})

			ginkgo.AfterEach(func() {
				cleanupLocalVolumes(config, []*localTestVolume{testVol})
				cleanupStorageClass(config)
			})

			ginkgo.Context("One pod requesting one prebound PVC", func() {
				var (
					pod1    *v1.Pod
					pod1Err error
				)

				ginkgo.BeforeEach(func() {
					ginkgo.By("Creating pod1")
					pod1, pod1Err = createLocalPod(config, testVol, nil)
					framework.ExpectNoError(pod1Err)
					verifyLocalPod(config, testVol, pod1, config.randomNode.Name)

					writeCmd := createWriteCmd(volumeDir, testFile, testFileContent, testVol.localVolumeType)

					ginkgo.By("Writing in pod1")
					podRWCmdExec(f, pod1, writeCmd)
				})

				ginkgo.AfterEach(func() {
					ginkgo.By("Deleting pod1")
					e2epod.DeletePodOrFail(config.client, config.ns, pod1.Name)
				})

				ginkgo.It("should be able to mount volume and read from pod1", func() {
					ginkgo.By("Reading in pod1")
					// testFileContent was written in BeforeEach
					testReadFileContent(f, volumeDir, testFile, testFileContent, pod1, testVolType)
				})

				ginkgo.It("should be able to mount volume and write from pod1", func() {
					// testFileContent was written in BeforeEach
					testReadFileContent(f, volumeDir, testFile, testFileContent, pod1, testVolType)

					ginkgo.By("Writing in pod1")
					writeCmd := createWriteCmd(volumeDir, testFile, testVol.ltr.Path /*writeTestFileContent*/, testVolType)
					podRWCmdExec(f, pod1, writeCmd)
				})
			})

			ginkgo.Context("Two pods mounting a local volume at the same time", func() {
				ginkgo.It("should be able to write from pod1 and read from pod2", func() {
					twoPodsReadWriteTest(f, config, testVol)
				})
			})

			ginkgo.Context("Two pods mounting a local volume one after the other", func() {
				ginkgo.It("should be able to write from pod1 and read from pod2", func() {
					twoPodsReadWriteSerialTest(f, config, testVol)
				})
			})

			ginkgo.Context("Set fsGroup for local volume", func() {
				ginkgo.BeforeEach(func() {
					if testVolType == BlockLocalVolumeType {
						e2eskipper.Skipf("We don't set fsGroup on block device, skipped.")
					}
				})

				ginkgo.It("should set fsGroup for one pod [Slow]", func() {
					ginkgo.By("Checking fsGroup is set")
					pod := createPodWithFsGroupTest(config, testVol, 1234, 1234)
					ginkgo.By("Deleting pod")
					e2epod.DeletePodOrFail(config.client, config.ns, pod.Name)
				})

				ginkgo.It("should set same fsGroup for two pods simultaneously [Slow]", func() {
					fsGroup := int64(1234)
					ginkgo.By("Create first pod and check fsGroup is set")
					pod1 := createPodWithFsGroupTest(config, testVol, fsGroup, fsGroup)
					ginkgo.By("Create second pod with same fsGroup and check fsGroup is correct")
					pod2 := createPodWithFsGroupTest(config, testVol, fsGroup, fsGroup)
					ginkgo.By("Deleting first pod")
					e2epod.DeletePodOrFail(config.client, config.ns, pod1.Name)
					ginkgo.By("Deleting second pod")
					e2epod.DeletePodOrFail(config.client, config.ns, pod2.Name)
				})

				ginkgo.It("should set different fsGroup for second pod if first pod is deleted", func() {
					e2eskipper.Skipf("Disabled temporarily, reopen after #73168 is fixed")
					fsGroup1, fsGroup2 := int64(1234), int64(4321)
					ginkgo.By("Create first pod and check fsGroup is set")
					pod1 := createPodWithFsGroupTest(config, testVol, fsGroup1, fsGroup1)
					ginkgo.By("Deleting first pod")
					err := e2epod.DeletePodWithWait(config.client, pod1)
					framework.ExpectNoError(err, "while deleting first pod")
					ginkgo.By("Create second pod and check fsGroup is the new one")
					pod2 := createPodWithFsGroupTest(config, testVol, fsGroup2, fsGroup2)
					ginkgo.By("Deleting second pod")
					e2epod.DeletePodOrFail(config.client, config.ns, pod2.Name)
				})
			})

		})
	}

	ginkgo.Context("Local volume that cannot be mounted [Slow]", func() {
		// TODO:
		// - check for these errors in unit tests instead
		ginkgo.It("should fail due to non-existent path", func() {
			testVol := &localTestVolume{
				ltr: &utils.LocalTestResource{
					Node: config.randomNode,
					Path: "/non-existent/location/nowhere",
				},
				localVolumeType: DirectoryLocalVolumeType,
			}
			ginkgo.By("Creating local PVC and PV")
			createLocalPVCsPVs(config, []*localTestVolume{testVol}, immediateMode)
			pod, err := createLocalPod(config, testVol, nil)
			framework.ExpectError(err)
			err = e2epod.WaitTimeoutForPodRunningInNamespace(config.client, pod.Name, pod.Namespace, f.Timeouts.PodStart)
			framework.ExpectError(err)
			cleanupLocalPVCsPVs(config, []*localTestVolume{testVol})
		})

		ginkgo.It("should fail due to wrong node", func() {
			if len(config.nodes) < 2 {
				e2eskipper.Skipf("Runs only when number of nodes >= 2")
			}

			testVols := setupLocalVolumesPVCsPVs(config, DirectoryLocalVolumeType, config.randomNode, 1, immediateMode)
			testVol := testVols[0]

			conflictNodeName := config.nodes[0].Name
			if conflictNodeName == config.randomNode.Name {
				conflictNodeName = config.nodes[1].Name
			}
			pod := makeLocalPodWithNodeName(config, testVol, conflictNodeName)
			pod, err := config.client.CoreV1().Pods(config.ns).Create(context.TODO(), pod, metav1.CreateOptions{})
			framework.ExpectNoError(err)

			err = e2epod.WaitTimeoutForPodRunningInNamespace(config.client, pod.Name, pod.Namespace, f.Timeouts.PodStart)
			framework.ExpectError(err)

			cleanupLocalVolumes(config, []*localTestVolume{testVol})
		})
	})

	ginkgo.Context("Pod with node different from PV's NodeAffinity", func() {
		var (
			testVol          *localTestVolume
			volumeType       localVolumeType
			conflictNodeName string
		)

		ginkgo.BeforeEach(func() {
			if len(config.nodes) < 2 {
				e2eskipper.Skipf("Runs only when number of nodes >= 2")
			}

			volumeType = DirectoryLocalVolumeType
			setupStorageClass(config, &immediateMode)
			testVols := setupLocalVolumesPVCsPVs(config, volumeType, config.randomNode, 1, immediateMode)
			conflictNodeName = config.nodes[0].Name
			if conflictNodeName == config.randomNode.Name {
				conflictNodeName = config.nodes[1].Name
			}

			testVol = testVols[0]
		})

		ginkgo.AfterEach(func() {
			cleanupLocalVolumes(config, []*localTestVolume{testVol})
			cleanupStorageClass(config)
		})

		ginkgo.It("should fail scheduling due to different NodeAffinity", func() {
			testPodWithNodeConflict(config, testVol, conflictNodeName, makeLocalPodWithNodeAffinity)
		})

		ginkgo.It("should fail scheduling due to different NodeSelector", func() {
			testPodWithNodeConflict(config, testVol, conflictNodeName, makeLocalPodWithNodeSelector)
		})
	})

	ginkgo.Context("StatefulSet with pod affinity [Slow]", func() {
		var testVols map[string][]*localTestVolume
		const (
			ssReplicas  = 3
			volsPerNode = 6
		)

		ginkgo.BeforeEach(func() {
			setupStorageClass(config, &waitMode)

			testVols = map[string][]*localTestVolume{}
			for i, node := range config.nodes {
				// The PVCs created here won't be used
				ginkgo.By(fmt.Sprintf("Setting up local volumes on node %q", node.Name))
				vols := setupLocalVolumesPVCsPVs(config, DirectoryLocalVolumeType, &config.nodes[i], volsPerNode, waitMode)
				testVols[node.Name] = vols
			}
		})

		ginkgo.AfterEach(func() {
			for _, vols := range testVols {
				cleanupLocalVolumes(config, vols)
			}
			cleanupStorageClass(config)
		})

		ginkgo.It("should use volumes spread across nodes when pod has anti-affinity", func() {
			if len(config.nodes) < ssReplicas {
				e2eskipper.Skipf("Runs only when number of nodes >= %v", ssReplicas)
			}
			ginkgo.By("Creating a StatefulSet with pod anti-affinity on nodes")
			ss := createStatefulSet(config, ssReplicas, volsPerNode, true, false)
			validateStatefulSet(config, ss, true)
		})

		ginkgo.It("should use volumes on one node when pod has affinity", func() {
			ginkgo.By("Creating a StatefulSet with pod affinity on nodes")
			ss := createStatefulSet(config, ssReplicas, volsPerNode/ssReplicas, false, false)
			validateStatefulSet(config, ss, false)
		})

		ginkgo.It("should use volumes spread across nodes when pod management is parallel and pod has anti-affinity", func() {
			if len(config.nodes) < ssReplicas {
				e2eskipper.Skipf("Runs only when number of nodes >= %v", ssReplicas)
			}
			ginkgo.By("Creating a StatefulSet with pod anti-affinity on nodes")
			ss := createStatefulSet(config, ssReplicas, 1, true, true)
			validateStatefulSet(config, ss, true)
		})

		ginkgo.It("should use volumes on one node when pod management is parallel and pod has affinity", func() {
			ginkgo.By("Creating a StatefulSet with pod affinity on nodes")
			ss := createStatefulSet(config, ssReplicas, 1, false, true)
			validateStatefulSet(config, ss, false)
		})
	})

	ginkgo.Context("Stress with local volumes [Serial]", func() {
		var (
			allLocalVolumes = make(map[string][]*localTestVolume)
			volType         = TmpfsLocalVolumeType
			stopCh          = make(chan struct{})
			wg              sync.WaitGroup
		)

		const (
			volsPerNode = 10 // Make this non-divisible by volsPerPod to increase the chance of partial binding failure
			volsPerPod  = 3
			podsFactor  = 4
		)

		ginkgo.BeforeEach(func() {
			setupStorageClass(config, &waitMode)
			for i, node := range config.nodes {
				ginkgo.By(fmt.Sprintf("Setting up %d local volumes on node %q", volsPerNode, node.Name))
				allLocalVolumes[node.Name] = setupLocalVolumes(config, volType, &config.nodes[i], volsPerNode)
			}
			ginkgo.By(fmt.Sprintf("Create %d PVs", volsPerNode*len(config.nodes)))
			var err error
			for _, localVolumes := range allLocalVolumes {
				for _, localVolume := range localVolumes {
					pvConfig := makeLocalPVConfig(config, localVolume)
					localVolume.pv, err = e2epv.CreatePV(config.client, e2epv.MakePersistentVolume(pvConfig))
					framework.ExpectNoError(err)
				}
			}
			ginkgo.By("Start a goroutine to recycle unbound PVs")
			wg.Add(1)
			go func() {
				defer ginkgo.GinkgoRecover()
				defer wg.Done()
				w, err := config.client.CoreV1().PersistentVolumes().Watch(context.TODO(), metav1.ListOptions{})
				framework.ExpectNoError(err)
				if w == nil {
					return
				}
				defer w.Stop()
				for {
					select {
					case event := <-w.ResultChan():
						if event.Type != watch.Modified {
							continue
						}
						pv, ok := event.Object.(*v1.PersistentVolume)
						if !ok {
							continue
						}
						if pv.Status.Phase == v1.VolumeBound || pv.Status.Phase == v1.VolumeAvailable {
							continue
						}
						pv, err = config.client.CoreV1().PersistentVolumes().Get(context.TODO(), pv.Name, metav1.GetOptions{})
						if apierrors.IsNotFound(err) {
							continue
						}
						// Delete and create a new PV for same local volume storage
						ginkgo.By(fmt.Sprintf("Delete %q and create a new PV for same local volume storage", pv.Name))
						for _, localVolumes := range allLocalVolumes {
							for _, localVolume := range localVolumes {
								if localVolume.pv.Name != pv.Name {
									continue
								}
								err = config.client.CoreV1().PersistentVolumes().Delete(context.TODO(), pv.Name, metav1.DeleteOptions{})
								framework.ExpectNoError(err)
								pvConfig := makeLocalPVConfig(config, localVolume)
								localVolume.pv, err = e2epv.CreatePV(config.client, e2epv.MakePersistentVolume(pvConfig))
								framework.ExpectNoError(err)
							}
						}
					case <-stopCh:
						return
					}
				}
			}()
		})

		ginkgo.AfterEach(func() {
			ginkgo.By("Stop and wait for recycle goroutine to finish")
			close(stopCh)
			wg.Wait()
			ginkgo.By("Clean all PVs")
			for nodeName, localVolumes := range allLocalVolumes {
				ginkgo.By(fmt.Sprintf("Cleaning up %d local volumes on node %q", len(localVolumes), nodeName))
				cleanupLocalVolumes(config, localVolumes)
			}
			cleanupStorageClass(config)
		})

		ginkgo.It("should be able to process many pods and reuse local volumes", func() {
			var (
				podsLock sync.Mutex
				// Have one extra pod pending
				numConcurrentPods = volsPerNode/volsPerPod*len(config.nodes) + 1
				totalPods         = numConcurrentPods * podsFactor
				numCreated        = 0
				numFinished       = 0
				pods              = map[string]*v1.Pod{}
			)

			// Create pods gradually instead of all at once because scheduler has
			// exponential backoff
			ginkgo.By(fmt.Sprintf("Creating %v pods periodically", numConcurrentPods))
			stop := make(chan struct{})
			go wait.Until(func() {
				defer ginkgo.GinkgoRecover()
				podsLock.Lock()
				defer podsLock.Unlock()

				if numCreated >= totalPods {
					// Created all the pods for the test
					return
				}

				if len(pods) > numConcurrentPods/2 {
					// Too many outstanding pods
					return
				}

				for i := 0; i < numConcurrentPods; i++ {
					pvcs := []*v1.PersistentVolumeClaim{}
					for j := 0; j < volsPerPod; j++ {
						pvc := e2epv.MakePersistentVolumeClaim(makeLocalPVCConfig(config, volType), config.ns)
						pvc, err := e2epv.CreatePVC(config.client, config.ns, pvc)
						framework.ExpectNoError(err)
						pvcs = append(pvcs, pvc)
					}
					podConfig := e2epod.Config{
						NS:           config.ns,
						PVCs:         pvcs,
						Command:      "sleep 1",
						SeLinuxLabel: selinuxLabel,
					}
					pod, err := e2epod.MakeSecPod(&podConfig)
					framework.ExpectNoError(err)
					pod, err = config.client.CoreV1().Pods(config.ns).Create(context.TODO(), pod, metav1.CreateOptions{})
					framework.ExpectNoError(err)
					pods[pod.Name] = pod
					numCreated++
				}
			}, 2*time.Second, stop)

			defer func() {
				close(stop)
				podsLock.Lock()
				defer podsLock.Unlock()

				for _, pod := range pods {
					if err := deletePodAndPVCs(config, pod); err != nil {
						framework.Logf("Deleting pod %v failed: %v", pod.Name, err)
					}
				}
			}()

			ginkgo.By("Waiting for all pods to complete successfully")
			const completeTimeout = 5 * time.Minute
			waitErr := wait.PollImmediate(time.Second, completeTimeout, func() (done bool, err error) {
				podsList, err := config.client.CoreV1().Pods(config.ns).List(context.TODO(), metav1.ListOptions{})
				if err != nil {
					return false, err
				}

				podsLock.Lock()
				defer podsLock.Unlock()

				for _, pod := range podsList.Items {
					if pod.Status.Phase == v1.PodSucceeded {
						// Delete pod and its PVCs
						if err := deletePodAndPVCs(config, &pod); err != nil {
							return false, err
						}
						delete(pods, pod.Name)
						numFinished++
						framework.Logf("%v/%v pods finished", numFinished, totalPods)
					}
				}

				return numFinished == totalPods, nil
			})
			framework.ExpectNoError(waitErr, "some pods failed to complete within %v", completeTimeout)
		})
	})

	ginkgo.Context("Pods sharing a single local PV [Serial]", func() {
		var (
			pv *v1.PersistentVolume
		)

		ginkgo.BeforeEach(func() {
			localVolume := &localTestVolume{
				ltr: &utils.LocalTestResource{
					Node: config.randomNode,
					Path: "/tmp",
				},
				localVolumeType: DirectoryLocalVolumeType,
			}
			pvConfig := makeLocalPVConfig(config, localVolume)
			var err error
			pv, err = e2epv.CreatePV(config.client, e2epv.MakePersistentVolume(pvConfig))
			framework.ExpectNoError(err)
		})

		ginkgo.AfterEach(func() {
			if pv == nil {
				return
			}
			ginkgo.By(fmt.Sprintf("Clean PV %s", pv.Name))
			err := config.client.CoreV1().PersistentVolumes().Delete(context.TODO(), pv.Name, metav1.DeleteOptions{})
			framework.ExpectNoError(err)
		})

		ginkgo.It("all pods should be running", func() {
			var (
				pvc   *v1.PersistentVolumeClaim
				pods  = map[string]*v1.Pod{}
				count = 2
				err   error
			)
			pvc = e2epv.MakePersistentVolumeClaim(makeLocalPVCConfig(config, DirectoryLocalVolumeType), config.ns)
			ginkgo.By(fmt.Sprintf("Create a PVC %s", pvc.Name))
			pvc, err = e2epv.CreatePVC(config.client, config.ns, pvc)
			framework.ExpectNoError(err)
			ginkgo.By(fmt.Sprintf("Create %d pods to use this PVC", count))
			podConfig := e2epod.Config{
				NS:           config.ns,
				PVCs:         []*v1.PersistentVolumeClaim{pvc},
				SeLinuxLabel: selinuxLabel,
			}
			for i := 0; i < count; i++ {

				pod, err := e2epod.MakeSecPod(&podConfig)
				framework.ExpectNoError(err)
				pod, err = config.client.CoreV1().Pods(config.ns).Create(context.TODO(), pod, metav1.CreateOptions{})
				framework.ExpectNoError(err)
				pods[pod.Name] = pod
			}
			ginkgo.By("Waiting for all pods to be running")
			const runningTimeout = 5 * time.Minute
			waitErr := wait.PollImmediate(time.Second, runningTimeout, func() (done bool, err error) {
				podsList, err := config.client.CoreV1().Pods(config.ns).List(context.TODO(), metav1.ListOptions{})
				if err != nil {
					return false, err
				}
				runningPods := 0
				for _, pod := range podsList.Items {
					switch pod.Status.Phase {
					case v1.PodRunning:
						runningPods++
					}
				}
				return runningPods == count, nil
			})
			framework.ExpectNoError(waitErr, "Some pods are not running within %v", runningTimeout)
		})
	})
})

func deletePodAndPVCs(config *localTestConfig, pod *v1.Pod) error {
	framework.Logf("Deleting pod %v", pod.Name)
	if err := config.client.CoreV1().Pods(config.ns).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{}); err != nil {
		return err
	}

	// Delete PVCs
	for _, vol := range pod.Spec.Volumes {
		pvcSource := vol.VolumeSource.PersistentVolumeClaim
		if pvcSource != nil {
			if err := e2epv.DeletePersistentVolumeClaim(config.client, pvcSource.ClaimName, config.ns); err != nil {
				return err
			}
		}
	}
	return nil
}

type makeLocalPodWith func(config *localTestConfig, volume *localTestVolume, nodeName string) *v1.Pod

func testPodWithNodeConflict(config *localTestConfig, testVol *localTestVolume, nodeName string, makeLocalPodFunc makeLocalPodWith) {
	ginkgo.By(fmt.Sprintf("local-volume-type: %s", testVol.localVolumeType))

	pod := makeLocalPodFunc(config, testVol, nodeName)
	pod, err := config.client.CoreV1().Pods(config.ns).Create(context.TODO(), pod, metav1.CreateOptions{})
	framework.ExpectNoError(err)

	err = e2epod.WaitForPodNameUnschedulableInNamespace(config.client, pod.Name, pod.Namespace)
	framework.ExpectNoError(err)
}

// The tests below are run against multiple mount point types

// Test two pods at the same time, write from pod1, and read from pod2
func twoPodsReadWriteTest(f *framework.Framework, config *localTestConfig, testVol *localTestVolume) {
	ginkgo.By("Creating pod1 to write to the PV")
	pod1, pod1Err := createLocalPod(config, testVol, nil)
	framework.ExpectNoError(pod1Err)
	verifyLocalPod(config, testVol, pod1, config.randomNode.Name)

	writeCmd := createWriteCmd(volumeDir, testFile, testFileContent, testVol.localVolumeType)

	ginkgo.By("Writing in pod1")
	podRWCmdExec(f, pod1, writeCmd)

	// testFileContent was written after creating pod1
	testReadFileContent(f, volumeDir, testFile, testFileContent, pod1, testVol.localVolumeType)

	ginkgo.By("Creating pod2 to read from the PV")
	pod2, pod2Err := createLocalPod(config, testVol, nil)
	framework.ExpectNoError(pod2Err)
	verifyLocalPod(config, testVol, pod2, config.randomNode.Name)

	// testFileContent was written after creating pod1
	testReadFileContent(f, volumeDir, testFile, testFileContent, pod2, testVol.localVolumeType)

	writeCmd = createWriteCmd(volumeDir, testFile, testVol.ltr.Path /*writeTestFileContent*/, testVol.localVolumeType)

	ginkgo.By("Writing in pod2")
	podRWCmdExec(f, pod2, writeCmd)

	ginkgo.By("Reading in pod1")
	testReadFileContent(f, volumeDir, testFile, testVol.ltr.Path, pod1, testVol.localVolumeType)

	ginkgo.By("Deleting pod1")
	e2epod.DeletePodOrFail(config.client, config.ns, pod1.Name)
	ginkgo.By("Deleting pod2")
	e2epod.DeletePodOrFail(config.client, config.ns, pod2.Name)
}

// Test two pods one after the other: write from pod1, and read from pod2
func twoPodsReadWriteSerialTest(f *framework.Framework, config *localTestConfig, testVol *localTestVolume) {
	ginkgo.By("Creating pod1")
	pod1, pod1Err := createLocalPod(config, testVol, nil)
	framework.ExpectNoError(pod1Err)
	verifyLocalPod(config, testVol, pod1, config.randomNode.Name)

	writeCmd := createWriteCmd(volumeDir, testFile, testFileContent, testVol.localVolumeType)

	ginkgo.By("Writing in pod1")
	podRWCmdExec(f, pod1, writeCmd)

	// testFileContent was written after creating pod1
	testReadFileContent(f, volumeDir, testFile, testFileContent, pod1, testVol.localVolumeType)

	ginkgo.By("Deleting pod1")
	e2epod.DeletePodOrFail(config.client, config.ns, pod1.Name)

	ginkgo.By("Creating pod2")
	pod2, pod2Err := createLocalPod(config, testVol, nil)
	framework.ExpectNoError(pod2Err)
	verifyLocalPod(config, testVol, pod2, config.randomNode.Name)

	ginkgo.By("Reading in pod2")
	testReadFileContent(f, volumeDir, testFile, testFileContent, pod2, testVol.localVolumeType)

	ginkgo.By("Deleting pod2")
	e2epod.DeletePodOrFail(config.client, config.ns, pod2.Name)
}

// createPodWithFsGroupTest creates a pod with the given fsGroup and checks that the volume's group ID matches the expected fsGroup.
func createPodWithFsGroupTest(config *localTestConfig, testVol *localTestVolume, fsGroup int64, expectedFsGroup int64) *v1.Pod {
	pod, err := createLocalPod(config, testVol, &fsGroup)
	framework.ExpectNoError(err)
	_, err = framework.LookForStringInPodExec(config.ns, pod.Name, []string{"stat", "-c", "%g", volumeDir}, strconv.FormatInt(expectedFsGroup, 10), time.Second*3)
	framework.ExpectNoError(err, "failed to get expected fsGroup %d on directory %s in pod %s", fsGroup, volumeDir, pod.Name)
	return pod
}
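
// For illustration only: the fsGroup check above boils down to running the
// following inside the pod and comparing the output against the expected
// group ID (1234 is just an example value):
//
//	stat -c %g /mnt/volume1
//	// prints "1234" once the volume has been chowned to the pod's fsGroup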

func setupStorageClass(config *localTestConfig, mode *storagev1.VolumeBindingMode) {
	sc := &storagev1.StorageClass{
		ObjectMeta: metav1.ObjectMeta{
			Name: config.scName,
		},
		Provisioner:       "kubernetes.io/no-provisioner",
		VolumeBindingMode: mode,
	}

	_, err := config.client.StorageV1().StorageClasses().Create(context.TODO(), sc, metav1.CreateOptions{})
	framework.ExpectNoError(err)
}

func cleanupStorageClass(config *localTestConfig) {
	framework.ExpectNoError(config.client.StorageV1().StorageClasses().Delete(context.TODO(), config.scName, metav1.DeleteOptions{}))
}

// podNodeName returns the name of the node the given pod is running on.
func podNodeName(config *localTestConfig, pod *v1.Pod) (string, error) {
	runtimePod, runtimePodErr := config.client.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{})
	return runtimePod.Spec.NodeName, runtimePodErr
}

// setupLocalVolumes sets up directories to use for local PVs
func setupLocalVolumes(config *localTestConfig, localVolumeType localVolumeType, node *v1.Node, count int) []*localTestVolume {
	vols := []*localTestVolume{}
	for i := 0; i < count; i++ {
		ltrType, ok := setupLocalVolumeMap[localVolumeType]
		framework.ExpectEqual(ok, true)
		ltr := config.ltrMgr.Create(node, ltrType, nil)
		vols = append(vols, &localTestVolume{
			ltr:             ltr,
			localVolumeType: localVolumeType,
		})
	}
	return vols
}

func cleanupLocalPVCsPVs(config *localTestConfig, volumes []*localTestVolume) {
	for _, volume := range volumes {
		ginkgo.By("Cleaning up PVC and PV")
		errs := e2epv.PVPVCCleanup(config.client, config.ns, volume.pv, volume.pvc)
		if len(errs) > 0 {
			framework.Failf("Failed to delete PV and/or PVC: %v", utilerrors.NewAggregate(errs))
		}
	}
}

// Deletes the PVC/PV, and launches a pod with hostpath volume to remove the test directory
func cleanupLocalVolumes(config *localTestConfig, volumes []*localTestVolume) {
	cleanupLocalPVCsPVs(config, volumes)

	for _, volume := range volumes {
		config.ltrMgr.Remove(volume.ltr)
	}
}

func verifyLocalVolume(config *localTestConfig, volume *localTestVolume) {
	framework.ExpectNoError(e2epv.WaitOnPVandPVC(config.client, config.timeouts, config.ns, volume.pv, volume.pvc))
}

func verifyLocalPod(config *localTestConfig, volume *localTestVolume, pod *v1.Pod, expectedNodeName string) {
	podNodeName, err := podNodeName(config, pod)
	framework.ExpectNoError(err)
	framework.Logf("pod %q created on Node %q", pod.Name, podNodeName)
	framework.ExpectEqual(podNodeName, expectedNodeName)
}

func makeLocalPVCConfig(config *localTestConfig, volumeType localVolumeType) e2epv.PersistentVolumeClaimConfig {
	pvcConfig := e2epv.PersistentVolumeClaimConfig{
		AccessModes:      []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
		StorageClassName: &config.scName,
	}
	if volumeType == BlockLocalVolumeType {
		pvcVolumeMode := v1.PersistentVolumeBlock
		pvcConfig.VolumeMode = &pvcVolumeMode
	}
	return pvcConfig
}

func makeLocalPVConfig(config *localTestConfig, volume *localTestVolume) e2epv.PersistentVolumeConfig {
	// TODO: hostname may not be the best option
	nodeKey := "kubernetes.io/hostname"
	if volume.ltr.Node.Labels == nil {
		framework.Failf("Node does not have labels")
	}
	nodeValue, found := volume.ltr.Node.Labels[nodeKey]
	if !found {
		framework.Failf("Node does not have required label %q", nodeKey)
	}

	pvConfig := e2epv.PersistentVolumeConfig{
		PVSource: v1.PersistentVolumeSource{
			Local: &v1.LocalVolumeSource{
				Path: volume.ltr.Path,
			},
		},
		NamePrefix:       "local-pv",
		StorageClassName: config.scName,
		NodeAffinity: &v1.VolumeNodeAffinity{
			Required: &v1.NodeSelector{
				NodeSelectorTerms: []v1.NodeSelectorTerm{
					{
						MatchExpressions: []v1.NodeSelectorRequirement{
							{
								Key:      nodeKey,
								Operator: v1.NodeSelectorOpIn,
								Values:   []string{nodeValue},
							},
						},
					},
				},
			},
		},
	}

	if volume.localVolumeType == BlockLocalVolumeType {
		pvVolumeMode := v1.PersistentVolumeBlock
		pvConfig.VolumeMode = &pvVolumeMode
	}
	return pvConfig
}
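
// For illustration only: the node affinity built above pins the PV to the node
// hosting the backing path. Expressed as YAML (the hostname value is a
// placeholder), the resulting PV spec fragment is equivalent to:
//
//	nodeAffinity:
//	  required:
//	    nodeSelectorTerms:
//	    - matchExpressions:
//	      - key: kubernetes.io/hostname
//	        operator: In
//	        values:
//	        - <node-name>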

// Creates a PVC and PV with prebinding
func createLocalPVCsPVs(config *localTestConfig, volumes []*localTestVolume, mode storagev1.VolumeBindingMode) {
	var err error

	for _, volume := range volumes {
		pvcConfig := makeLocalPVCConfig(config, volume.localVolumeType)
		pvConfig := makeLocalPVConfig(config, volume)

		volume.pv, volume.pvc, err = e2epv.CreatePVPVC(config.client, pvConfig, pvcConfig, config.ns, false)
		framework.ExpectNoError(err)
	}

	if mode == storagev1.VolumeBindingImmediate {
		for _, volume := range volumes {
			verifyLocalVolume(config, volume)
		}
	} else {
		// Verify PVCs are not bound by waiting for phase==bound with a timeout and asserting that we hit the timeout.
		// There isn't really a great way to verify this without making the test be slow...
		const bindTimeout = 10 * time.Second
		waitErr := wait.PollImmediate(time.Second, bindTimeout, func() (done bool, err error) {
			for _, volume := range volumes {
				pvc, err := config.client.CoreV1().PersistentVolumeClaims(volume.pvc.Namespace).Get(context.TODO(), volume.pvc.Name, metav1.GetOptions{})
				if err != nil {
					return false, fmt.Errorf("failed to get PVC %s/%s: %v", volume.pvc.Namespace, volume.pvc.Name, err)
				}
				if pvc.Status.Phase != v1.ClaimPending {
					return true, nil
				}
			}
			return false, nil
		})
		if waitErr == wait.ErrWaitTimeout {
			framework.Logf("PVCs were not bound within %v (that's good)", bindTimeout)
			waitErr = nil
		}
		framework.ExpectNoError(waitErr, "Error making sure PVCs are not bound")
	}
}

func makeLocalPodWithNodeAffinity(config *localTestConfig, volume *localTestVolume, nodeName string) (pod *v1.Pod) {
	affinity := &v1.Affinity{
		NodeAffinity: &v1.NodeAffinity{
			RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
				NodeSelectorTerms: []v1.NodeSelectorTerm{
					{
						MatchExpressions: []v1.NodeSelectorRequirement{
							{
								Key:      "kubernetes.io/hostname",
								Operator: v1.NodeSelectorOpIn,
								Values:   []string{nodeName},
							},
						},
					},
				},
			},
		},
	}
	podConfig := e2epod.Config{
		NS:            config.ns,
		PVCs:          []*v1.PersistentVolumeClaim{volume.pvc},
		SeLinuxLabel:  selinuxLabel,
		NodeSelection: e2epod.NodeSelection{Affinity: affinity},
	}
	pod, err := e2epod.MakeSecPod(&podConfig)
	if pod == nil || err != nil {
		return
	}
	pod.Spec.Affinity = affinity
	return
}

func makeLocalPodWithNodeSelector(config *localTestConfig, volume *localTestVolume, nodeName string) (pod *v1.Pod) {
	ns := map[string]string{
		"kubernetes.io/hostname": nodeName,
	}
	podConfig := e2epod.Config{
		NS:            config.ns,
		PVCs:          []*v1.PersistentVolumeClaim{volume.pvc},
		SeLinuxLabel:  selinuxLabel,
		NodeSelection: e2epod.NodeSelection{Selector: ns},
	}
	pod, err := e2epod.MakeSecPod(&podConfig)
	if pod == nil || err != nil {
		return
	}
	return
}

func makeLocalPodWithNodeName(config *localTestConfig, volume *localTestVolume, nodeName string) (pod *v1.Pod) {
	podConfig := e2epod.Config{
		NS:           config.ns,
		PVCs:         []*v1.PersistentVolumeClaim{volume.pvc},
		SeLinuxLabel: selinuxLabel,
	}
	pod, err := e2epod.MakeSecPod(&podConfig)
	if pod == nil || err != nil {
		return
	}

	e2epod.SetNodeAffinity(&pod.Spec, nodeName)
	return
}

func createLocalPod(config *localTestConfig, volume *localTestVolume, fsGroup *int64) (*v1.Pod, error) {
	ginkgo.By("Creating a pod")
	podConfig := e2epod.Config{
		NS:           config.ns,
		PVCs:         []*v1.PersistentVolumeClaim{volume.pvc},
		SeLinuxLabel: selinuxLabel,
		FsGroup:      fsGroup,
	}
	return e2epod.CreateSecPod(config.client, &podConfig, config.timeouts.PodStart)
}

func createWriteCmd(testDir string, testFile string, writeTestFileContent string, volumeType localVolumeType) string {
	if volumeType == BlockLocalVolumeType {
		// testDir is the block device.
		testFileDir := filepath.Join("/tmp", testDir)
		testFilePath := filepath.Join(testFileDir, testFile)
		// Create a file containing the testFileContent.
		writeTestFileCmd := fmt.Sprintf("mkdir -p %s; echo %s > %s", testFileDir, writeTestFileContent, testFilePath)
		// sudo is needed when using ssh exec to node.
		// sudo is not needed and does not exist in some containers (e.g. busybox), when using pod exec.
		sudoCmd := fmt.Sprintf("SUDO_CMD=$(which sudo); echo ${SUDO_CMD}")
		// Write the testFileContent into the block device.
		writeBlockCmd := fmt.Sprintf("${SUDO_CMD} dd if=%s of=%s bs=512 count=100", testFilePath, testDir)
		// Cleanup the file containing testFileContent.
		deleteTestFileCmd := fmt.Sprintf("rm %s", testFilePath)
		return fmt.Sprintf("%s && %s && %s && %s", writeTestFileCmd, sudoCmd, writeBlockCmd, deleteTestFileCmd)
	}
	testFilePath := filepath.Join(testDir, testFile)
	return fmt.Sprintf("mkdir -p %s; echo %s > %s", testDir, writeTestFileContent, testFilePath)
}
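
// For illustration only: with the package defaults (volumeDir "/mnt/volume1",
// testFile "test-file", testFileContent "test-file-content"), the filesystem
// variant of the command above expands to
//
//	mkdir -p /mnt/volume1; echo test-file-content > /mnt/volume1/test-file
//
// while the block variant stages the file under /tmp, dd's it onto the device
// (bs=512 count=100), and removes the staging file again.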

func createReadCmd(testFileDir string, testFile string, volumeType localVolumeType) string {
	if volumeType == BlockLocalVolumeType {
		// Create the command to read the beginning of the block device and print it in ascii.
		return fmt.Sprintf("hexdump -n 100 -e '100 \"%%_p\"' %s | head -1", testFileDir)
	}
	// Create the command to read (aka cat) a file.
	testFilePath := filepath.Join(testFileDir, testFile)
	return fmt.Sprintf("cat %s", testFilePath)
}
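
// For illustration only: with the same defaults, the read command expands to
//
//	cat /mnt/volume1/test-file
//
// in filesystem mode, and in block mode to
//
//	hexdump -n 100 -e '100 "%_p"' <device> | head -1
//
// where <device> stands for the block device path passed in as testFileDir.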

// Read testFile and evaluate whether it contains the testFileContent
func testReadFileContent(f *framework.Framework, testFileDir string, testFile string, testFileContent string, pod *v1.Pod, volumeType localVolumeType) {
	readCmd := createReadCmd(testFileDir, testFile, volumeType)
	readOut := podRWCmdExec(f, pod, readCmd)
	gomega.Expect(readOut).To(gomega.ContainSubstring(testFileContent))
}

// Execute a read or write command in a pod.
// Fail on error
func podRWCmdExec(f *framework.Framework, pod *v1.Pod, cmd string) string {
	stdout, stderr, err := e2evolume.PodExec(f, pod, cmd)
	framework.Logf("podRWCmdExec cmd: %q, out: %q, stderr: %q, err: %v", cmd, stdout, stderr, err)
	framework.ExpectNoError(err)
	return stdout
}

// Initialize test volume on node
// and create local PVC and PV
func setupLocalVolumesPVCsPVs(
	config *localTestConfig,
	localVolumeType localVolumeType,
	node *v1.Node,
	count int,
	mode storagev1.VolumeBindingMode) []*localTestVolume {

	ginkgo.By("Initializing test volumes")
	testVols := setupLocalVolumes(config, localVolumeType, node, count)

	ginkgo.By("Creating local PVCs and PVs")
	createLocalPVCsPVs(config, testVols, mode)

	return testVols
}

// newLocalClaimWithName creates a new persistent volume claim with the given name.
func newLocalClaimWithName(config *localTestConfig, name string) *v1.PersistentVolumeClaim {
	claim := v1.PersistentVolumeClaim{
		ObjectMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: config.ns,
		},
		Spec: v1.PersistentVolumeClaimSpec{
			StorageClassName: &config.scName,
			AccessModes: []v1.PersistentVolumeAccessMode{
				v1.ReadWriteOnce,
			},
			Resources: v1.ResourceRequirements{
				Requests: v1.ResourceList{
					v1.ResourceName(v1.ResourceStorage): resource.MustParse(testRequestSize),
				},
			},
		},
	}

	return &claim
}

func createStatefulSet(config *localTestConfig, ssReplicas int32, volumeCount int, anti, parallel bool) *appsv1.StatefulSet {
	mounts := []v1.VolumeMount{}
	claims := []v1.PersistentVolumeClaim{}
	for i := 0; i < volumeCount; i++ {
		name := fmt.Sprintf("vol%v", i+1)
		pvc := newLocalClaimWithName(config, name)
		mounts = append(mounts, v1.VolumeMount{Name: name, MountPath: "/" + name})
		claims = append(claims, *pvc)
	}

	podAffinityTerms := []v1.PodAffinityTerm{
		{
			LabelSelector: &metav1.LabelSelector{
				MatchExpressions: []metav1.LabelSelectorRequirement{
					{
						Key:      "app",
						Operator: metav1.LabelSelectorOpIn,
						Values:   []string{"local-volume-test"},
					},
				},
			},
			TopologyKey: "kubernetes.io/hostname",
		},
	}

	affinity := v1.Affinity{}
	if anti {
		affinity.PodAntiAffinity = &v1.PodAntiAffinity{
			RequiredDuringSchedulingIgnoredDuringExecution: podAffinityTerms,
		}
	} else {
		affinity.PodAffinity = &v1.PodAffinity{
			RequiredDuringSchedulingIgnoredDuringExecution: podAffinityTerms,
		}
	}

	labels := map[string]string{"app": "local-volume-test"}
	spec := &appsv1.StatefulSet{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "local-volume-statefulset",
			Namespace: config.ns,
		},
		Spec: appsv1.StatefulSetSpec{
			Selector: &metav1.LabelSelector{
				MatchLabels: map[string]string{"app": "local-volume-test"},
			},
			Replicas: &ssReplicas,
			Template: v1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels: labels,
				},
				Spec: v1.PodSpec{
					Containers: []v1.Container{
						{
							Name:         "nginx",
							Image:        imageutils.GetE2EImage(imageutils.Nginx),
							VolumeMounts: mounts,
						},
					},
					Affinity: &affinity,
				},
			},
			VolumeClaimTemplates: claims,
			ServiceName:          "test-service",
		},
	}

	if parallel {
		spec.Spec.PodManagementPolicy = appsv1.ParallelPodManagement
	}

	ss, err := config.client.AppsV1().StatefulSets(config.ns).Create(context.TODO(), spec, metav1.CreateOptions{})
	framework.ExpectNoError(err)

	e2estatefulset.WaitForRunningAndReady(config.client, ssReplicas, ss)
	return ss
}

func validateStatefulSet(config *localTestConfig, ss *appsv1.StatefulSet, anti bool) {
	pods := e2estatefulset.GetPodList(config.client, ss)

	nodes := sets.NewString()
	for _, pod := range pods.Items {
		nodes.Insert(pod.Spec.NodeName)
	}

	if anti {
		// Verify that each pod is on a different node
		framework.ExpectEqual(nodes.Len(), len(pods.Items))
	} else {
		// Verify that all pods are on same node.
		framework.ExpectEqual(nodes.Len(), 1)
	}

	// Validate all PVCs are bound
	for _, pod := range pods.Items {
		for _, volume := range pod.Spec.Volumes {
			pvcSource := volume.VolumeSource.PersistentVolumeClaim
			if pvcSource != nil {
				err := e2epv.WaitForPersistentVolumeClaimPhase(
					v1.ClaimBound, config.client, config.ns, pvcSource.ClaimName, framework.Poll, time.Second)
				framework.ExpectNoError(err)
			}
		}
	}
}

// SkipUnlessLocalSSDExists takes in an ssdInterface (scsi/nvme) and a filesystemType (fs/block)
// and skips if a disk of that type does not exist on the node
func SkipUnlessLocalSSDExists(config *localTestConfig, ssdInterface, filesystemType string, node *v1.Node) {
	ssdCmd := fmt.Sprintf("ls -1 /mnt/disks/by-uuid/google-local-ssds-%s-%s/ | wc -l", ssdInterface, filesystemType)
	res, err := config.hostExec.Execute(ssdCmd, node)
	utils.LogResult(res)
	framework.ExpectNoError(err)
	num, err := strconv.Atoi(strings.TrimSpace(res.Stdout))
	framework.ExpectNoError(err)
	if num < 1 {
		e2eskipper.Skipf("Requires at least 1 %s %s localSSD ", ssdInterface, filesystemType)
	}
}
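
// For illustration only: for the GCE local SSD volume type exercised above, the
// probe run on the node is
//
//	ls -1 /mnt/disks/by-uuid/google-local-ssds-scsi-fs/ | wc -l
//
// and the context is skipped when the resulting count is zero.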