/*
Copyright 2018 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package storage

import (
	"context"
	"fmt"
	"math/rand"
	"path/filepath"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/onsi/ginkgo"
	"github.com/onsi/gomega"

	appsv1 "k8s.io/api/apps/v1"
	v1 "k8s.io/api/core/v1"
	storagev1 "k8s.io/api/storage/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	utilerrors "k8s.io/apimachinery/pkg/util/errors"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/apimachinery/pkg/watch"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/kubernetes/test/e2e/framework"
	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
	e2epv "k8s.io/kubernetes/test/e2e/framework/pv"
	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
	e2estatefulset "k8s.io/kubernetes/test/e2e/framework/statefulset"
	e2evolume "k8s.io/kubernetes/test/e2e/framework/volume"
	"k8s.io/kubernetes/test/e2e/storage/utils"
	imageutils "k8s.io/kubernetes/test/utils/image"
)

// localTestConfig bundles the per-test state shared by the helpers in this
// file. It is rebuilt for every spec by the suite's top-level BeforeEach.
type localTestConfig struct {
	// ns is the namespace of the current test (the framework's namespace name).
	ns string
	// nodes are the ready, schedulable nodes selected for the test, bounded by maxNodes.
	nodes []v1.Node
	// randomNode points at one randomly chosen entry of nodes.
	randomNode *v1.Node
	// client is the framework's API client.
	client clientset.Interface
	// timeouts holds the framework's timeout settings.
	timeouts *framework.TimeoutContext
	// scName is the StorageClass name used by the test: testSCPrefix plus the namespace name.
	scName string
	// discoveryDir is hostBase joined with the namespace name.
	discoveryDir string
	// hostExec is the host command executor created from the framework.
	hostExec utils.HostExec
	// ltrMgr creates and removes the local test resources that back the volumes.
	ltrMgr utils.LocalTestResourceManager
}

66type localVolumeType string 67 68const ( 69 // DirectoryLocalVolumeType is the default local volume type, aka a directory 70 DirectoryLocalVolumeType localVolumeType = "dir" 71 // DirectoryLinkLocalVolumeType is like DirectoryLocalVolumeType, 72 // but it's a symbolic link to directory 73 DirectoryLinkLocalVolumeType localVolumeType = "dir-link" 74 // DirectoryBindMountedLocalVolumeType is like DirectoryLocalVolumeType 75 // but bind mounted 76 DirectoryBindMountedLocalVolumeType localVolumeType = "dir-bindmounted" 77 // DirectoryLinkBindMountedLocalVolumeType is like DirectoryLocalVolumeType, 78 // but it's a symbolic link to self bind mounted directory 79 // Note that bind mounting at symbolic link actually mounts at directory it 80 // links to. 81 DirectoryLinkBindMountedLocalVolumeType localVolumeType = "dir-link-bindmounted" 82 // TmpfsLocalVolumeType creates a tmpfs and mounts it 83 TmpfsLocalVolumeType localVolumeType = "tmpfs" 84 // GCELocalSSDVolumeType tests based on local ssd at /mnt/disks/by-uuid/ 85 GCELocalSSDVolumeType localVolumeType = "gce-localssd-scsi-fs" 86 // BlockLocalVolumeType creates a local file, formats it, and maps it as a block device. 87 BlockLocalVolumeType localVolumeType = "block" 88 // BlockFsWithFormatLocalVolumeType creates a local file serving as the backing for block device, 89 // formats it, and mounts it to use as FS mode local volume. 90 BlockFsWithFormatLocalVolumeType localVolumeType = "blockfswithformat" 91 // BlockFsWithoutFormatLocalVolumeType creates a local file serving as the backing for block device, 92 // does not format it manually, and mounts it to use as FS mode local volume. 
93 BlockFsWithoutFormatLocalVolumeType localVolumeType = "blockfswithoutformat" 94) 95 96// map to local test resource type 97var setupLocalVolumeMap = map[localVolumeType]utils.LocalVolumeType{ 98 GCELocalSSDVolumeType: utils.LocalVolumeGCELocalSSD, 99 TmpfsLocalVolumeType: utils.LocalVolumeTmpfs, 100 DirectoryLocalVolumeType: utils.LocalVolumeDirectory, 101 DirectoryLinkLocalVolumeType: utils.LocalVolumeDirectoryLink, 102 DirectoryBindMountedLocalVolumeType: utils.LocalVolumeDirectoryBindMounted, 103 DirectoryLinkBindMountedLocalVolumeType: utils.LocalVolumeDirectoryLinkBindMounted, 104 BlockLocalVolumeType: utils.LocalVolumeBlock, // block device in Block mode 105 BlockFsWithFormatLocalVolumeType: utils.LocalVolumeBlockFS, 106 BlockFsWithoutFormatLocalVolumeType: utils.LocalVolumeBlock, // block device in Filesystem mode (default in this test suite) 107} 108 109type localTestVolume struct { 110 // Local test resource 111 ltr *utils.LocalTestResource 112 // PVC for this volume 113 pvc *v1.PersistentVolumeClaim 114 // PV for this volume 115 pv *v1.PersistentVolume 116 // Type of local volume 117 localVolumeType localVolumeType 118} 119 120const ( 121 // TODO: This may not be available/writable on all images. 
122 hostBase = "/tmp" 123 // Path to the first volume in the test containers 124 // created via createLocalPod or makeLocalPod 125 // leveraging pv_util.MakePod 126 volumeDir = "/mnt/volume1" 127 // testFile created in setupLocalVolume 128 testFile = "test-file" 129 // testFileContent written into testFile 130 testFileContent = "test-file-content" 131 testSCPrefix = "local-volume-test-storageclass" 132 133 // A sample request size 134 testRequestSize = "10Mi" 135 136 // Max number of nodes to use for testing 137 maxNodes = 5 138) 139 140var ( 141 // storage class volume binding modes 142 waitMode = storagev1.VolumeBindingWaitForFirstConsumer 143 immediateMode = storagev1.VolumeBindingImmediate 144 145 // Common selinux labels 146 selinuxLabel = &v1.SELinuxOptions{ 147 Level: "s0:c0,c1"} 148) 149 150var _ = utils.SIGDescribe("PersistentVolumes-local ", func() { 151 f := framework.NewDefaultFramework("persistent-local-volumes-test") 152 153 var ( 154 config *localTestConfig 155 scName string 156 ) 157 158 ginkgo.BeforeEach(func() { 159 nodes, err := e2enode.GetBoundedReadySchedulableNodes(f.ClientSet, maxNodes) 160 framework.ExpectNoError(err) 161 162 scName = fmt.Sprintf("%v-%v", testSCPrefix, f.Namespace.Name) 163 // Choose a random node 164 randomNode := &nodes.Items[rand.Intn(len(nodes.Items))] 165 166 hostExec := utils.NewHostExec(f) 167 ltrMgr := utils.NewLocalResourceManager("local-volume-test", hostExec, hostBase) 168 config = &localTestConfig{ 169 ns: f.Namespace.Name, 170 client: f.ClientSet, 171 timeouts: f.Timeouts, 172 nodes: nodes.Items, 173 randomNode: randomNode, 174 scName: scName, 175 discoveryDir: filepath.Join(hostBase, f.Namespace.Name), 176 hostExec: hostExec, 177 ltrMgr: ltrMgr, 178 } 179 }) 180 181 for tempTestVolType := range setupLocalVolumeMap { 182 183 // New variable required for gingko test closures 184 testVolType := tempTestVolType 185 serialStr := "" 186 if testVolType == GCELocalSSDVolumeType { 187 serialStr = " [Serial]" 188 } 189 
ctxString := fmt.Sprintf("[Volume type: %s]%v", testVolType, serialStr) 190 testMode := immediateMode 191 192 ginkgo.Context(ctxString, func() { 193 var testVol *localTestVolume 194 195 ginkgo.BeforeEach(func() { 196 if testVolType == GCELocalSSDVolumeType { 197 SkipUnlessLocalSSDExists(config, "scsi", "fs", config.randomNode) 198 } 199 setupStorageClass(config, &testMode) 200 testVols := setupLocalVolumesPVCsPVs(config, testVolType, config.randomNode, 1, testMode) 201 testVol = testVols[0] 202 }) 203 204 ginkgo.AfterEach(func() { 205 cleanupLocalVolumes(config, []*localTestVolume{testVol}) 206 cleanupStorageClass(config) 207 }) 208 209 ginkgo.Context("One pod requesting one prebound PVC", func() { 210 var ( 211 pod1 *v1.Pod 212 pod1Err error 213 ) 214 215 ginkgo.BeforeEach(func() { 216 ginkgo.By("Creating pod1") 217 pod1, pod1Err = createLocalPod(config, testVol, nil) 218 framework.ExpectNoError(pod1Err) 219 verifyLocalPod(config, testVol, pod1, config.randomNode.Name) 220 221 writeCmd := createWriteCmd(volumeDir, testFile, testFileContent, testVol.localVolumeType) 222 223 ginkgo.By("Writing in pod1") 224 podRWCmdExec(f, pod1, writeCmd) 225 }) 226 227 ginkgo.AfterEach(func() { 228 ginkgo.By("Deleting pod1") 229 e2epod.DeletePodOrFail(config.client, config.ns, pod1.Name) 230 }) 231 232 ginkgo.It("should be able to mount volume and read from pod1", func() { 233 ginkgo.By("Reading in pod1") 234 // testFileContent was written in BeforeEach 235 testReadFileContent(f, volumeDir, testFile, testFileContent, pod1, testVolType) 236 }) 237 238 ginkgo.It("should be able to mount volume and write from pod1", func() { 239 // testFileContent was written in BeforeEach 240 testReadFileContent(f, volumeDir, testFile, testFileContent, pod1, testVolType) 241 242 ginkgo.By("Writing in pod1") 243 writeCmd := createWriteCmd(volumeDir, testFile, testVol.ltr.Path /*writeTestFileContent*/, testVolType) 244 podRWCmdExec(f, pod1, writeCmd) 245 }) 246 }) 247 248 ginkgo.Context("Two pods 
mounting a local volume at the same time", func() { 249 ginkgo.It("should be able to write from pod1 and read from pod2", func() { 250 twoPodsReadWriteTest(f, config, testVol) 251 }) 252 }) 253 254 ginkgo.Context("Two pods mounting a local volume one after the other", func() { 255 ginkgo.It("should be able to write from pod1 and read from pod2", func() { 256 twoPodsReadWriteSerialTest(f, config, testVol) 257 }) 258 }) 259 260 ginkgo.Context("Set fsGroup for local volume", func() { 261 ginkgo.BeforeEach(func() { 262 if testVolType == BlockLocalVolumeType { 263 e2eskipper.Skipf("We don't set fsGroup on block device, skipped.") 264 } 265 }) 266 267 ginkgo.It("should set fsGroup for one pod [Slow]", func() { 268 ginkgo.By("Checking fsGroup is set") 269 pod := createPodWithFsGroupTest(config, testVol, 1234, 1234) 270 ginkgo.By("Deleting pod") 271 e2epod.DeletePodOrFail(config.client, config.ns, pod.Name) 272 }) 273 274 ginkgo.It("should set same fsGroup for two pods simultaneously [Slow]", func() { 275 fsGroup := int64(1234) 276 ginkgo.By("Create first pod and check fsGroup is set") 277 pod1 := createPodWithFsGroupTest(config, testVol, fsGroup, fsGroup) 278 ginkgo.By("Create second pod with same fsGroup and check fsGroup is correct") 279 pod2 := createPodWithFsGroupTest(config, testVol, fsGroup, fsGroup) 280 ginkgo.By("Deleting first pod") 281 e2epod.DeletePodOrFail(config.client, config.ns, pod1.Name) 282 ginkgo.By("Deleting second pod") 283 e2epod.DeletePodOrFail(config.client, config.ns, pod2.Name) 284 }) 285 286 ginkgo.It("should set different fsGroup for second pod if first pod is deleted", func() { 287 e2eskipper.Skipf("Disabled temporarily, reopen after #73168 is fixed") 288 fsGroup1, fsGroup2 := int64(1234), int64(4321) 289 ginkgo.By("Create first pod and check fsGroup is set") 290 pod1 := createPodWithFsGroupTest(config, testVol, fsGroup1, fsGroup1) 291 ginkgo.By("Deleting first pod") 292 err := e2epod.DeletePodWithWait(config.client, pod1) 293 
framework.ExpectNoError(err, "while deleting first pod") 294 ginkgo.By("Create second pod and check fsGroup is the new one") 295 pod2 := createPodWithFsGroupTest(config, testVol, fsGroup2, fsGroup2) 296 ginkgo.By("Deleting second pod") 297 e2epod.DeletePodOrFail(config.client, config.ns, pod2.Name) 298 }) 299 }) 300 301 }) 302 } 303 304 ginkgo.Context("Local volume that cannot be mounted [Slow]", func() { 305 // TODO: 306 // - check for these errors in unit tests instead 307 ginkgo.It("should fail due to non-existent path", func() { 308 testVol := &localTestVolume{ 309 ltr: &utils.LocalTestResource{ 310 Node: config.randomNode, 311 Path: "/non-existent/location/nowhere", 312 }, 313 localVolumeType: DirectoryLocalVolumeType, 314 } 315 ginkgo.By("Creating local PVC and PV") 316 createLocalPVCsPVs(config, []*localTestVolume{testVol}, immediateMode) 317 pod, err := createLocalPod(config, testVol, nil) 318 framework.ExpectError(err) 319 err = e2epod.WaitTimeoutForPodRunningInNamespace(config.client, pod.Name, pod.Namespace, f.Timeouts.PodStart) 320 framework.ExpectError(err) 321 cleanupLocalPVCsPVs(config, []*localTestVolume{testVol}) 322 }) 323 324 ginkgo.It("should fail due to wrong node", func() { 325 if len(config.nodes) < 2 { 326 e2eskipper.Skipf("Runs only when number of nodes >= 2") 327 } 328 329 testVols := setupLocalVolumesPVCsPVs(config, DirectoryLocalVolumeType, config.randomNode, 1, immediateMode) 330 testVol := testVols[0] 331 332 conflictNodeName := config.nodes[0].Name 333 if conflictNodeName == config.randomNode.Name { 334 conflictNodeName = config.nodes[1].Name 335 } 336 pod := makeLocalPodWithNodeName(config, testVol, conflictNodeName) 337 pod, err := config.client.CoreV1().Pods(config.ns).Create(context.TODO(), pod, metav1.CreateOptions{}) 338 framework.ExpectNoError(err) 339 340 err = e2epod.WaitTimeoutForPodRunningInNamespace(config.client, pod.Name, pod.Namespace, f.Timeouts.PodStart) 341 framework.ExpectError(err) 342 343 
cleanupLocalVolumes(config, []*localTestVolume{testVol}) 344 }) 345 }) 346 347 ginkgo.Context("Pod with node different from PV's NodeAffinity", func() { 348 var ( 349 testVol *localTestVolume 350 volumeType localVolumeType 351 conflictNodeName string 352 ) 353 354 ginkgo.BeforeEach(func() { 355 if len(config.nodes) < 2 { 356 e2eskipper.Skipf("Runs only when number of nodes >= 2") 357 } 358 359 volumeType = DirectoryLocalVolumeType 360 setupStorageClass(config, &immediateMode) 361 testVols := setupLocalVolumesPVCsPVs(config, volumeType, config.randomNode, 1, immediateMode) 362 conflictNodeName = config.nodes[0].Name 363 if conflictNodeName == config.randomNode.Name { 364 conflictNodeName = config.nodes[1].Name 365 } 366 367 testVol = testVols[0] 368 }) 369 370 ginkgo.AfterEach(func() { 371 cleanupLocalVolumes(config, []*localTestVolume{testVol}) 372 cleanupStorageClass(config) 373 }) 374 375 ginkgo.It("should fail scheduling due to different NodeAffinity", func() { 376 testPodWithNodeConflict(config, testVol, conflictNodeName, makeLocalPodWithNodeAffinity) 377 }) 378 379 ginkgo.It("should fail scheduling due to different NodeSelector", func() { 380 testPodWithNodeConflict(config, testVol, conflictNodeName, makeLocalPodWithNodeSelector) 381 }) 382 }) 383 384 ginkgo.Context("StatefulSet with pod affinity [Slow]", func() { 385 var testVols map[string][]*localTestVolume 386 const ( 387 ssReplicas = 3 388 volsPerNode = 6 389 ) 390 391 ginkgo.BeforeEach(func() { 392 setupStorageClass(config, &waitMode) 393 394 testVols = map[string][]*localTestVolume{} 395 for i, node := range config.nodes { 396 // The PVCs created here won't be used 397 ginkgo.By(fmt.Sprintf("Setting up local volumes on node %q", node.Name)) 398 vols := setupLocalVolumesPVCsPVs(config, DirectoryLocalVolumeType, &config.nodes[i], volsPerNode, waitMode) 399 testVols[node.Name] = vols 400 } 401 }) 402 403 ginkgo.AfterEach(func() { 404 for _, vols := range testVols { 405 cleanupLocalVolumes(config, vols) 406 
} 407 cleanupStorageClass(config) 408 }) 409 410 ginkgo.It("should use volumes spread across nodes when pod has anti-affinity", func() { 411 if len(config.nodes) < ssReplicas { 412 e2eskipper.Skipf("Runs only when number of nodes >= %v", ssReplicas) 413 } 414 ginkgo.By("Creating a StatefulSet with pod anti-affinity on nodes") 415 ss := createStatefulSet(config, ssReplicas, volsPerNode, true, false) 416 validateStatefulSet(config, ss, true) 417 }) 418 419 ginkgo.It("should use volumes on one node when pod has affinity", func() { 420 ginkgo.By("Creating a StatefulSet with pod affinity on nodes") 421 ss := createStatefulSet(config, ssReplicas, volsPerNode/ssReplicas, false, false) 422 validateStatefulSet(config, ss, false) 423 }) 424 425 ginkgo.It("should use volumes spread across nodes when pod management is parallel and pod has anti-affinity", func() { 426 if len(config.nodes) < ssReplicas { 427 e2eskipper.Skipf("Runs only when number of nodes >= %v", ssReplicas) 428 } 429 ginkgo.By("Creating a StatefulSet with pod anti-affinity on nodes") 430 ss := createStatefulSet(config, ssReplicas, 1, true, true) 431 validateStatefulSet(config, ss, true) 432 }) 433 434 ginkgo.It("should use volumes on one node when pod management is parallel and pod has affinity", func() { 435 ginkgo.By("Creating a StatefulSet with pod affinity on nodes") 436 ss := createStatefulSet(config, ssReplicas, 1, false, true) 437 validateStatefulSet(config, ss, false) 438 }) 439 }) 440 441 ginkgo.Context("Stress with local volumes [Serial]", func() { 442 var ( 443 allLocalVolumes = make(map[string][]*localTestVolume) 444 volType = TmpfsLocalVolumeType 445 stopCh = make(chan struct{}) 446 wg sync.WaitGroup 447 ) 448 449 const ( 450 volsPerNode = 10 // Make this non-divisable by volsPerPod to increase changes of partial binding failure 451 volsPerPod = 3 452 podsFactor = 4 453 ) 454 455 ginkgo.BeforeEach(func() { 456 setupStorageClass(config, &waitMode) 457 for i, node := range config.nodes { 458 
ginkgo.By(fmt.Sprintf("Setting up %d local volumes on node %q", volsPerNode, node.Name)) 459 allLocalVolumes[node.Name] = setupLocalVolumes(config, volType, &config.nodes[i], volsPerNode) 460 } 461 ginkgo.By(fmt.Sprintf("Create %d PVs", volsPerNode*len(config.nodes))) 462 var err error 463 for _, localVolumes := range allLocalVolumes { 464 for _, localVolume := range localVolumes { 465 pvConfig := makeLocalPVConfig(config, localVolume) 466 localVolume.pv, err = e2epv.CreatePV(config.client, e2epv.MakePersistentVolume(pvConfig)) 467 framework.ExpectNoError(err) 468 } 469 } 470 ginkgo.By("Start a goroutine to recycle unbound PVs") 471 wg.Add(1) 472 go func() { 473 defer ginkgo.GinkgoRecover() 474 defer wg.Done() 475 w, err := config.client.CoreV1().PersistentVolumes().Watch(context.TODO(), metav1.ListOptions{}) 476 framework.ExpectNoError(err) 477 if w == nil { 478 return 479 } 480 defer w.Stop() 481 for { 482 select { 483 case event := <-w.ResultChan(): 484 if event.Type != watch.Modified { 485 continue 486 } 487 pv, ok := event.Object.(*v1.PersistentVolume) 488 if !ok { 489 continue 490 } 491 if pv.Status.Phase == v1.VolumeBound || pv.Status.Phase == v1.VolumeAvailable { 492 continue 493 } 494 pv, err = config.client.CoreV1().PersistentVolumes().Get(context.TODO(), pv.Name, metav1.GetOptions{}) 495 if apierrors.IsNotFound(err) { 496 continue 497 } 498 // Delete and create a new PV for same local volume storage 499 ginkgo.By(fmt.Sprintf("Delete %q and create a new PV for same local volume storage", pv.Name)) 500 for _, localVolumes := range allLocalVolumes { 501 for _, localVolume := range localVolumes { 502 if localVolume.pv.Name != pv.Name { 503 continue 504 } 505 err = config.client.CoreV1().PersistentVolumes().Delete(context.TODO(), pv.Name, metav1.DeleteOptions{}) 506 framework.ExpectNoError(err) 507 pvConfig := makeLocalPVConfig(config, localVolume) 508 localVolume.pv, err = e2epv.CreatePV(config.client, e2epv.MakePersistentVolume(pvConfig)) 509 
framework.ExpectNoError(err) 510 } 511 } 512 case <-stopCh: 513 return 514 } 515 } 516 }() 517 }) 518 519 ginkgo.AfterEach(func() { 520 ginkgo.By("Stop and wait for recycle goroutine to finish") 521 close(stopCh) 522 wg.Wait() 523 ginkgo.By("Clean all PVs") 524 for nodeName, localVolumes := range allLocalVolumes { 525 ginkgo.By(fmt.Sprintf("Cleaning up %d local volumes on node %q", len(localVolumes), nodeName)) 526 cleanupLocalVolumes(config, localVolumes) 527 } 528 cleanupStorageClass(config) 529 }) 530 531 ginkgo.It("should be able to process many pods and reuse local volumes", func() { 532 var ( 533 podsLock sync.Mutex 534 // Have one extra pod pending 535 numConcurrentPods = volsPerNode/volsPerPod*len(config.nodes) + 1 536 totalPods = numConcurrentPods * podsFactor 537 numCreated = 0 538 numFinished = 0 539 pods = map[string]*v1.Pod{} 540 ) 541 542 // Create pods gradually instead of all at once because scheduler has 543 // exponential backoff 544 ginkgo.By(fmt.Sprintf("Creating %v pods periodically", numConcurrentPods)) 545 stop := make(chan struct{}) 546 go wait.Until(func() { 547 defer ginkgo.GinkgoRecover() 548 podsLock.Lock() 549 defer podsLock.Unlock() 550 551 if numCreated >= totalPods { 552 // Created all the pods for the test 553 return 554 } 555 556 if len(pods) > numConcurrentPods/2 { 557 // Too many outstanding pods 558 return 559 } 560 561 for i := 0; i < numConcurrentPods; i++ { 562 pvcs := []*v1.PersistentVolumeClaim{} 563 for j := 0; j < volsPerPod; j++ { 564 pvc := e2epv.MakePersistentVolumeClaim(makeLocalPVCConfig(config, volType), config.ns) 565 pvc, err := e2epv.CreatePVC(config.client, config.ns, pvc) 566 framework.ExpectNoError(err) 567 pvcs = append(pvcs, pvc) 568 } 569 podConfig := e2epod.Config{ 570 NS: config.ns, 571 PVCs: pvcs, 572 Command: "sleep 1", 573 SeLinuxLabel: selinuxLabel, 574 } 575 pod, err := e2epod.MakeSecPod(&podConfig) 576 framework.ExpectNoError(err) 577 pod, err = 
config.client.CoreV1().Pods(config.ns).Create(context.TODO(), pod, metav1.CreateOptions{}) 578 framework.ExpectNoError(err) 579 pods[pod.Name] = pod 580 numCreated++ 581 } 582 }, 2*time.Second, stop) 583 584 defer func() { 585 close(stop) 586 podsLock.Lock() 587 defer podsLock.Unlock() 588 589 for _, pod := range pods { 590 if err := deletePodAndPVCs(config, pod); err != nil { 591 framework.Logf("Deleting pod %v failed: %v", pod.Name, err) 592 } 593 } 594 }() 595 596 ginkgo.By("Waiting for all pods to complete successfully") 597 const completeTimeout = 5 * time.Minute 598 waitErr := wait.PollImmediate(time.Second, completeTimeout, func() (done bool, err error) { 599 podsList, err := config.client.CoreV1().Pods(config.ns).List(context.TODO(), metav1.ListOptions{}) 600 if err != nil { 601 return false, err 602 } 603 604 podsLock.Lock() 605 defer podsLock.Unlock() 606 607 for _, pod := range podsList.Items { 608 if pod.Status.Phase == v1.PodSucceeded { 609 // Delete pod and its PVCs 610 if err := deletePodAndPVCs(config, &pod); err != nil { 611 return false, err 612 } 613 delete(pods, pod.Name) 614 numFinished++ 615 framework.Logf("%v/%v pods finished", numFinished, totalPods) 616 } 617 } 618 619 return numFinished == totalPods, nil 620 }) 621 framework.ExpectNoError(waitErr, "some pods failed to complete within %v", completeTimeout) 622 }) 623 }) 624 625 ginkgo.Context("Pods sharing a single local PV [Serial]", func() { 626 var ( 627 pv *v1.PersistentVolume 628 ) 629 630 ginkgo.BeforeEach(func() { 631 localVolume := &localTestVolume{ 632 ltr: &utils.LocalTestResource{ 633 Node: config.randomNode, 634 Path: "/tmp", 635 }, 636 localVolumeType: DirectoryLocalVolumeType, 637 } 638 pvConfig := makeLocalPVConfig(config, localVolume) 639 var err error 640 pv, err = e2epv.CreatePV(config.client, e2epv.MakePersistentVolume(pvConfig)) 641 framework.ExpectNoError(err) 642 }) 643 644 ginkgo.AfterEach(func() { 645 if pv == nil { 646 return 647 } 648 ginkgo.By(fmt.Sprintf("Clean 
PV %s", pv.Name)) 649 err := config.client.CoreV1().PersistentVolumes().Delete(context.TODO(), pv.Name, metav1.DeleteOptions{}) 650 framework.ExpectNoError(err) 651 }) 652 653 ginkgo.It("all pods should be running", func() { 654 var ( 655 pvc *v1.PersistentVolumeClaim 656 pods = map[string]*v1.Pod{} 657 count = 2 658 err error 659 ) 660 pvc = e2epv.MakePersistentVolumeClaim(makeLocalPVCConfig(config, DirectoryLocalVolumeType), config.ns) 661 ginkgo.By(fmt.Sprintf("Create a PVC %s", pvc.Name)) 662 pvc, err = e2epv.CreatePVC(config.client, config.ns, pvc) 663 framework.ExpectNoError(err) 664 ginkgo.By(fmt.Sprintf("Create %d pods to use this PVC", count)) 665 podConfig := e2epod.Config{ 666 NS: config.ns, 667 PVCs: []*v1.PersistentVolumeClaim{pvc}, 668 SeLinuxLabel: selinuxLabel, 669 } 670 for i := 0; i < count; i++ { 671 672 pod, err := e2epod.MakeSecPod(&podConfig) 673 framework.ExpectNoError(err) 674 pod, err = config.client.CoreV1().Pods(config.ns).Create(context.TODO(), pod, metav1.CreateOptions{}) 675 framework.ExpectNoError(err) 676 pods[pod.Name] = pod 677 } 678 ginkgo.By("Wait for all pods are running") 679 const runningTimeout = 5 * time.Minute 680 waitErr := wait.PollImmediate(time.Second, runningTimeout, func() (done bool, err error) { 681 podsList, err := config.client.CoreV1().Pods(config.ns).List(context.TODO(), metav1.ListOptions{}) 682 if err != nil { 683 return false, err 684 } 685 runningPods := 0 686 for _, pod := range podsList.Items { 687 switch pod.Status.Phase { 688 case v1.PodRunning: 689 runningPods++ 690 } 691 } 692 return runningPods == count, nil 693 }) 694 framework.ExpectNoError(waitErr, "Some pods are not running within %v", runningTimeout) 695 }) 696 }) 697}) 698 699func deletePodAndPVCs(config *localTestConfig, pod *v1.Pod) error { 700 framework.Logf("Deleting pod %v", pod.Name) 701 if err := config.client.CoreV1().Pods(config.ns).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{}); err != nil { 702 return err 703 } 704 705 // 
Delete PVCs 706 for _, vol := range pod.Spec.Volumes { 707 pvcSource := vol.VolumeSource.PersistentVolumeClaim 708 if pvcSource != nil { 709 if err := e2epv.DeletePersistentVolumeClaim(config.client, pvcSource.ClaimName, config.ns); err != nil { 710 return err 711 } 712 } 713 } 714 return nil 715} 716 717type makeLocalPodWith func(config *localTestConfig, volume *localTestVolume, nodeName string) *v1.Pod 718 719func testPodWithNodeConflict(config *localTestConfig, testVol *localTestVolume, nodeName string, makeLocalPodFunc makeLocalPodWith) { 720 ginkgo.By(fmt.Sprintf("local-volume-type: %s", testVol.localVolumeType)) 721 722 pod := makeLocalPodFunc(config, testVol, nodeName) 723 pod, err := config.client.CoreV1().Pods(config.ns).Create(context.TODO(), pod, metav1.CreateOptions{}) 724 framework.ExpectNoError(err) 725 726 err = e2epod.WaitForPodNameUnschedulableInNamespace(config.client, pod.Name, pod.Namespace) 727 framework.ExpectNoError(err) 728} 729 730// The tests below are run against multiple mount point types 731 732// Test two pods at the same time, write from pod1, and read from pod2 733func twoPodsReadWriteTest(f *framework.Framework, config *localTestConfig, testVol *localTestVolume) { 734 ginkgo.By("Creating pod1 to write to the PV") 735 pod1, pod1Err := createLocalPod(config, testVol, nil) 736 framework.ExpectNoError(pod1Err) 737 verifyLocalPod(config, testVol, pod1, config.randomNode.Name) 738 739 writeCmd := createWriteCmd(volumeDir, testFile, testFileContent, testVol.localVolumeType) 740 741 ginkgo.By("Writing in pod1") 742 podRWCmdExec(f, pod1, writeCmd) 743 744 // testFileContent was written after creating pod1 745 testReadFileContent(f, volumeDir, testFile, testFileContent, pod1, testVol.localVolumeType) 746 747 ginkgo.By("Creating pod2 to read from the PV") 748 pod2, pod2Err := createLocalPod(config, testVol, nil) 749 framework.ExpectNoError(pod2Err) 750 verifyLocalPod(config, testVol, pod2, config.randomNode.Name) 751 752 // testFileContent was 
written after creating pod1 753 testReadFileContent(f, volumeDir, testFile, testFileContent, pod2, testVol.localVolumeType) 754 755 writeCmd = createWriteCmd(volumeDir, testFile, testVol.ltr.Path /*writeTestFileContent*/, testVol.localVolumeType) 756 757 ginkgo.By("Writing in pod2") 758 podRWCmdExec(f, pod2, writeCmd) 759 760 ginkgo.By("Reading in pod1") 761 testReadFileContent(f, volumeDir, testFile, testVol.ltr.Path, pod1, testVol.localVolumeType) 762 763 ginkgo.By("Deleting pod1") 764 e2epod.DeletePodOrFail(config.client, config.ns, pod1.Name) 765 ginkgo.By("Deleting pod2") 766 e2epod.DeletePodOrFail(config.client, config.ns, pod2.Name) 767} 768 769// Test two pods one after other, write from pod1, and read from pod2 770func twoPodsReadWriteSerialTest(f *framework.Framework, config *localTestConfig, testVol *localTestVolume) { 771 ginkgo.By("Creating pod1") 772 pod1, pod1Err := createLocalPod(config, testVol, nil) 773 framework.ExpectNoError(pod1Err) 774 verifyLocalPod(config, testVol, pod1, config.randomNode.Name) 775 776 writeCmd := createWriteCmd(volumeDir, testFile, testFileContent, testVol.localVolumeType) 777 778 ginkgo.By("Writing in pod1") 779 podRWCmdExec(f, pod1, writeCmd) 780 781 // testFileContent was written after creating pod1 782 testReadFileContent(f, volumeDir, testFile, testFileContent, pod1, testVol.localVolumeType) 783 784 ginkgo.By("Deleting pod1") 785 e2epod.DeletePodOrFail(config.client, config.ns, pod1.Name) 786 787 ginkgo.By("Creating pod2") 788 pod2, pod2Err := createLocalPod(config, testVol, nil) 789 framework.ExpectNoError(pod2Err) 790 verifyLocalPod(config, testVol, pod2, config.randomNode.Name) 791 792 ginkgo.By("Reading in pod2") 793 testReadFileContent(f, volumeDir, testFile, testFileContent, pod2, testVol.localVolumeType) 794 795 ginkgo.By("Deleting pod2") 796 e2epod.DeletePodOrFail(config.client, config.ns, pod2.Name) 797} 798 799// Test creating pod with fsGroup, and check fsGroup is expected fsGroup. 
800func createPodWithFsGroupTest(config *localTestConfig, testVol *localTestVolume, fsGroup int64, expectedFsGroup int64) *v1.Pod { 801 pod, err := createLocalPod(config, testVol, &fsGroup) 802 framework.ExpectNoError(err) 803 _, err = framework.LookForStringInPodExec(config.ns, pod.Name, []string{"stat", "-c", "%g", volumeDir}, strconv.FormatInt(expectedFsGroup, 10), time.Second*3) 804 framework.ExpectNoError(err, "failed to get expected fsGroup %d on directory %s in pod %s", fsGroup, volumeDir, pod.Name) 805 return pod 806} 807 808func setupStorageClass(config *localTestConfig, mode *storagev1.VolumeBindingMode) { 809 sc := &storagev1.StorageClass{ 810 ObjectMeta: metav1.ObjectMeta{ 811 Name: config.scName, 812 }, 813 Provisioner: "kubernetes.io/no-provisioner", 814 VolumeBindingMode: mode, 815 } 816 817 _, err := config.client.StorageV1().StorageClasses().Create(context.TODO(), sc, metav1.CreateOptions{}) 818 framework.ExpectNoError(err) 819} 820 821func cleanupStorageClass(config *localTestConfig) { 822 framework.ExpectNoError(config.client.StorageV1().StorageClasses().Delete(context.TODO(), config.scName, metav1.DeleteOptions{})) 823} 824 825// podNode wraps RunKubectl to get node where pod is running 826func podNodeName(config *localTestConfig, pod *v1.Pod) (string, error) { 827 runtimePod, runtimePodErr := config.client.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{}) 828 return runtimePod.Spec.NodeName, runtimePodErr 829} 830 831// setupLocalVolumes sets up directories to use for local PV 832func setupLocalVolumes(config *localTestConfig, localVolumeType localVolumeType, node *v1.Node, count int) []*localTestVolume { 833 vols := []*localTestVolume{} 834 for i := 0; i < count; i++ { 835 ltrType, ok := setupLocalVolumeMap[localVolumeType] 836 framework.ExpectEqual(ok, true) 837 ltr := config.ltrMgr.Create(node, ltrType, nil) 838 vols = append(vols, &localTestVolume{ 839 ltr: ltr, 840 localVolumeType: localVolumeType, 841 }) 842 
}
	return vols
}

