/*
Copyright 2018 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

/*
 * This file defines various csi volume test drivers for TestSuites.
 *
 * There are two ways to prepare test drivers:
 * 1) With a containerized server (NFS, Ceph, Gluster, iSCSI, ...)
 * It creates a server pod which defines one volume for the tests.
 * These tests work only when privileged containers are allowed, because
 * exporting various filesystems (NFS, GlusterFS, ...) usually needs some
 * mounting or other privileged magic in the server pod.
 *
 * Note that the server containers are for testing purposes only and should not
 * be used in production.
 *
 * 2) With a server or cloud provider outside of Kubernetes (Cinder, GCE, AWS, Azure, ...)
 * An appropriate server or cloud provider must exist somewhere outside
 * the tested Kubernetes cluster. CreateVolume will create a new volume to be
 * used in the TestSuites for inlineVolume or DynamicPV tests.
 */

package drivers

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/onsi/ginkgo"
	spb "google.golang.org/genproto/googleapis/rpc/status"
	"google.golang.org/grpc/codes"
	grpcstatus "google.golang.org/grpc/status"

	appsv1 "k8s.io/api/apps/v1"
	v1 "k8s.io/api/core/v1"
	rbacv1 "k8s.io/api/rbac/v1"
	storagev1 "k8s.io/api/storage/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/klog/v2"
	"k8s.io/kubernetes/test/e2e/framework"
	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
	e2evolume "k8s.io/kubernetes/test/e2e/framework/volume"
	mockdriver "k8s.io/kubernetes/test/e2e/storage/drivers/csi-test/driver"
	mockservice "k8s.io/kubernetes/test/e2e/storage/drivers/csi-test/mock/service"
	"k8s.io/kubernetes/test/e2e/storage/drivers/proxy"
	storageframework "k8s.io/kubernetes/test/e2e/storage/framework"
	"k8s.io/kubernetes/test/e2e/storage/utils"

	"google.golang.org/grpc"
)

const (
	// GCEPDCSIDriverName is the name of GCE Persistent Disk CSI driver
	GCEPDCSIDriverName = "pd.csi.storage.gke.io"
	// GCEPDCSIZoneTopologyKey is the key of GCE Persistent Disk CSI zone topology
	GCEPDCSIZoneTopologyKey = "topology.gke.io/zone"

	// grpcCallPrefix is the prefix of the mock driver grpc log. It marks the
	// log lines that carry one JSON-encoded gRPC call each.
	grpcCallPrefix = "gRPCCall:"
)

// hostpathCSIDriver is the test driver for the CSI hostpath driver.
type hostpathCSIDriver struct {
	// driverInfo is the static description handed out by GetDriverInfo.
	driverInfo storageframework.DriverInfo
	// manifests are the manifest files deployed by PrepareTest.
	manifests []string
	// volumeAttributes are the attribute sets GetVolume cycles through.
	// An empty slice makes SkipUnsupportedTest skip CSI inline volume patterns.
	volumeAttributes []map[string]string
}

// initHostPathCSIDriver builds a hostpathCSIDriver with the given name,
// capabilities, volume attributes and deployment manifests. All other
// DriverInfo settings (size range, stress and performance test options) are
// fixed here.
func initHostPathCSIDriver(name string, capabilities map[storageframework.Capability]bool, volumeAttributes []map[string]string, manifests ...string) storageframework.TestDriver {
	return &hostpathCSIDriver{
		driverInfo: storageframework.DriverInfo{
			Name:        name,
			FeatureTag:  "",
			MaxFileSize: storageframework.FileSizeMedium,
			SupportedFsType: sets.NewString(
				"", // Default fsType
			),
			SupportedSizeRange: e2evolume.SizeRange{
				Min: "1Mi",
			},
			Capabilities: capabilities,
			StressTestOptions: &storageframework.StressTestOptions{
				NumPods:     10,
				NumRestarts: 10,
			},
			VolumeSnapshotStressTestOptions: &storageframework.VolumeSnapshotStressTestOptions{
				NumPods:      10,
				NumSnapshots: 10,
			},
			PerformanceTestOptions: &storageframework.PerformanceTestOptions{
				ProvisioningOptions: &storageframework.PerformanceTestProvisioningOptions{
					VolumeSize: "1Mi",
					Count:      300,
					// Volume provisioning metrics are compared to a high baseline.
					// Failure to pass would suggest a performance regression.
					ExpectedMetrics: &storageframework.Metrics{
						AvgLatency: 2 * time.Minute,
						Throughput: 0.5,
					},
				},
			},
		},
		manifests:        manifests,
		volumeAttributes: volumeAttributes,
	}
}

// Compile-time checks that hostpathCSIDriver implements the test driver
// interfaces it is used as.
var _ storageframework.TestDriver = &hostpathCSIDriver{}
var _ storageframework.DynamicPVTestDriver = &hostpathCSIDriver{}
var _ storageframework.SnapshottableTestDriver = &hostpathCSIDriver{}
var _ storageframework.EphemeralTestDriver = &hostpathCSIDriver{}