// cleanupLocalPVCsPVs deletes the PV and PVC belonging to every test volume
// and fails the test as soon as a deletion reports one or more errors.
func cleanupLocalPVCsPVs(config *localTestConfig, volumes []*localTestVolume) {
	for _, volume := range volumes {
		ginkgo.By("Cleaning up PVC and PV")
		errs := e2epv.PVPVCCleanup(config.client, config.ns, volume.pv, volume.pvc)
		if len(errs) > 0 {
			framework.Failf("Failed to delete PV and/or PVC: %v", utilerrors.NewAggregate(errs))
		}
	}
}

// cleanupLocalVolumes deletes the PVCs/PVs of all volumes and then asks the
// local test resource manager to remove each volume's backing resource.
func cleanupLocalVolumes(config *localTestConfig, volumes []*localTestVolume) {
	cleanupLocalPVCsPVs(config, volumes)

	for _, volume := range volumes {
		config.ltrMgr.Remove(volume.ltr)
	}
}

// verifyLocalVolume fails the test unless e2epv.WaitOnPVandPVC succeeds for
// the volume's PV and PVC.
func verifyLocalVolume(config *localTestConfig, volume *localTestVolume) {
	framework.ExpectNoError(e2epv.WaitOnPVandPVC(config.client, config.timeouts, config.ns, volume.pv, volume.pvc))
}

// verifyLocalPod fails the test unless the node reported for pod equals
// expectedNodeName. The volume parameter is unused and kept for callers.
func verifyLocalPod(config *localTestConfig, volume *localTestVolume, pod *v1.Pod, expectedNodeName string) {
	// Use a distinct local name so the package-level podNodeName helper is
	// not shadowed for the rest of the function.
	nodeName, err := podNodeName(config, pod)
	framework.ExpectNoError(err)
	framework.Logf("pod %q created on Node %q", pod.Name, nodeName)
	framework.ExpectEqual(nodeName, expectedNodeName)
}

// makeLocalPVCConfig returns a ReadWriteOnce PVC config that uses the test
// storage class. Block volumes additionally request the Block volume mode.
func makeLocalPVCConfig(config *localTestConfig, volumeType localVolumeType) e2epv.PersistentVolumeClaimConfig {
	pvcConfig := e2epv.PersistentVolumeClaimConfig{
		AccessModes:      []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
		StorageClassName: &config.scName,
	}
	if volumeType == BlockLocalVolumeType {
		pvcVolumeMode := v1.PersistentVolumeBlock
		pvcConfig.VolumeMode = &pvcVolumeMode
	}
	return pvcConfig
}

// makeLocalPVConfig returns a local PV config for volume whose node affinity
// pins the PV to the hostname label of the node holding the test resource.
// The test fails if that node has no labels or lacks the hostname label.
func makeLocalPVConfig(config *localTestConfig, volume *localTestVolume) e2epv.PersistentVolumeConfig {
	// TODO: hostname may not be the best option
	nodeKey := "kubernetes.io/hostname"
	if volume.ltr.Node.Labels == nil {
		framework.Failf("Node does not have labels")
	}
	nodeValue, found := volume.ltr.Node.Labels[nodeKey]
	if !found {
		framework.Failf("Node does not have required label %q", nodeKey)
	}

	pvConfig := e2epv.PersistentVolumeConfig{
		PVSource: v1.PersistentVolumeSource{
			Local: &v1.LocalVolumeSource{
				Path: volume.ltr.Path,
			},
		},
		NamePrefix:       "local-pv",
		StorageClassName: config.scName,
		NodeAffinity: &v1.VolumeNodeAffinity{
			Required: &v1.NodeSelector{
				NodeSelectorTerms: []v1.NodeSelectorTerm{
					{
						MatchExpressions: []v1.NodeSelectorRequirement{
							{
								Key:      nodeKey,
								Operator: v1.NodeSelectorOpIn,
								Values:   []string{nodeValue},
							},
						},
					},
				},
			},
		},
	}

	if volume.localVolumeType == BlockLocalVolumeType {
		pvVolumeMode := v1.PersistentVolumeBlock
		pvConfig.VolumeMode = &pvVolumeMode
	}
	return pvConfig
}