// InitHostPathCSIDriver returns hostpathCSIDriver that implements TestDriver interface
func InitHostPathCSIDriver() storageframework.TestDriver {
	capabilities := map[storageframework.Capability]bool{
		storageframework.CapPersistence:         true,
		storageframework.CapSnapshotDataSource:  true,
		storageframework.CapMultiPODs:           true,
		storageframework.CapBlock:               true,
		storageframework.CapPVCDataSource:       true,
		storageframework.CapControllerExpansion: true,
		storageframework.CapOnlineExpansion:     true,
		storageframework.CapSingleNodeVolume:    true,

		// This is needed for the
		// testsuites/volumelimits.go `should support volume limits`
		// test. --maxvolumespernode=10 gets
		// added when patching the deployment.
		storageframework.CapVolumeLimits: true,
	}
	return initHostPathCSIDriver("csi-hostpath",
		capabilities,
		// Volume attributes don't matter, but we have to provide at least one map.
		[]map[string]string{
			{"foo": "bar"},
		},
		"test/e2e/testing-manifests/storage-csi/external-attacher/rbac.yaml",
		"test/e2e/testing-manifests/storage-csi/external-provisioner/rbac.yaml",
		"test/e2e/testing-manifests/storage-csi/external-snapshotter/csi-snapshotter/rbac-csi-snapshotter.yaml",
		"test/e2e/testing-manifests/storage-csi/external-health-monitor/external-health-monitor-controller/rbac.yaml",
		"test/e2e/testing-manifests/storage-csi/external-resizer/rbac.yaml",
		"test/e2e/testing-manifests/storage-csi/hostpath/hostpath/csi-hostpath-driverinfo.yaml",
		"test/e2e/testing-manifests/storage-csi/hostpath/hostpath/csi-hostpath-plugin.yaml",
		"test/e2e/testing-manifests/storage-csi/hostpath/hostpath/e2e-test-rbac.yaml",
	)
}

// GetDriverInfo returns a pointer to the driver's own DriverInfo, so callers
// see (and can modify) the driver's copy rather than a snapshot.
func (h *hostpathCSIDriver) GetDriverInfo() *storageframework.DriverInfo {
	return &h.driverInfo
}

// SkipUnsupportedTest skips CSI inline volume patterns when the driver was
// created without any volume attributes.
func (h *hostpathCSIDriver) SkipUnsupportedTest(pattern storageframework.TestPattern) {
	if pattern.VolType == storageframework.CSIInlineVolume && len(h.volumeAttributes) == 0 {
		e2eskipper.Skipf("%s has no volume attributes defined, doesn't support ephemeral inline volumes", h.driverInfo.Name)
	}
}