// createLocalPVCsPVs creates a PV and PVC for every volume and stores them on
// the volume. The last argument to e2epv.CreatePVPVC is false (presumably
// "preBind" - confirm against e2epv).
//
// With immediate binding the PV/PVC pairs are then verified. With any other
// mode the function checks that all PVCs stay Pending for a short window.
func createLocalPVCsPVs(config *localTestConfig, volumes []*localTestVolume, mode storagev1.VolumeBindingMode) {
	var err error

	for _, volume := range volumes {
		pvcConfig := makeLocalPVCConfig(config, volume.localVolumeType)
		pvConfig := makeLocalPVConfig(config, volume)

		volume.pv, volume.pvc, err = e2epv.CreatePVPVC(config.client, pvConfig, pvcConfig, config.ns, false)
		framework.ExpectNoError(err)
	}

	if mode == storagev1.VolumeBindingImmediate {
		for _, volume := range volumes {
			verifyLocalVolume(config, volume)
		}
		return
	}

	// Verify PVCs are not bound by waiting for phase==bound with a timeout and asserting that we hit the timeout.
	// There isn't really a great way to verify this without making the test be slow...
	const bindTimeout = 10 * time.Second
	waitErr := wait.PollImmediate(time.Second, bindTimeout, func() (bool, error) {
		for _, volume := range volumes {
			pvc, err := config.client.CoreV1().PersistentVolumeClaims(volume.pvc.Namespace).Get(context.TODO(), volume.pvc.Name, metav1.GetOptions{})
			if err != nil {
				return false, fmt.Errorf("failed to get PVC %s/%s: %v", volume.pvc.Namespace, volume.pvc.Name, err)
			}
			// Any phase other than Pending ends the poll early without an
			// error, which is indistinguishable from success below.
			if pvc.Status.Phase != v1.ClaimPending {
				return true, nil
			}
		}
		return false, nil
	})
	// Hitting the timeout is the expected outcome: nothing left Pending.
	if waitErr == wait.ErrWaitTimeout {
		framework.Logf("PVCs were not bound within %v (that's good)", bindTimeout)
		waitErr = nil
	}
	framework.ExpectNoError(waitErr, "Error making sure PVCs are not bound")
}

// makeLocalPodWithNodeAffinity builds (but does not create) a pod that mounts
// the volume's PVC and requires, via node affinity on the hostname label, to
// be scheduled on nodeName. If e2epod.MakeSecPod fails, the error is logged
// and whatever pod value MakeSecPod returned is handed back unchanged.
func makeLocalPodWithNodeAffinity(config *localTestConfig, volume *localTestVolume, nodeName string) (pod *v1.Pod) {
	affinity := &v1.Affinity{
		NodeAffinity: &v1.NodeAffinity{
			RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
				NodeSelectorTerms: []v1.NodeSelectorTerm{
					{
						MatchExpressions: []v1.NodeSelectorRequirement{
							{
								Key:      "kubernetes.io/hostname",
								Operator: v1.NodeSelectorOpIn,
								Values:   []string{nodeName},
							},
						},
					},
				},
			},
		},
	}
	podConfig := e2epod.Config{
		NS:            config.ns,
		PVCs:          []*v1.PersistentVolumeClaim{volume.pvc},
		SeLinuxLabel:  selinuxLabel,
		NodeSelection: e2epod.NodeSelection{Affinity: affinity},
	}
	pod, err := e2epod.MakeSecPod(&podConfig)
	if err != nil {
		// The signature has no error result, so surface the cause in the log.
		framework.Logf("failed to make pod with node affinity for node %q: %v", nodeName, err)
		return pod
	}
	if pod == nil {
		return pod
	}
	pod.Spec.Affinity = affinity
	return pod
}