// GetDynamicProvisionStorageClass returns a storage class whose provisioner
// is the per-test unique driver name. No parameters are set and fsType is
// not used.
func (h *hostpathCSIDriver) GetDynamicProvisionStorageClass(config *storageframework.PerTestConfig, fsType string) *storagev1.StorageClass {
	provisioner := config.GetUniqueDriverName()
	parameters := map[string]string{}
	ns := config.Framework.Namespace.Name

	return storageframework.GetStorageClass(provisioner, parameters, nil, ns)
}
193func (h *hostpathCSIDriver) GetVolume(config *storageframework.PerTestConfig, volumeNumber int) (map[string]string, bool, bool) { 194 return h.volumeAttributes[volumeNumber%len(h.volumeAttributes)], false /* not shared */, false /* read-write */ 195} 196 197func (h *hostpathCSIDriver) GetCSIDriverName(config *storageframework.PerTestConfig) string { 198 return config.GetUniqueDriverName() 199} 200 201func (h *hostpathCSIDriver) GetSnapshotClass(config *storageframework.PerTestConfig, parameters map[string]string) *unstructured.Unstructured { 202 snapshotter := config.GetUniqueDriverName() 203 ns := config.Framework.Namespace.Name 204 205 return utils.GenerateSnapshotClassSpec(snapshotter, parameters, ns) 206} 207 208func (h *hostpathCSIDriver) PrepareTest(f *framework.Framework) (*storageframework.PerTestConfig, func()) { 209 // Create secondary namespace which will be used for creating driver 210 driverNamespace := utils.CreateDriverNamespace(f) 211 driverns := driverNamespace.Name 212 testns := f.Namespace.Name 213 214 ginkgo.By(fmt.Sprintf("deploying %s driver", h.driverInfo.Name)) 215 cancelLogging := utils.StartPodLogs(f, driverNamespace) 216 cs := f.ClientSet 217 218 // The hostpath CSI driver only works when everything runs on the same node. 219 node, err := e2enode.GetRandomReadySchedulableNode(cs) 220 framework.ExpectNoError(err) 221 config := &storageframework.PerTestConfig{ 222 Driver: h, 223 Prefix: "hostpath", 224 Framework: f, 225 ClientNodeSelection: e2epod.NodeSelection{Name: node.Name}, 226 DriverNamespace: driverNamespace, 227 } 228 229 o := utils.PatchCSIOptions{ 230 OldDriverName: h.driverInfo.Name, 231 NewDriverName: config.GetUniqueDriverName(), 232 DriverContainerName: "hostpath", 233 DriverContainerArguments: []string{"--drivername=" + config.GetUniqueDriverName(), 234 // This is needed for the 235 // testsuites/volumelimits.go `should support volume limits` 236 // test. 
237 "--maxvolumespernode=10", 238 // Disable volume lifecycle checks due to issue #103651 239 // TODO: enable this check once issue is resolved for csi-host-path driver. 240 "--check-volume-lifecycle=false", 241 }, 242 ProvisionerContainerName: "csi-provisioner", 243 SnapshotterContainerName: "csi-snapshotter", 244 NodeName: node.Name, 245 } 246 cleanup, err := utils.CreateFromManifests(config.Framework, driverNamespace, func(item interface{}) error { 247 if err := utils.PatchCSIDeployment(config.Framework, o, item); err != nil { 248 return err 249 } 250 251 // Remove csi-external-health-monitor-agent and 252 // csi-external-health-monitor-controller 253 // containers. The agent is obsolete. 254 // The controller is not needed for any of the 255 // tests and is causing too much overhead when 256 // running in a large cluster (see 257 // https://github.com/kubernetes/kubernetes/issues/102452#issuecomment-856991009). 258 switch item := item.(type) { 259 case *appsv1.StatefulSet: 260 var containers []v1.Container 261 for _, container := range item.Spec.Template.Spec.Containers { 262 switch container.Name { 263 case "csi-external-health-monitor-agent", "csi-external-health-monitor-controller": 264 // Remove these containers. 265 default: 266 // Keep the others. 267 containers = append(containers, container) 268 } 269 } 270 item.Spec.Template.Spec.Containers = containers 271 } 272 return nil 273 }, h.manifests...) 
274 275 if err != nil { 276 framework.Failf("deploying %s driver: %v", h.driverInfo.Name, err) 277 } 278 279 cleanupFunc := generateDriverCleanupFunc( 280 f, 281 h.driverInfo.Name, 282 testns, 283 driverns, 284 cleanup, 285 cancelLogging) 286 287 return config, cleanupFunc 288} 289 290// mockCSI 291type mockCSIDriver struct { 292 driverInfo storageframework.DriverInfo 293 manifests []string 294 podInfo *bool 295 storageCapacity *bool 296 attachable bool 297 attachLimit int 298 enableTopology bool 299 enableNodeExpansion bool 300 hooks Hooks 301 tokenRequests []storagev1.TokenRequest 302 requiresRepublish *bool 303 fsGroupPolicy *storagev1.FSGroupPolicy 304 embedded bool 305 calls MockCSICalls 306 embeddedCSIDriver *mockdriver.CSIDriver 307 308 // Additional values set during PrepareTest 309 clientSet kubernetes.Interface 310 driverNamespace *v1.Namespace 311} 312 313// Hooks to be run to execute while handling gRPC calls. 314// 315// At the moment, only generic pre- and post-function call 316// hooks are implemented. Those hooks can cast the request and 317// response values if needed. More hooks inside specific 318// functions could be added if needed. 319type Hooks struct { 320 // Pre is called before invoking the mock driver's implementation of a method. 321 // If either a non-nil reply or error are returned, then those are returned to the caller. 322 Pre func(ctx context.Context, method string, request interface{}) (reply interface{}, err error) 323 324 // Post is called after invoking the mock driver's implementation of a method. 325 // What it returns is used as actual result. 326 Post func(ctx context.Context, method string, request, reply interface{}, err error) (finalReply interface{}, finalErr error) 327} 328 329// MockCSITestDriver provides additional functions specific to the CSI mock driver. 330type MockCSITestDriver interface { 331 storageframework.DynamicPVTestDriver 332 333 // GetCalls returns all currently observed gRPC calls. 
Only valid 334 // after PrepareTest. 335 GetCalls() ([]MockCSICall, error) 336} 337 338// CSIMockDriverOpts defines options used for csi driver 339type CSIMockDriverOpts struct { 340 RegisterDriver bool 341 DisableAttach bool 342 PodInfo *bool 343 StorageCapacity *bool 344 AttachLimit int 345 EnableTopology bool 346 EnableResizing bool 347 EnableNodeExpansion bool 348 EnableSnapshot bool 349 TokenRequests []storagev1.TokenRequest 350 RequiresRepublish *bool 351 FSGroupPolicy *storagev1.FSGroupPolicy 352 353 // Embedded defines whether the CSI mock driver runs 354 // inside the cluster (false, the default) or just a proxy 355 // runs inside the cluster and all gRPC calls are handled 356 // inside the e2e.test binary. 357 Embedded bool 358 359 // Hooks that will be called if (and only if!) the embedded 360 // mock driver is used. Beware that hooks are invoked 361 // asynchronously in different goroutines. 362 Hooks Hooks 363} 364 365// Dummy structure that parses just volume_attributes and error code out of logged CSI call 366type MockCSICall struct { 367 json string // full log entry 368 369 Method string 370 Request struct { 371 VolumeContext map[string]string `json:"volume_context"` 372 } 373 FullError struct { 374 Code codes.Code `json:"code"` 375 Message string `json:"message"` 376 } 377 Error string 378} 379 380// MockCSICalls is a Thread-safe storage for MockCSICall instances. 381type MockCSICalls struct { 382 calls []MockCSICall 383 mutex sync.Mutex 384} 385 386// Get returns all currently recorded calls. 387func (c *MockCSICalls) Get() []MockCSICall { 388 c.mutex.Lock() 389 defer c.mutex.Unlock() 390 391 return c.calls[:] 392} 393 394// Add appens one new call at the end. 395func (c *MockCSICalls) Add(call MockCSICall) { 396 c.mutex.Lock() 397 defer c.mutex.Unlock() 398 399 c.calls = append(c.calls, call) 400} 401 402// LogGRPC takes individual parameters from the mock CSI driver and adds them. 
403func (c *MockCSICalls) LogGRPC(method string, request, reply interface{}, err error) { 404 // Encoding to JSON and decoding mirrors the traditional way of capturing calls. 405 // Probably could be simplified now... 406 logMessage := struct { 407 Method string 408 Request interface{} 409 Response interface{} 410 // Error as string, for backward compatibility. 411 // "" on no error. 412 Error string 413 // Full error dump, to be able to parse out full gRPC error code and message separately in a test. 414 FullError *spb.Status 415 }{ 416 Method: method, 417 Request: request, 418 Response: reply, 419 } 420 421 if err != nil { 422 logMessage.Error = err.Error() 423 logMessage.FullError = grpcstatus.Convert(err).Proto() 424 } 425 426 msg, _ := json.Marshal(logMessage) 427 call := MockCSICall{ 428 json: string(msg), 429 } 430 json.Unmarshal(msg, &call) 431 432 klog.Infof("%s %s", grpcCallPrefix, string(msg)) 433 434 // Trim gRPC service name, i.e. "/csi.v1.Identity/Probe" -> "Probe" 435 methodParts := strings.Split(call.Method, "/") 436 call.Method = methodParts[len(methodParts)-1] 437 438 c.Add(call) 439} 440 441var _ storageframework.TestDriver = &mockCSIDriver{} 442var _ storageframework.DynamicPVTestDriver = &mockCSIDriver{} 443var _ storageframework.SnapshottableTestDriver = &mockCSIDriver{} 444 445// InitMockCSIDriver returns a mockCSIDriver that implements TestDriver interface 446func InitMockCSIDriver(driverOpts CSIMockDriverOpts) MockCSITestDriver { 447 driverManifests := []string{ 448 "test/e2e/testing-manifests/storage-csi/external-attacher/rbac.yaml", 449 "test/e2e/testing-manifests/storage-csi/external-provisioner/rbac.yaml", 450 "test/e2e/testing-manifests/storage-csi/external-resizer/rbac.yaml", 451 "test/e2e/testing-manifests/storage-csi/external-snapshotter/csi-snapshotter/rbac-csi-snapshotter.yaml", 452 "test/e2e/testing-manifests/storage-csi/mock/csi-mock-rbac.yaml", 453 "test/e2e/testing-manifests/storage-csi/mock/csi-storageclass.yaml", 454 } 455 
if driverOpts.Embedded { 456 driverManifests = append(driverManifests, "test/e2e/testing-manifests/storage-csi/mock/csi-mock-proxy.yaml") 457 } else { 458 driverManifests = append(driverManifests, "test/e2e/testing-manifests/storage-csi/mock/csi-mock-driver.yaml") 459 } 460 461 if driverOpts.RegisterDriver { 462 driverManifests = append(driverManifests, "test/e2e/testing-manifests/storage-csi/mock/csi-mock-driverinfo.yaml") 463 } 464 465 if !driverOpts.DisableAttach { 466 driverManifests = append(driverManifests, "test/e2e/testing-manifests/storage-csi/mock/csi-mock-driver-attacher.yaml") 467 } 468 469 if driverOpts.EnableResizing { 470 driverManifests = append(driverManifests, "test/e2e/testing-manifests/storage-csi/mock/csi-mock-driver-resizer.yaml") 471 } 472 473 if driverOpts.EnableSnapshot { 474 driverManifests = append(driverManifests, "test/e2e/testing-manifests/storage-csi/mock/csi-mock-driver-snapshotter.yaml") 475 } 476 477 return &mockCSIDriver{ 478 driverInfo: storageframework.DriverInfo{ 479 Name: "csi-mock", 480 FeatureTag: "", 481 MaxFileSize: storageframework.FileSizeMedium, 482 SupportedFsType: sets.NewString( 483 "", // Default fsType 484 ), 485 Capabilities: map[storageframework.Capability]bool{ 486 storageframework.CapPersistence: false, 487 storageframework.CapFsGroup: false, 488 storageframework.CapExec: false, 489 storageframework.CapVolumeLimits: true, 490 }, 491 }, 492 manifests: driverManifests, 493 podInfo: driverOpts.PodInfo, 494 storageCapacity: driverOpts.StorageCapacity, 495 enableTopology: driverOpts.EnableTopology, 496 attachable: !driverOpts.DisableAttach, 497 attachLimit: driverOpts.AttachLimit, 498 enableNodeExpansion: driverOpts.EnableNodeExpansion, 499 tokenRequests: driverOpts.TokenRequests, 500 requiresRepublish: driverOpts.RequiresRepublish, 501 fsGroupPolicy: driverOpts.FSGroupPolicy, 502 embedded: driverOpts.Embedded, 503 hooks: driverOpts.Hooks, 504 } 505} 506 507func (m *mockCSIDriver) GetDriverInfo() 
*storageframework.DriverInfo { 508 return &m.driverInfo 509} 510 511func (m *mockCSIDriver) SkipUnsupportedTest(pattern storageframework.TestPattern) { 512} 513 514func (m *mockCSIDriver) GetDynamicProvisionStorageClass(config *storageframework.PerTestConfig, fsType string) *storagev1.StorageClass { 515 provisioner := config.GetUniqueDriverName() 516 parameters := map[string]string{} 517 ns := config.Framework.Namespace.Name 518 519 return storageframework.GetStorageClass(provisioner, parameters, nil, ns) 520} 521 522func (m *mockCSIDriver) GetSnapshotClass(config *storageframework.PerTestConfig, parameters map[string]string) *unstructured.Unstructured { 523 snapshotter := m.driverInfo.Name + "-" + config.Framework.UniqueName 524 ns := config.Framework.Namespace.Name 525 526 return utils.GenerateSnapshotClassSpec(snapshotter, parameters, ns) 527} 528 529func (m *mockCSIDriver) PrepareTest(f *framework.Framework) (*storageframework.PerTestConfig, func()) { 530 m.clientSet = f.ClientSet 531 532 // Create secondary namespace which will be used for creating driver 533 m.driverNamespace = utils.CreateDriverNamespace(f) 534 driverns := m.driverNamespace.Name 535 testns := f.Namespace.Name 536 537 if m.embedded { 538 ginkgo.By("deploying csi mock proxy") 539 } else { 540 ginkgo.By("deploying csi mock driver") 541 } 542 cancelLogging := utils.StartPodLogs(f, m.driverNamespace) 543 cs := f.ClientSet 544 545 // pods should be scheduled on the node 546 node, err := e2enode.GetRandomReadySchedulableNode(cs) 547 framework.ExpectNoError(err) 548 549 embeddedCleanup := func() {} 550 containerArgs := []string{} 551 if m.embedded { 552 // Run embedded CSI driver. 553 // 554 // For now we start exactly one instance which implements controller, 555 // node and identity services. It matches with the one pod that we run 556 // inside the cluster. The name and namespace of that one is deterministic, 557 // so we know what to connect to. 
558 // 559 // Long-term we could also deploy one central controller and multiple 560 // node instances, with knowledge about provisioned volumes shared in 561 // this process. 562 podname := "csi-mockplugin-0" 563 containername := "mock" 564 ctx, cancel := context.WithCancel(context.Background()) 565 serviceConfig := mockservice.Config{ 566 DisableAttach: !m.attachable, 567 DriverName: "csi-mock-" + f.UniqueName, 568 AttachLimit: int64(m.attachLimit), 569 NodeExpansionRequired: m.enableNodeExpansion, 570 EnableTopology: m.enableTopology, 571 IO: proxy.PodDirIO{ 572 F: f, 573 Namespace: m.driverNamespace.Name, 574 PodName: podname, 575 ContainerName: "busybox", 576 }, 577 } 578 s := mockservice.New(serviceConfig) 579 servers := &mockdriver.CSIDriverServers{ 580 Controller: s, 581 Identity: s, 582 Node: s, 583 } 584 m.embeddedCSIDriver = mockdriver.NewCSIDriver(servers) 585 586 l, err := proxy.Listen(ctx, f.ClientSet, f.ClientConfig(), 587 proxy.Addr{ 588 Namespace: m.driverNamespace.Name, 589 PodName: podname, 590 ContainerName: containername, 591 Port: 9000, 592 }, 593 ) 594 framework.ExpectNoError(err, "start connecting to proxy pod") 595 err = m.embeddedCSIDriver.Start(l, m.interceptGRPC) 596 framework.ExpectNoError(err, "start mock driver") 597 598 embeddedCleanup = func() { 599 // Kill all goroutines and delete resources of the mock driver. 600 m.embeddedCSIDriver.Stop() 601 l.Close() 602 cancel() 603 } 604 } else { 605 // When using the mock driver inside the cluster it has to be reconfigured 606 // via command line parameters. 
607 containerArgs = append(containerArgs, "--name=csi-mock-"+f.UniqueName) 608 609 if !m.attachable { 610 containerArgs = append(containerArgs, "--disable-attach") 611 } 612 613 if m.enableTopology { 614 containerArgs = append(containerArgs, "--enable-topology") 615 } 616 617 if m.attachLimit > 0 { 618 containerArgs = append(containerArgs, "--attach-limit", strconv.Itoa(m.attachLimit)) 619 } 620 621 if m.enableNodeExpansion { 622 containerArgs = append(containerArgs, "--node-expand-required=true") 623 } 624 } 625 626 config := &storageframework.PerTestConfig{ 627 Driver: m, 628 Prefix: "mock", 629 Framework: f, 630 ClientNodeSelection: e2epod.NodeSelection{Name: node.Name}, 631 DriverNamespace: m.driverNamespace, 632 } 633 634 o := utils.PatchCSIOptions{ 635 OldDriverName: "csi-mock", 636 NewDriverName: "csi-mock-" + f.UniqueName, 637 DriverContainerName: "mock", 638 DriverContainerArguments: containerArgs, 639 ProvisionerContainerName: "csi-provisioner", 640 NodeName: node.Name, 641 PodInfo: m.podInfo, 642 StorageCapacity: m.storageCapacity, 643 CanAttach: &m.attachable, 644 VolumeLifecycleModes: &[]storagev1.VolumeLifecycleMode{ 645 storagev1.VolumeLifecyclePersistent, 646 storagev1.VolumeLifecycleEphemeral, 647 }, 648 TokenRequests: m.tokenRequests, 649 RequiresRepublish: m.requiresRepublish, 650 FSGroupPolicy: m.fsGroupPolicy, 651 } 652 cleanup, err := utils.CreateFromManifests(f, m.driverNamespace, func(item interface{}) error { 653 if err := utils.PatchCSIDeployment(config.Framework, o, item); err != nil { 654 return err 655 } 656 657 switch item := item.(type) { 658 case *rbacv1.ClusterRole: 659 if strings.HasPrefix(item.Name, "external-snapshotter-runner") { 660 // Re-enable access to secrets for the snapshotter sidecar for 661 // https://github.com/kubernetes/kubernetes/blob/6ede5ca95f78478fa627ecfea8136e0dff34436b/test/e2e/storage/csi_mock_volume.go#L1539-L1548 662 // It was disabled in 
https://github.com/kubernetes-csi/external-snapshotter/blob/501cc505846c03ee665355132f2da0ce7d5d747d/deploy/kubernetes/csi-snapshotter/rbac-csi-snapshotter.yaml#L26-L32 663 item.Rules = append(item.Rules, rbacv1.PolicyRule{ 664 APIGroups: []string{""}, 665 Resources: []string{"secrets"}, 666 Verbs: []string{"get", "list"}, 667 }) 668 } 669 } 670 671 return nil 672 }, m.manifests...) 673 674 if err != nil { 675 framework.Failf("deploying csi mock driver: %v", err) 676 } 677 678 driverCleanupFunc := generateDriverCleanupFunc( 679 f, 680 "mock", 681 testns, 682 driverns, 683 cleanup, 684 cancelLogging) 685 686 cleanupFunc := func() { 687 embeddedCleanup() 688 driverCleanupFunc() 689 } 690 691 return config, cleanupFunc 692} 693 694func (m *mockCSIDriver) interceptGRPC(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { 695 defer func() { 696 // Always log the call and its final result, 697 // regardless whether the result was from the real 698 // implementation or a hook. 
699 m.calls.LogGRPC(info.FullMethod, req, resp, err) 700 }() 701 702 if m.hooks.Pre != nil { 703 resp, err = m.hooks.Pre(ctx, info.FullMethod, req) 704 if resp != nil || err != nil { 705 return 706 } 707 } 708 resp, err = handler(ctx, req) 709 if m.hooks.Post != nil { 710 resp, err = m.hooks.Post(ctx, info.FullMethod, req, resp, err) 711 } 712 return 713} 714 715func (m *mockCSIDriver) GetCalls() ([]MockCSICall, error) { 716 if m.embedded { 717 return m.calls.Get(), nil 718 } 719 720 if m.driverNamespace == nil { 721 return nil, errors.New("PrepareTest not called yet") 722 } 723 724 // Name of CSI driver pod name (it's in a StatefulSet with a stable name) 725 driverPodName := "csi-mockplugin-0" 726 // Name of CSI driver container name 727 driverContainerName := "mock" 728 729 // Load logs of driver pod 730 log, err := e2epod.GetPodLogs(m.clientSet, m.driverNamespace.Name, driverPodName, driverContainerName) 731 if err != nil { 732 return nil, fmt.Errorf("could not load CSI driver logs: %s", err) 733 } 734 735 logLines := strings.Split(log, "\n") 736 var calls []MockCSICall 737 for _, line := range logLines { 738 index := strings.Index(line, grpcCallPrefix) 739 if index == -1 { 740 continue 741 } 742 line = line[index+len(grpcCallPrefix):] 743 call := MockCSICall{ 744 json: string(line), 745 } 746 err := json.Unmarshal([]byte(line), &call) 747 if err != nil { 748 framework.Logf("Could not parse CSI driver log line %q: %s", line, err) 749 continue 750 } 751 752 // Trim gRPC service name, i.e. 
"/csi.v1.Identity/Probe" -> "Probe" 753 methodParts := strings.Split(call.Method, "/") 754 call.Method = methodParts[len(methodParts)-1] 755 756 calls = append(calls, call) 757 } 758 return calls, nil 759} 760 761// gce-pd 762type gcePDCSIDriver struct { 763 driverInfo storageframework.DriverInfo 764} 765 766var _ storageframework.TestDriver = &gcePDCSIDriver{} 767var _ storageframework.DynamicPVTestDriver = &gcePDCSIDriver{} 768var _ storageframework.SnapshottableTestDriver = &gcePDCSIDriver{} 769 770// InitGcePDCSIDriver returns gcePDCSIDriver that implements TestDriver interface 771func InitGcePDCSIDriver() storageframework.TestDriver { 772 return &gcePDCSIDriver{ 773 driverInfo: storageframework.DriverInfo{ 774 Name: GCEPDCSIDriverName, 775 FeatureTag: "[Serial]", 776 MaxFileSize: storageframework.FileSizeMedium, 777 SupportedSizeRange: e2evolume.SizeRange{ 778 Min: "5Gi", 779 }, 780 SupportedFsType: sets.NewString( 781 "", // Default fsType 782 "ext2", 783 "ext3", 784 "ext4", 785 "xfs", 786 ), 787 SupportedMountOption: sets.NewString("debug", "nouid32"), 788 Capabilities: map[storageframework.Capability]bool{ 789 storageframework.CapPersistence: true, 790 storageframework.CapBlock: true, 791 storageframework.CapFsGroup: true, 792 storageframework.CapExec: true, 793 storageframework.CapMultiPODs: true, 794 // GCE supports volume limits, but the test creates large 795 // number of volumes and times out test suites. 
796 storageframework.CapVolumeLimits: false, 797 storageframework.CapTopology: true, 798 storageframework.CapControllerExpansion: true, 799 storageframework.CapOnlineExpansion: true, 800 storageframework.CapNodeExpansion: true, 801 storageframework.CapSnapshotDataSource: true, 802 }, 803 RequiredAccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}, 804 TopologyKeys: []string{GCEPDCSIZoneTopologyKey}, 805 StressTestOptions: &storageframework.StressTestOptions{ 806 NumPods: 10, 807 NumRestarts: 10, 808 }, 809 VolumeSnapshotStressTestOptions: &storageframework.VolumeSnapshotStressTestOptions{ 810 // GCE only allows for one snapshot per volume to be created at a time, 811 // which can cause test timeouts. We reduce the likelihood of test timeouts 812 // by increasing the number of pods (and volumes) and reducing the number 813 // of snapshots per volume. 814 NumPods: 20, 815 NumSnapshots: 2, 816 }, 817 }, 818 } 819} 820 821func (g *gcePDCSIDriver) GetDriverInfo() *storageframework.DriverInfo { 822 return &g.driverInfo 823} 824 825func (g *gcePDCSIDriver) SkipUnsupportedTest(pattern storageframework.TestPattern) { 826 e2eskipper.SkipUnlessProviderIs("gce", "gke") 827 if pattern.FsType == "xfs" { 828 e2eskipper.SkipUnlessNodeOSDistroIs("ubuntu", "custom") 829 } 830 if pattern.FeatureTag == "[Feature:Windows]" { 831 e2eskipper.Skipf("Skipping tests for windows since CSI does not support it yet") 832 } 833} 834 835func (g *gcePDCSIDriver) GetDynamicProvisionStorageClass(config *storageframework.PerTestConfig, fsType string) *storagev1.StorageClass { 836 ns := config.Framework.Namespace.Name 837 provisioner := g.driverInfo.Name 838 839 parameters := map[string]string{"type": "pd-standard"} 840 if fsType != "" { 841 parameters["csi.storage.k8s.io/fstype"] = fsType 842 } 843 delayedBinding := storagev1.VolumeBindingWaitForFirstConsumer 844 845 return storageframework.GetStorageClass(provisioner, parameters, &delayedBinding, ns) 846} 847 848func (g *gcePDCSIDriver) 
GetSnapshotClass(config *storageframework.PerTestConfig, parameters map[string]string) *unstructured.Unstructured { 849 snapshotter := g.driverInfo.Name 850 ns := config.Framework.Namespace.Name 851 852 return utils.GenerateSnapshotClassSpec(snapshotter, parameters, ns) 853} 854 855func (g *gcePDCSIDriver) PrepareTest(f *framework.Framework) (*storageframework.PerTestConfig, func()) { 856 testns := f.Namespace.Name 857 cfg := &storageframework.PerTestConfig{ 858 Driver: g, 859 Prefix: "gcepd", 860 Framework: f, 861 } 862 863 if framework.ProviderIs("gke") { 864 framework.Logf("The csi gce-pd driver is automatically installed in GKE. Skipping driver installation.") 865 return cfg, func() {} 866 } 867 868 ginkgo.By("deploying csi gce-pd driver") 869 // Create secondary namespace which will be used for creating driver 870 driverNamespace := utils.CreateDriverNamespace(f) 871 driverns := driverNamespace.Name 872 873 cancelLogging := utils.StartPodLogs(f, driverNamespace) 874 // It would be safer to rename the gcePD driver, but that 875 // hasn't been done before either and attempts to do so now led to 876 // errors during driver registration, therefore it is disabled 877 // by passing a nil function below. 
878 // 879 // These are the options which would have to be used: 880 // o := utils.PatchCSIOptions{ 881 // OldDriverName: g.driverInfo.Name, 882 // NewDriverName: storageframework.GetUniqueDriverName(g), 883 // DriverContainerName: "gce-driver", 884 // ProvisionerContainerName: "csi-external-provisioner", 885 // } 886 createGCESecrets(f.ClientSet, driverns) 887 888 manifests := []string{ 889 "test/e2e/testing-manifests/storage-csi/external-attacher/rbac.yaml", 890 "test/e2e/testing-manifests/storage-csi/external-provisioner/rbac.yaml", 891 "test/e2e/testing-manifests/storage-csi/gce-pd/csi-controller-rbac.yaml", 892 "test/e2e/testing-manifests/storage-csi/gce-pd/node_ds.yaml", 893 "test/e2e/testing-manifests/storage-csi/gce-pd/controller_ss.yaml", 894 } 895 896 cleanup, err := utils.CreateFromManifests(f, driverNamespace, nil, manifests...) 897 if err != nil { 898 framework.Failf("deploying csi gce-pd driver: %v", err) 899 } 900 901 if err = WaitForCSIDriverRegistrationOnAllNodes(GCEPDCSIDriverName, f.ClientSet); err != nil { 902 framework.Failf("waiting for csi driver node registration on: %v", err) 903 } 904 905 cleanupFunc := generateDriverCleanupFunc( 906 f, 907 "gce-pd", 908 testns, 909 driverns, 910 cleanup, 911 cancelLogging) 912 913 return &storageframework.PerTestConfig{ 914 Driver: g, 915 Prefix: "gcepd", 916 Framework: f, 917 DriverNamespace: driverNamespace, 918 }, cleanupFunc 919} 920 921// WaitForCSIDriverRegistrationOnAllNodes waits for the CSINode object to be updated 922// with the given driver on all schedulable nodes. 
923func WaitForCSIDriverRegistrationOnAllNodes(driverName string, cs clientset.Interface) error { 924 nodes, err := e2enode.GetReadySchedulableNodes(cs) 925 if err != nil { 926 return err 927 } 928 for _, node := range nodes.Items { 929 if err := WaitForCSIDriverRegistrationOnNode(node.Name, driverName, cs); err != nil { 930 return err 931 } 932 } 933 return nil 934} 935 936// WaitForCSIDriverRegistrationOnNode waits for the CSINode object generated by the node-registrar on a certain node 937func WaitForCSIDriverRegistrationOnNode(nodeName string, driverName string, cs clientset.Interface) error { 938 framework.Logf("waiting for CSIDriver %v to register on node %v", driverName, nodeName) 939 940 // About 8.6 minutes timeout 941 backoff := wait.Backoff{ 942 Duration: 2 * time.Second, 943 Factor: 1.5, 944 Steps: 12, 945 } 946 947 waitErr := wait.ExponentialBackoff(backoff, func() (bool, error) { 948 csiNode, err := cs.StorageV1().CSINodes().Get(context.TODO(), nodeName, metav1.GetOptions{}) 949 if err != nil && !apierrors.IsNotFound(err) { 950 return false, err 951 } 952 for _, driver := range csiNode.Spec.Drivers { 953 if driver.Name == driverName { 954 return true, nil 955 } 956 } 957 return false, nil 958 }) 959 if waitErr != nil { 960 return fmt.Errorf("error waiting for CSI driver %s registration on node %s: %v", driverName, nodeName, waitErr) 961 } 962 return nil 963} 964 965func tryFunc(f func()) error { 966 var err error 967 if f == nil { 968 return nil 969 } 970 defer func() { 971 if recoverError := recover(); recoverError != nil { 972 err = fmt.Errorf("%v", recoverError) 973 } 974 }() 975 f() 976 return err 977} 978 979func generateDriverCleanupFunc( 980 f *framework.Framework, 981 driverName, testns, driverns string, 982 driverCleanup, cancelLogging func()) func() { 983 984 cleanupHandle := new(framework.CleanupActionHandle) 985 986 // Cleanup CSI driver and namespaces. 
This function needs to be idempotent and can be 987 // concurrently called from defer (or AfterEach) and AfterSuite action hooks. 988 cleanupFunc := func() { 989 ginkgo.By(fmt.Sprintf("deleting the test namespace: %s", testns)) 990 // Delete the primary namespace but it's okay to fail here because this namespace will 991 // also be deleted by framework.Aftereach hook 992 tryFunc(func() { f.DeleteNamespace(testns) }) 993 994 ginkgo.By(fmt.Sprintf("uninstalling csi %s driver", driverName)) 995 tryFunc(driverCleanup) 996 tryFunc(cancelLogging) 997 998 ginkgo.By(fmt.Sprintf("deleting the driver namespace: %s", driverns)) 999 tryFunc(func() { f.DeleteNamespace(driverns) }) 1000 // cleanup function has already ran and hence we don't need to run it again. 1001 // We do this as very last action because in-case defer(or AfterEach) races 1002 // with AfterSuite and test routine gets killed then this block still 1003 // runs in AfterSuite 1004 framework.RemoveCleanupAction(*cleanupHandle) 1005 } 1006 1007 *cleanupHandle = framework.AddCleanupAction(cleanupFunc) 1008 return cleanupFunc 1009} 1010