// makeLocalPodWithNodeSelector builds (but does not create) a pod that mounts
// the volume's PVC and selects nodeName through a hostname node selector. If
// e2epod.MakeSecPod fails, the error is logged and whatever pod value
// MakeSecPod returned is handed back unchanged.
func makeLocalPodWithNodeSelector(config *localTestConfig, volume *localTestVolume, nodeName string) (pod *v1.Pod) {
	ns := map[string]string{
		"kubernetes.io/hostname": nodeName,
	}
	podConfig := e2epod.Config{
		NS:            config.ns,
		PVCs:          []*v1.PersistentVolumeClaim{volume.pvc},
		SeLinuxLabel:  selinuxLabel,
		NodeSelection: e2epod.NodeSelection{Selector: ns},
	}
	pod, err := e2epod.MakeSecPod(&podConfig)
	if err != nil {
		// The signature has no error result, so surface the cause in the log.
		framework.Logf("failed to make pod with node selector for node %q: %v", nodeName, err)
	}
	return pod
}

// makeLocalPodWithNodeName builds (but does not create) a pod that mounts the
// volume's PVC and then calls e2epod.SetNodeAffinity on its spec with
// nodeName. If e2epod.MakeSecPod fails, the error is logged and whatever pod
// value MakeSecPod returned is handed back unchanged.
func makeLocalPodWithNodeName(config *localTestConfig, volume *localTestVolume, nodeName string) (pod *v1.Pod) {
	podConfig := e2epod.Config{
		NS:           config.ns,
		PVCs:         []*v1.PersistentVolumeClaim{volume.pvc},
		SeLinuxLabel: selinuxLabel,
	}
	pod, err := e2epod.MakeSecPod(&podConfig)
	if err != nil {
		// The signature has no error result, so surface the cause in the log.
		framework.Logf("failed to make pod for node %q: %v", nodeName, err)
		return pod
	}
	if pod == nil {
		return pod
	}

	e2epod.SetNodeAffinity(&pod.Spec, nodeName)
	return pod
}

// createLocalPod creates a pod that mounts the volume's PVC, optionally with
// the given fsGroup, using the configured pod start timeout.
func createLocalPod(config *localTestConfig, volume *localTestVolume, fsGroup *int64) (*v1.Pod, error) {
	ginkgo.By("Creating a pod")
	podConfig := e2epod.Config{
		NS:           config.ns,
		PVCs:         []*v1.PersistentVolumeClaim{volume.pvc},
		SeLinuxLabel: selinuxLabel,
		FsGroup:      fsGroup,
	}
	return e2epod.CreateSecPod(config.client, &podConfig, config.timeouts.PodStart)
}

// createWriteCmd returns the shell command that writes writeTestFileContent.
//
// For block volumes testDir is the block device: the content is first written
// to a scratch file under /tmp, copied onto the device with dd, and the
// scratch file is removed. For all other volume types the content is written
// to testFile inside testDir.
func createWriteCmd(testDir string, testFile string, writeTestFileContent string, volumeType localVolumeType) string {
	if volumeType == BlockLocalVolumeType {
		// testDir is the block device.
		testFileDir := filepath.Join("/tmp", testDir)
		testFilePath := filepath.Join(testFileDir, testFile)
		// Create a file containing the testFileContent.
		writeTestFileCmd := fmt.Sprintf("mkdir -p %s; echo %s > %s", testFileDir, writeTestFileContent, testFilePath)
		// sudo is needed when using ssh exec to node.
		// sudo is not needed and does not exist in some containers (e.g. busybox), when using pod exec.
		const sudoCmd = "SUDO_CMD=$(which sudo); echo ${SUDO_CMD}"
		// Write the testFileContent into the block device.
		writeBlockCmd := fmt.Sprintf("${SUDO_CMD} dd if=%s of=%s bs=512 count=100", testFilePath, testDir)
		// Cleanup the file containing testFileContent.
		deleteTestFileCmd := fmt.Sprintf("rm %s", testFilePath)
		return fmt.Sprintf("%s && %s && %s && %s", writeTestFileCmd, sudoCmd, writeBlockCmd, deleteTestFileCmd)
	}
	testFilePath := filepath.Join(testDir, testFile)
	return fmt.Sprintf("mkdir -p %s; echo %s > %s", testDir, writeTestFileContent, testFilePath)
}

// createReadCmd returns the shell command that reads the test data back: a
// hexdump of the first 100 bytes of the device for block volumes (testFileDir
// is the device), or a cat of testFile inside testFileDir otherwise.
func createReadCmd(testFileDir string, testFile string, volumeType localVolumeType) string {
	if volumeType == BlockLocalVolumeType {
		// Create the command to read the beginning of the block device and print it in ascii.
		return fmt.Sprintf("hexdump -n 100 -e '100 \"%%_p\"' %s | head -1", testFileDir)
	}
	// Create the command to read (aka cat) a file.
	testFilePath := filepath.Join(testFileDir, testFile)
	return fmt.Sprintf("cat %s", testFilePath)
}

// testReadFileContent runs the read command in pod and asserts that its
// stdout contains testFileContent.
func testReadFileContent(f *framework.Framework, testFileDir string, testFile string, testFileContent string, pod *v1.Pod, volumeType localVolumeType) {
	readCmd := createReadCmd(testFileDir, testFile, volumeType)
	readOut := podRWCmdExec(f, pod, readCmd)
	gomega.Expect(readOut).To(gomega.ContainSubstring(testFileContent))
}

// podRWCmdExec executes a read or write command in a pod, logs the command
// together with its stdout, stderr and error, fails the test on error, and
// returns stdout.
func podRWCmdExec(f *framework.Framework, pod *v1.Pod, cmd string) string {
	stdout, stderr, err := e2evolume.PodExec(f, pod, cmd)
	framework.Logf("podRWCmdExec cmd: %q, out: %q, stderr: %q, err: %v", cmd, stdout, stderr, err)
	framework.ExpectNoError(err)
	return stdout
}

// setupLocalVolumesPVCsPVs initializes count test volumes of the given type
// on node, creates a local PV and PVC for each of them using the given
// binding mode, and returns the volumes.
func setupLocalVolumesPVCsPVs(
	config *localTestConfig,
	localVolumeType localVolumeType,
	node *v1.Node,
	count int,
	mode storagev1.VolumeBindingMode) []*localTestVolume {

	ginkgo.By("Initializing test volumes")
	testVols := setupLocalVolumes(config, localVolumeType, node, count)

	ginkgo.By("Creating local PVCs and PVs")
	createLocalPVCsPVs(config, testVols, mode)

	return testVols
}

// newLocalClaimWithName returns a new, not yet created, ReadWriteOnce
// persistent volume claim called name in the test namespace. It uses the test
// storage class and requests testRequestSize of storage.
func newLocalClaimWithName(config *localTestConfig, name string) *v1.PersistentVolumeClaim {
	claim := v1.PersistentVolumeClaim{
		ObjectMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: config.ns,
		},
		Spec: v1.PersistentVolumeClaimSpec{
			StorageClassName: &config.scName,
			AccessModes: []v1.PersistentVolumeAccessMode{
				v1.ReadWriteOnce,
			},
			Resources: v1.ResourceRequirements{
				Requests: v1.ResourceList{
					v1.ResourceStorage: resource.MustParse(testRequestSize),
				},
			},
		},
	}

	return &claim
}

// createStatefulSet creates a StatefulSet with ssReplicas replicas, each of
// which gets volumeCount claim templates named vol1..volN mounted at /volN.
//
// When anti is true the pods carry a required pod anti-affinity on the
// hostname topology key; otherwise they carry the equivalent pod affinity.
// When parallel is true the parallel pod management policy is used. The
// function fails the test if creation fails, calls
// e2estatefulset.WaitForRunningAndReady, and returns the created object.
func createStatefulSet(config *localTestConfig, ssReplicas int32, volumeCount int, anti, parallel bool) *appsv1.StatefulSet {
	// Single source for the label value shared by the selector, the pod
	// template labels and the (anti-)affinity term.
	const appLabelValue = "local-volume-test"

	mounts := []v1.VolumeMount{}
	claims := []v1.PersistentVolumeClaim{}
	for i := 0; i < volumeCount; i++ {
		name := fmt.Sprintf("vol%v", i+1)
		pvc := newLocalClaimWithName(config, name)
		mounts = append(mounts, v1.VolumeMount{Name: name, MountPath: "/" + name})
		claims = append(claims, *pvc)
	}

	podAffinityTerms := []v1.PodAffinityTerm{
		{
			LabelSelector: &metav1.LabelSelector{
				MatchExpressions: []metav1.LabelSelectorRequirement{
					{
						Key:      "app",
						Operator: metav1.LabelSelectorOpIn,
						Values:   []string{appLabelValue},
					},
				},
			},
			TopologyKey: "kubernetes.io/hostname",
		},
	}

	affinity := v1.Affinity{}
	if anti {
		affinity.PodAntiAffinity = &v1.PodAntiAffinity{
			RequiredDuringSchedulingIgnoredDuringExecution: podAffinityTerms,
		}
	} else {
		affinity.PodAffinity = &v1.PodAffinity{
			RequiredDuringSchedulingIgnoredDuringExecution: podAffinityTerms,
		}
	}

	spec := &appsv1.StatefulSet{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "local-volume-statefulset",
			Namespace: config.ns,
		},
		Spec: appsv1.StatefulSetSpec{
			Selector: &metav1.LabelSelector{
				MatchLabels: map[string]string{"app": appLabelValue},
			},
			Replicas: &ssReplicas,
			Template: v1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels: map[string]string{"app": appLabelValue},
				},
				Spec: v1.PodSpec{
					Containers: []v1.Container{
						{
							Name:         "nginx",
							Image:        imageutils.GetE2EImage(imageutils.Nginx),
							VolumeMounts: mounts,
						},
					},
					Affinity: &affinity,
				},
			},
			VolumeClaimTemplates: claims,
			ServiceName:          "test-service",
		},
	}

	if parallel {
		spec.Spec.PodManagementPolicy = appsv1.ParallelPodManagement
	}

	ss, err := config.client.AppsV1().StatefulSets(config.ns).Create(context.TODO(), spec, metav1.CreateOptions{})
	framework.ExpectNoError(err)

	e2estatefulset.WaitForRunningAndReady(config.client, ssReplicas, ss)
	return ss
}

// validateStatefulSet checks pod placement for the StatefulSet: with anti set
// every pod must be on a distinct node, otherwise all pods must share a single
// node. It then asserts that every PVC referenced by the pods is Bound.
func validateStatefulSet(config *localTestConfig, ss *appsv1.StatefulSet, anti bool) {
	pods := e2estatefulset.GetPodList(config.client, ss)

	nodes := sets.NewString()
	for _, pod := range pods.Items {
		nodes.Insert(pod.Spec.NodeName)
	}

	if anti {
		// Verify that each pod is on a different node
		framework.ExpectEqual(nodes.Len(), len(pods.Items))
	} else {
		// Verify that all pods are on same node.
		framework.ExpectEqual(nodes.Len(), 1)
	}

	// Validate all PVCs are bound
	for _, pod := range pods.Items {
		for _, volume := range pod.Spec.Volumes {
			pvcSource := volume.VolumeSource.PersistentVolumeClaim
			if pvcSource != nil {
				err := e2epv.WaitForPersistentVolumeClaimPhase(
					v1.ClaimBound, config.client, config.ns, pvcSource.ClaimName, framework.Poll, time.Second)
				framework.ExpectNoError(err)
			}
		}
	}
}

// SkipUnlessLocalSSDExists takes in an ssdInterface (scsi/nvme) and a filesystemType (fs/block)
// and skips if a disk of that type does not exist on the node. The disks are
// counted by listing /mnt/disks/by-uuid/google-local-ssds-<interface>-<type>/
// on the node; a failure to run or parse that listing fails the test.
func SkipUnlessLocalSSDExists(config *localTestConfig, ssdInterface, filesystemType string, node *v1.Node) {
	ssdCmd := fmt.Sprintf("ls -1 /mnt/disks/by-uuid/google-local-ssds-%s-%s/ | wc -l", ssdInterface, filesystemType)
	res, err := config.hostExec.Execute(ssdCmd, node)
	utils.LogResult(res)
	framework.ExpectNoError(err)
	num, err := strconv.Atoi(strings.TrimSpace(res.Stdout))
	framework.ExpectNoError(err)
	if num < 1 {
		e2eskipper.Skipf("Requires at least 1 %s %s localSSD ", ssdInterface, filesystemType)
	}
}