1/*
2Copyright 2018 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17/*
18 * This file defines various csi volume test drivers for TestSuites.
19 *
 * There are two ways to prepare test drivers:
21 * 1) With containerized server (NFS, Ceph, Gluster, iSCSI, ...)
22 * It creates a server pod which defines one volume for the tests.
23 * These tests work only when privileged containers are allowed, exporting
24 * various filesystems (NFS, GlusterFS, ...) usually needs some mounting or
25 * other privileged magic in the server pod.
26 *
27 * Note that the server containers are for testing purposes only and should not
28 * be used in production.
29 *
30 * 2) With server or cloud provider outside of Kubernetes (Cinder, GCE, AWS, Azure, ...)
31 * Appropriate server or cloud provider must exist somewhere outside
32 * the tested Kubernetes cluster. CreateVolume will create a new volume to be
33 * used in the TestSuites for inlineVolume or DynamicPV tests.
34 */
35
36package drivers
37
38import (
39	"context"
40	"encoding/json"
41	"errors"
42	"fmt"
43	"strconv"
44	"strings"
45	"sync"
46	"time"
47
48	"github.com/onsi/ginkgo"
49	spb "google.golang.org/genproto/googleapis/rpc/status"
50	"google.golang.org/grpc/codes"
51	grpcstatus "google.golang.org/grpc/status"
52
53	appsv1 "k8s.io/api/apps/v1"
54	v1 "k8s.io/api/core/v1"
55	rbacv1 "k8s.io/api/rbac/v1"
56	storagev1 "k8s.io/api/storage/v1"
57	apierrors "k8s.io/apimachinery/pkg/api/errors"
58	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
59	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
60	"k8s.io/apimachinery/pkg/util/sets"
61	"k8s.io/apimachinery/pkg/util/wait"
62	"k8s.io/client-go/kubernetes"
63	clientset "k8s.io/client-go/kubernetes"
64	"k8s.io/klog/v2"
65	"k8s.io/kubernetes/test/e2e/framework"
66	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
67	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
68	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
69	e2evolume "k8s.io/kubernetes/test/e2e/framework/volume"
70	mockdriver "k8s.io/kubernetes/test/e2e/storage/drivers/csi-test/driver"
71	mockservice "k8s.io/kubernetes/test/e2e/storage/drivers/csi-test/mock/service"
72	"k8s.io/kubernetes/test/e2e/storage/drivers/proxy"
73	storageframework "k8s.io/kubernetes/test/e2e/storage/framework"
74	"k8s.io/kubernetes/test/e2e/storage/utils"
75
76	"google.golang.org/grpc"
77)
78
const (
	// GCEPDCSIDriverName is the name of the GCE Persistent Disk CSI driver.
	GCEPDCSIDriverName = "pd.csi.storage.gke.io"
	// GCEPDCSIZoneTopologyKey is the zone topology key of the GCE Persistent Disk CSI driver.
	GCEPDCSIZoneTopologyKey = "topology.gke.io/zone"

	// grpcCallPrefix is the prefix that precedes the JSON dump of a gRPC
	// call in the mock driver log. LogGRPC emits it and GetCalls searches
	// for it when parsing the log of the driver pod.
	grpcCallPrefix = "gRPCCall:"
)
88
// hostpathCSIDriver is the test driver for the hostpath CSI driver.
//
// manifests lists the files that PrepareTest deploys and volumeAttributes
// holds the attribute maps that GetVolume hands out in round-robin order.
type hostpathCSIDriver struct {
	driverInfo       storageframework.DriverInfo
	manifests        []string
	volumeAttributes []map[string]string
}
95
96func initHostPathCSIDriver(name string, capabilities map[storageframework.Capability]bool, volumeAttributes []map[string]string, manifests ...string) storageframework.TestDriver {
97	return &hostpathCSIDriver{
98		driverInfo: storageframework.DriverInfo{
99			Name:        name,
100			FeatureTag:  "",
101			MaxFileSize: storageframework.FileSizeMedium,
102			SupportedFsType: sets.NewString(
103				"", // Default fsType
104			),
105			SupportedSizeRange: e2evolume.SizeRange{
106				Min: "1Mi",
107			},
108			Capabilities: capabilities,
109			StressTestOptions: &storageframework.StressTestOptions{
110				NumPods:     10,
111				NumRestarts: 10,
112			},
113			VolumeSnapshotStressTestOptions: &storageframework.VolumeSnapshotStressTestOptions{
114				NumPods:      10,
115				NumSnapshots: 10,
116			},
117			PerformanceTestOptions: &storageframework.PerformanceTestOptions{
118				ProvisioningOptions: &storageframework.PerformanceTestProvisioningOptions{
119					VolumeSize: "1Mi",
120					Count:      300,
121					// Volume provisioning metrics are compared to a high baseline.
122					// Failure to pass would suggest a performance regression.
123					ExpectedMetrics: &storageframework.Metrics{
124						AvgLatency: 2 * time.Minute,
125						Throughput: 0.5,
126					},
127				},
128			},
129		},
130		manifests:        manifests,
131		volumeAttributes: volumeAttributes,
132	}
133}
134
135var _ storageframework.TestDriver = &hostpathCSIDriver{}
136var _ storageframework.DynamicPVTestDriver = &hostpathCSIDriver{}
137var _ storageframework.SnapshottableTestDriver = &hostpathCSIDriver{}
138var _ storageframework.EphemeralTestDriver = &hostpathCSIDriver{}
139
140// InitHostPathCSIDriver returns hostpathCSIDriver that implements TestDriver interface
141func InitHostPathCSIDriver() storageframework.TestDriver {
142	capabilities := map[storageframework.Capability]bool{
143		storageframework.CapPersistence:         true,
144		storageframework.CapSnapshotDataSource:  true,
145		storageframework.CapMultiPODs:           true,
146		storageframework.CapBlock:               true,
147		storageframework.CapPVCDataSource:       true,
148		storageframework.CapControllerExpansion: true,
149		storageframework.CapOnlineExpansion:     true,
150		storageframework.CapSingleNodeVolume:    true,
151
152		// This is needed for the
153		// testsuites/volumelimits.go `should support volume limits`
154		// test. --maxvolumespernode=10 gets
155		// added when patching the deployment.
156		storageframework.CapVolumeLimits: true,
157	}
158	return initHostPathCSIDriver("csi-hostpath",
159		capabilities,
160		// Volume attributes don't matter, but we have to provide at least one map.
161		[]map[string]string{
162			{"foo": "bar"},
163		},
164		"test/e2e/testing-manifests/storage-csi/external-attacher/rbac.yaml",
165		"test/e2e/testing-manifests/storage-csi/external-provisioner/rbac.yaml",
166		"test/e2e/testing-manifests/storage-csi/external-snapshotter/csi-snapshotter/rbac-csi-snapshotter.yaml",
167		"test/e2e/testing-manifests/storage-csi/external-health-monitor/external-health-monitor-controller/rbac.yaml",
168		"test/e2e/testing-manifests/storage-csi/external-resizer/rbac.yaml",
169		"test/e2e/testing-manifests/storage-csi/hostpath/hostpath/csi-hostpath-driverinfo.yaml",
170		"test/e2e/testing-manifests/storage-csi/hostpath/hostpath/csi-hostpath-plugin.yaml",
171		"test/e2e/testing-manifests/storage-csi/hostpath/hostpath/e2e-test-rbac.yaml",
172	)
173}
174
175func (h *hostpathCSIDriver) GetDriverInfo() *storageframework.DriverInfo {
176	return &h.driverInfo
177}
178
179func (h *hostpathCSIDriver) SkipUnsupportedTest(pattern storageframework.TestPattern) {
180	if pattern.VolType == storageframework.CSIInlineVolume && len(h.volumeAttributes) == 0 {
181		e2eskipper.Skipf("%s has no volume attributes defined, doesn't support ephemeral inline volumes", h.driverInfo.Name)
182	}
183}
184
185func (h *hostpathCSIDriver) GetDynamicProvisionStorageClass(config *storageframework.PerTestConfig, fsType string) *storagev1.StorageClass {
186	provisioner := config.GetUniqueDriverName()
187	parameters := map[string]string{}
188	ns := config.Framework.Namespace.Name
189
190	return storageframework.GetStorageClass(provisioner, parameters, nil, ns)
191}
192
193func (h *hostpathCSIDriver) GetVolume(config *storageframework.PerTestConfig, volumeNumber int) (map[string]string, bool, bool) {
194	return h.volumeAttributes[volumeNumber%len(h.volumeAttributes)], false /* not shared */, false /* read-write */
195}
196
197func (h *hostpathCSIDriver) GetCSIDriverName(config *storageframework.PerTestConfig) string {
198	return config.GetUniqueDriverName()
199}
200
201func (h *hostpathCSIDriver) GetSnapshotClass(config *storageframework.PerTestConfig, parameters map[string]string) *unstructured.Unstructured {
202	snapshotter := config.GetUniqueDriverName()
203	ns := config.Framework.Namespace.Name
204
205	return utils.GenerateSnapshotClassSpec(snapshotter, parameters, ns)
206}
207
// PrepareTest deploys the hostpath CSI driver from h.manifests into a
// separate driver namespace and returns the per-test configuration
// together with the cleanup function built by generateDriverCleanupFunc.
//
// The driver and the test pods are pinned to one randomly chosen ready,
// schedulable node. The test is failed if no such node can be found or
// if the deployment fails.
func (h *hostpathCSIDriver) PrepareTest(f *framework.Framework) (*storageframework.PerTestConfig, func()) {
	// Create secondary namespace which will be used for creating driver
	driverNamespace := utils.CreateDriverNamespace(f)
	driverns := driverNamespace.Name
	testns := f.Namespace.Name

	ginkgo.By(fmt.Sprintf("deploying %s driver", h.driverInfo.Name))
	cancelLogging := utils.StartPodLogs(f, driverNamespace)
	cs := f.ClientSet

	// The hostpath CSI driver only works when everything runs on the same node.
	node, err := e2enode.GetRandomReadySchedulableNode(cs)
	framework.ExpectNoError(err)
	config := &storageframework.PerTestConfig{
		Driver:              h,
		Prefix:              "hostpath",
		Framework:           f,
		ClientNodeSelection: e2epod.NodeSelection{Name: node.Name},
		DriverNamespace:     driverNamespace,
	}

	// Options for patching the manifests: the driver gets renamed to the
	// per-test unique name and is scheduled onto the chosen node.
	o := utils.PatchCSIOptions{
		OldDriverName:       h.driverInfo.Name,
		NewDriverName:       config.GetUniqueDriverName(),
		DriverContainerName: "hostpath",
		DriverContainerArguments: []string{"--drivername=" + config.GetUniqueDriverName(),
			// This is needed for the
			// testsuites/volumelimits.go `should support volume limits`
			// test.
			"--maxvolumespernode=10",
			// Disable volume lifecycle checks due to issue #103651
			// TODO: enable this check once issue is resolved for csi-host-path driver.
			"--check-volume-lifecycle=false",
		},
		ProvisionerContainerName: "csi-provisioner",
		SnapshotterContainerName: "csi-snapshotter",
		NodeName:                 node.Name,
	}
	// The callback is invoked with each item loaded from the manifests and
	// may modify it.
	cleanup, err := utils.CreateFromManifests(config.Framework, driverNamespace, func(item interface{}) error {
		if err := utils.PatchCSIDeployment(config.Framework, o, item); err != nil {
			return err
		}

		// Remove csi-external-health-monitor-agent and
		// csi-external-health-monitor-controller
		// containers. The agent is obsolete.
		// The controller is not needed for any of the
		// tests and is causing too much overhead when
		// running in a large cluster (see
		// https://github.com/kubernetes/kubernetes/issues/102452#issuecomment-856991009).
		switch item := item.(type) {
		case *appsv1.StatefulSet:
			var containers []v1.Container
			for _, container := range item.Spec.Template.Spec.Containers {
				switch container.Name {
				case "csi-external-health-monitor-agent", "csi-external-health-monitor-controller":
					// Remove these containers.
				default:
					// Keep the others.
					containers = append(containers, container)
				}
			}
			item.Spec.Template.Spec.Containers = containers
		}
		return nil
	}, h.manifests...)

	if err != nil {
		framework.Failf("deploying %s driver: %v", h.driverInfo.Name, err)
	}

	// Combine manifest cleanup and log cancellation into the function
	// that is handed back to the caller.
	cleanupFunc := generateDriverCleanupFunc(
		f,
		h.driverInfo.Name,
		testns,
		driverns,
		cleanup,
		cancelLogging)

	return config, cleanupFunc
}
289
// mockCSIDriver is the test driver for the CSI mock driver.
//
// Most fields are copied from CSIMockDriverOpts by InitMockCSIDriver.
// When embedded is set, PrepareTest starts embeddedCSIDriver inside the
// test binary and the intercepted gRPC calls are recorded in calls;
// otherwise GetCalls parses the calls from the log of the driver pod.
type mockCSIDriver struct {
	driverInfo          storageframework.DriverInfo
	manifests           []string
	podInfo             *bool
	storageCapacity     *bool
	attachable          bool
	attachLimit         int
	enableTopology      bool
	enableNodeExpansion bool
	hooks               Hooks
	tokenRequests       []storagev1.TokenRequest
	requiresRepublish   *bool
	fsGroupPolicy       *storagev1.FSGroupPolicy
	embedded            bool
	calls               MockCSICalls
	embeddedCSIDriver   *mockdriver.CSIDriver

	// Additional values set during PrepareTest
	clientSet       kubernetes.Interface
	driverNamespace *v1.Namespace
}
312
// Hooks are callbacks that get invoked while the embedded mock driver
// handles gRPC calls.
//
// At the moment, only generic pre- and post-function call
// hooks are implemented. Those hooks can cast the request and
// response values if needed. More hooks inside specific
// functions could be added if needed.
type Hooks struct {
	// Pre is called before invoking the mock driver's implementation of a method.
	// If either a non-nil reply or error are returned, then those are returned to the caller
	// and the mock driver's implementation is not invoked.
	Pre func(ctx context.Context, method string, request interface{}) (reply interface{}, err error)

	// Post is called after invoking the mock driver's implementation of a method.
	// What it returns is used as actual result.
	Post func(ctx context.Context, method string, request, reply interface{}, err error) (finalReply interface{}, finalErr error)
}
328
// MockCSITestDriver provides additional functions specific to the CSI mock driver.
// It is the type returned by InitMockCSIDriver.
type MockCSITestDriver interface {
	storageframework.DynamicPVTestDriver

	// GetCalls returns all currently observed gRPC calls. Only valid
	// after PrepareTest.
	GetCalls() ([]MockCSICall, error)
}
337
// CSIMockDriverOpts defines the options that InitMockCSIDriver uses to
// configure the CSI mock driver.
//
// RegisterDriver, EnableResizing and EnableSnapshot add the corresponding
// manifest to the deployment; DisableAttach leaves out the attacher
// manifest. The remaining options are stored in the driver and applied
// by PrepareTest.
type CSIMockDriverOpts struct {
	RegisterDriver      bool
	DisableAttach       bool
	PodInfo             *bool
	StorageCapacity     *bool
	AttachLimit         int
	EnableTopology      bool
	EnableResizing      bool
	EnableNodeExpansion bool
	EnableSnapshot      bool
	TokenRequests       []storagev1.TokenRequest
	RequiresRepublish   *bool
	FSGroupPolicy       *storagev1.FSGroupPolicy

	// Embedded defines whether the CSI mock driver runs
	// inside the cluster (false, the default) or just a proxy
	// runs inside the cluster and all gRPC calls are handled
	// inside the e2e.test binary.
	Embedded bool

	// Hooks that will be called if (and only if!) the embedded
	// mock driver is used. Beware that hooks are invoked
	// asynchronously in different goroutines.
	Hooks Hooks
}
364
// MockCSICall is a dummy structure that parses just the method, the
// volume context and the error (as string and as gRPC code plus message)
// out of a logged CSI call.
type MockCSICall struct {
	json string // full log entry

	Method  string
	Request struct {
		VolumeContext map[string]string `json:"volume_context"`
	}
	FullError struct {
		Code    codes.Code `json:"code"`
		Message string     `json:"message"`
	}
	Error string
}
379
// MockCSICalls is a thread-safe storage for MockCSICall instances.
// Access to calls is serialized through mutex by the Get and Add methods.
type MockCSICalls struct {
	calls []MockCSICall
	mutex sync.Mutex
}
385
386// Get returns all currently recorded calls.
387func (c *MockCSICalls) Get() []MockCSICall {
388	c.mutex.Lock()
389	defer c.mutex.Unlock()
390
391	return c.calls[:]
392}
393
394// Add appens one new call at the end.
395func (c *MockCSICalls) Add(call MockCSICall) {
396	c.mutex.Lock()
397	defer c.mutex.Unlock()
398
399	c.calls = append(c.calls, call)
400}
401
402// LogGRPC takes individual parameters from the mock CSI driver and adds them.
403func (c *MockCSICalls) LogGRPC(method string, request, reply interface{}, err error) {
404	// Encoding to JSON and decoding mirrors the traditional way of capturing calls.
405	// Probably could be simplified now...
406	logMessage := struct {
407		Method   string
408		Request  interface{}
409		Response interface{}
410		// Error as string, for backward compatibility.
411		// "" on no error.
412		Error string
413		// Full error dump, to be able to parse out full gRPC error code and message separately in a test.
414		FullError *spb.Status
415	}{
416		Method:   method,
417		Request:  request,
418		Response: reply,
419	}
420
421	if err != nil {
422		logMessage.Error = err.Error()
423		logMessage.FullError = grpcstatus.Convert(err).Proto()
424	}
425
426	msg, _ := json.Marshal(logMessage)
427	call := MockCSICall{
428		json: string(msg),
429	}
430	json.Unmarshal(msg, &call)
431
432	klog.Infof("%s %s", grpcCallPrefix, string(msg))
433
434	// Trim gRPC service name, i.e. "/csi.v1.Identity/Probe" -> "Probe"
435	methodParts := strings.Split(call.Method, "/")
436	call.Method = methodParts[len(methodParts)-1]
437
438	c.Add(call)
439}
440
441var _ storageframework.TestDriver = &mockCSIDriver{}
442var _ storageframework.DynamicPVTestDriver = &mockCSIDriver{}
443var _ storageframework.SnapshottableTestDriver = &mockCSIDriver{}
444
445// InitMockCSIDriver returns a mockCSIDriver that implements TestDriver interface
446func InitMockCSIDriver(driverOpts CSIMockDriverOpts) MockCSITestDriver {
447	driverManifests := []string{
448		"test/e2e/testing-manifests/storage-csi/external-attacher/rbac.yaml",
449		"test/e2e/testing-manifests/storage-csi/external-provisioner/rbac.yaml",
450		"test/e2e/testing-manifests/storage-csi/external-resizer/rbac.yaml",
451		"test/e2e/testing-manifests/storage-csi/external-snapshotter/csi-snapshotter/rbac-csi-snapshotter.yaml",
452		"test/e2e/testing-manifests/storage-csi/mock/csi-mock-rbac.yaml",
453		"test/e2e/testing-manifests/storage-csi/mock/csi-storageclass.yaml",
454	}
455	if driverOpts.Embedded {
456		driverManifests = append(driverManifests, "test/e2e/testing-manifests/storage-csi/mock/csi-mock-proxy.yaml")
457	} else {
458		driverManifests = append(driverManifests, "test/e2e/testing-manifests/storage-csi/mock/csi-mock-driver.yaml")
459	}
460
461	if driverOpts.RegisterDriver {
462		driverManifests = append(driverManifests, "test/e2e/testing-manifests/storage-csi/mock/csi-mock-driverinfo.yaml")
463	}
464
465	if !driverOpts.DisableAttach {
466		driverManifests = append(driverManifests, "test/e2e/testing-manifests/storage-csi/mock/csi-mock-driver-attacher.yaml")
467	}
468
469	if driverOpts.EnableResizing {
470		driverManifests = append(driverManifests, "test/e2e/testing-manifests/storage-csi/mock/csi-mock-driver-resizer.yaml")
471	}
472
473	if driverOpts.EnableSnapshot {
474		driverManifests = append(driverManifests, "test/e2e/testing-manifests/storage-csi/mock/csi-mock-driver-snapshotter.yaml")
475	}
476
477	return &mockCSIDriver{
478		driverInfo: storageframework.DriverInfo{
479			Name:        "csi-mock",
480			FeatureTag:  "",
481			MaxFileSize: storageframework.FileSizeMedium,
482			SupportedFsType: sets.NewString(
483				"", // Default fsType
484			),
485			Capabilities: map[storageframework.Capability]bool{
486				storageframework.CapPersistence:  false,
487				storageframework.CapFsGroup:      false,
488				storageframework.CapExec:         false,
489				storageframework.CapVolumeLimits: true,
490			},
491		},
492		manifests:           driverManifests,
493		podInfo:             driverOpts.PodInfo,
494		storageCapacity:     driverOpts.StorageCapacity,
495		enableTopology:      driverOpts.EnableTopology,
496		attachable:          !driverOpts.DisableAttach,
497		attachLimit:         driverOpts.AttachLimit,
498		enableNodeExpansion: driverOpts.EnableNodeExpansion,
499		tokenRequests:       driverOpts.TokenRequests,
500		requiresRepublish:   driverOpts.RequiresRepublish,
501		fsGroupPolicy:       driverOpts.FSGroupPolicy,
502		embedded:            driverOpts.Embedded,
503		hooks:               driverOpts.Hooks,
504	}
505}
506
507func (m *mockCSIDriver) GetDriverInfo() *storageframework.DriverInfo {
508	return &m.driverInfo
509}
510
// SkipUnsupportedTest is intentionally a no-op: the mock driver does not
// skip any test pattern.
func (m *mockCSIDriver) SkipUnsupportedTest(pattern storageframework.TestPattern) {
}
513
514func (m *mockCSIDriver) GetDynamicProvisionStorageClass(config *storageframework.PerTestConfig, fsType string) *storagev1.StorageClass {
515	provisioner := config.GetUniqueDriverName()
516	parameters := map[string]string{}
517	ns := config.Framework.Namespace.Name
518
519	return storageframework.GetStorageClass(provisioner, parameters, nil, ns)
520}
521
522func (m *mockCSIDriver) GetSnapshotClass(config *storageframework.PerTestConfig, parameters map[string]string) *unstructured.Unstructured {
523	snapshotter := m.driverInfo.Name + "-" + config.Framework.UniqueName
524	ns := config.Framework.Namespace.Name
525
526	return utils.GenerateSnapshotClassSpec(snapshotter, parameters, ns)
527}
528
529func (m *mockCSIDriver) PrepareTest(f *framework.Framework) (*storageframework.PerTestConfig, func()) {
530	m.clientSet = f.ClientSet
531
532	// Create secondary namespace which will be used for creating driver
533	m.driverNamespace = utils.CreateDriverNamespace(f)
534	driverns := m.driverNamespace.Name
535	testns := f.Namespace.Name
536
537	if m.embedded {
538		ginkgo.By("deploying csi mock proxy")
539	} else {
540		ginkgo.By("deploying csi mock driver")
541	}
542	cancelLogging := utils.StartPodLogs(f, m.driverNamespace)
543	cs := f.ClientSet
544
545	// pods should be scheduled on the node
546	node, err := e2enode.GetRandomReadySchedulableNode(cs)
547	framework.ExpectNoError(err)
548
549	embeddedCleanup := func() {}
550	containerArgs := []string{}
551	if m.embedded {
552		// Run embedded CSI driver.
553		//
554		// For now we start exactly one instance which implements controller,
555		// node and identity services. It matches with the one pod that we run
556		// inside the cluster. The name and namespace of that one is deterministic,
557		// so we know what to connect to.
558		//
559		// Long-term we could also deploy one central controller and multiple
560		// node instances, with knowledge about provisioned volumes shared in
561		// this process.
562		podname := "csi-mockplugin-0"
563		containername := "mock"
564		ctx, cancel := context.WithCancel(context.Background())
565		serviceConfig := mockservice.Config{
566			DisableAttach:         !m.attachable,
567			DriverName:            "csi-mock-" + f.UniqueName,
568			AttachLimit:           int64(m.attachLimit),
569			NodeExpansionRequired: m.enableNodeExpansion,
570			EnableTopology:        m.enableTopology,
571			IO: proxy.PodDirIO{
572				F:             f,
573				Namespace:     m.driverNamespace.Name,
574				PodName:       podname,
575				ContainerName: "busybox",
576			},
577		}
578		s := mockservice.New(serviceConfig)
579		servers := &mockdriver.CSIDriverServers{
580			Controller: s,
581			Identity:   s,
582			Node:       s,
583		}
584		m.embeddedCSIDriver = mockdriver.NewCSIDriver(servers)
585
586		l, err := proxy.Listen(ctx, f.ClientSet, f.ClientConfig(),
587			proxy.Addr{
588				Namespace:     m.driverNamespace.Name,
589				PodName:       podname,
590				ContainerName: containername,
591				Port:          9000,
592			},
593		)
594		framework.ExpectNoError(err, "start connecting to proxy pod")
595		err = m.embeddedCSIDriver.Start(l, m.interceptGRPC)
596		framework.ExpectNoError(err, "start mock driver")
597
598		embeddedCleanup = func() {
599			// Kill all goroutines and delete resources of the mock driver.
600			m.embeddedCSIDriver.Stop()
601			l.Close()
602			cancel()
603		}
604	} else {
605		// When using the mock driver inside the cluster it has to be reconfigured
606		// via command line parameters.
607		containerArgs = append(containerArgs, "--name=csi-mock-"+f.UniqueName)
608
609		if !m.attachable {
610			containerArgs = append(containerArgs, "--disable-attach")
611		}
612
613		if m.enableTopology {
614			containerArgs = append(containerArgs, "--enable-topology")
615		}
616
617		if m.attachLimit > 0 {
618			containerArgs = append(containerArgs, "--attach-limit", strconv.Itoa(m.attachLimit))
619		}
620
621		if m.enableNodeExpansion {
622			containerArgs = append(containerArgs, "--node-expand-required=true")
623		}
624	}
625
626	config := &storageframework.PerTestConfig{
627		Driver:              m,
628		Prefix:              "mock",
629		Framework:           f,
630		ClientNodeSelection: e2epod.NodeSelection{Name: node.Name},
631		DriverNamespace:     m.driverNamespace,
632	}
633
634	o := utils.PatchCSIOptions{
635		OldDriverName:            "csi-mock",
636		NewDriverName:            "csi-mock-" + f.UniqueName,
637		DriverContainerName:      "mock",
638		DriverContainerArguments: containerArgs,
639		ProvisionerContainerName: "csi-provisioner",
640		NodeName:                 node.Name,
641		PodInfo:                  m.podInfo,
642		StorageCapacity:          m.storageCapacity,
643		CanAttach:                &m.attachable,
644		VolumeLifecycleModes: &[]storagev1.VolumeLifecycleMode{
645			storagev1.VolumeLifecyclePersistent,
646			storagev1.VolumeLifecycleEphemeral,
647		},
648		TokenRequests:     m.tokenRequests,
649		RequiresRepublish: m.requiresRepublish,
650		FSGroupPolicy:     m.fsGroupPolicy,
651	}
652	cleanup, err := utils.CreateFromManifests(f, m.driverNamespace, func(item interface{}) error {
653		if err := utils.PatchCSIDeployment(config.Framework, o, item); err != nil {
654			return err
655		}
656
657		switch item := item.(type) {
658		case *rbacv1.ClusterRole:
659			if strings.HasPrefix(item.Name, "external-snapshotter-runner") {
660				// Re-enable access to secrets for the snapshotter sidecar for
661				// https://github.com/kubernetes/kubernetes/blob/6ede5ca95f78478fa627ecfea8136e0dff34436b/test/e2e/storage/csi_mock_volume.go#L1539-L1548
662				// It was disabled in https://github.com/kubernetes-csi/external-snapshotter/blob/501cc505846c03ee665355132f2da0ce7d5d747d/deploy/kubernetes/csi-snapshotter/rbac-csi-snapshotter.yaml#L26-L32
663				item.Rules = append(item.Rules, rbacv1.PolicyRule{
664					APIGroups: []string{""},
665					Resources: []string{"secrets"},
666					Verbs:     []string{"get", "list"},
667				})
668			}
669		}
670
671		return nil
672	}, m.manifests...)
673
674	if err != nil {
675		framework.Failf("deploying csi mock driver: %v", err)
676	}
677
678	driverCleanupFunc := generateDriverCleanupFunc(
679		f,
680		"mock",
681		testns,
682		driverns,
683		cleanup,
684		cancelLogging)
685
686	cleanupFunc := func() {
687		embeddedCleanup()
688		driverCleanupFunc()
689	}
690
691	return config, cleanupFunc
692}
693
694func (m *mockCSIDriver) interceptGRPC(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
695	defer func() {
696		// Always log the call and its final result,
697		// regardless whether the result was from the real
698		// implementation or a hook.
699		m.calls.LogGRPC(info.FullMethod, req, resp, err)
700	}()
701
702	if m.hooks.Pre != nil {
703		resp, err = m.hooks.Pre(ctx, info.FullMethod, req)
704		if resp != nil || err != nil {
705			return
706		}
707	}
708	resp, err = handler(ctx, req)
709	if m.hooks.Post != nil {
710		resp, err = m.hooks.Post(ctx, info.FullMethod, req, resp, err)
711	}
712	return
713}
714
715func (m *mockCSIDriver) GetCalls() ([]MockCSICall, error) {
716	if m.embedded {
717		return m.calls.Get(), nil
718	}
719
720	if m.driverNamespace == nil {
721		return nil, errors.New("PrepareTest not called yet")
722	}
723
724	// Name of CSI driver pod name (it's in a StatefulSet with a stable name)
725	driverPodName := "csi-mockplugin-0"
726	// Name of CSI driver container name
727	driverContainerName := "mock"
728
729	// Load logs of driver pod
730	log, err := e2epod.GetPodLogs(m.clientSet, m.driverNamespace.Name, driverPodName, driverContainerName)
731	if err != nil {
732		return nil, fmt.Errorf("could not load CSI driver logs: %s", err)
733	}
734
735	logLines := strings.Split(log, "\n")
736	var calls []MockCSICall
737	for _, line := range logLines {
738		index := strings.Index(line, grpcCallPrefix)
739		if index == -1 {
740			continue
741		}
742		line = line[index+len(grpcCallPrefix):]
743		call := MockCSICall{
744			json: string(line),
745		}
746		err := json.Unmarshal([]byte(line), &call)
747		if err != nil {
748			framework.Logf("Could not parse CSI driver log line %q: %s", line, err)
749			continue
750		}
751
752		// Trim gRPC service name, i.e. "/csi.v1.Identity/Probe" -> "Probe"
753		methodParts := strings.Split(call.Method, "/")
754		call.Method = methodParts[len(methodParts)-1]
755
756		calls = append(calls, call)
757	}
758	return calls, nil
759}
760
// gcePDCSIDriver is the test driver for the GCE Persistent Disk CSI driver.
type gcePDCSIDriver struct {
	driverInfo storageframework.DriverInfo
}
765
766var _ storageframework.TestDriver = &gcePDCSIDriver{}
767var _ storageframework.DynamicPVTestDriver = &gcePDCSIDriver{}
768var _ storageframework.SnapshottableTestDriver = &gcePDCSIDriver{}
769
770// InitGcePDCSIDriver returns gcePDCSIDriver that implements TestDriver interface
771func InitGcePDCSIDriver() storageframework.TestDriver {
772	return &gcePDCSIDriver{
773		driverInfo: storageframework.DriverInfo{
774			Name:        GCEPDCSIDriverName,
775			FeatureTag:  "[Serial]",
776			MaxFileSize: storageframework.FileSizeMedium,
777			SupportedSizeRange: e2evolume.SizeRange{
778				Min: "5Gi",
779			},
780			SupportedFsType: sets.NewString(
781				"", // Default fsType
782				"ext2",
783				"ext3",
784				"ext4",
785				"xfs",
786			),
787			SupportedMountOption: sets.NewString("debug", "nouid32"),
788			Capabilities: map[storageframework.Capability]bool{
789				storageframework.CapPersistence: true,
790				storageframework.CapBlock:       true,
791				storageframework.CapFsGroup:     true,
792				storageframework.CapExec:        true,
793				storageframework.CapMultiPODs:   true,
794				// GCE supports volume limits, but the test creates large
795				// number of volumes and times out test suites.
796				storageframework.CapVolumeLimits:        false,
797				storageframework.CapTopology:            true,
798				storageframework.CapControllerExpansion: true,
799				storageframework.CapOnlineExpansion:     true,
800				storageframework.CapNodeExpansion:       true,
801				storageframework.CapSnapshotDataSource:  true,
802			},
803			RequiredAccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
804			TopologyKeys:        []string{GCEPDCSIZoneTopologyKey},
805			StressTestOptions: &storageframework.StressTestOptions{
806				NumPods:     10,
807				NumRestarts: 10,
808			},
809			VolumeSnapshotStressTestOptions: &storageframework.VolumeSnapshotStressTestOptions{
810				// GCE only allows for one snapshot per volume to be created at a time,
811				// which can cause test timeouts. We reduce the likelihood of test timeouts
812				// by increasing the number of pods (and volumes) and reducing the number
813				// of snapshots per volume.
814				NumPods:      20,
815				NumSnapshots: 2,
816			},
817		},
818	}
819}
820
821func (g *gcePDCSIDriver) GetDriverInfo() *storageframework.DriverInfo {
822	return &g.driverInfo
823}
824
825func (g *gcePDCSIDriver) SkipUnsupportedTest(pattern storageframework.TestPattern) {
826	e2eskipper.SkipUnlessProviderIs("gce", "gke")
827	if pattern.FsType == "xfs" {
828		e2eskipper.SkipUnlessNodeOSDistroIs("ubuntu", "custom")
829	}
830	if pattern.FeatureTag == "[Feature:Windows]" {
831		e2eskipper.Skipf("Skipping tests for windows since CSI does not support it yet")
832	}
833}
834
835func (g *gcePDCSIDriver) GetDynamicProvisionStorageClass(config *storageframework.PerTestConfig, fsType string) *storagev1.StorageClass {
836	ns := config.Framework.Namespace.Name
837	provisioner := g.driverInfo.Name
838
839	parameters := map[string]string{"type": "pd-standard"}
840	if fsType != "" {
841		parameters["csi.storage.k8s.io/fstype"] = fsType
842	}
843	delayedBinding := storagev1.VolumeBindingWaitForFirstConsumer
844
845	return storageframework.GetStorageClass(provisioner, parameters, &delayedBinding, ns)
846}
847
848func (g *gcePDCSIDriver) GetSnapshotClass(config *storageframework.PerTestConfig, parameters map[string]string) *unstructured.Unstructured {
849	snapshotter := g.driverInfo.Name
850	ns := config.Framework.Namespace.Name
851
852	return utils.GenerateSnapshotClassSpec(snapshotter, parameters, ns)
853}
854
855func (g *gcePDCSIDriver) PrepareTest(f *framework.Framework) (*storageframework.PerTestConfig, func()) {
856	testns := f.Namespace.Name
857	cfg := &storageframework.PerTestConfig{
858		Driver:    g,
859		Prefix:    "gcepd",
860		Framework: f,
861	}
862
863	if framework.ProviderIs("gke") {
864		framework.Logf("The csi gce-pd driver is automatically installed in GKE. Skipping driver installation.")
865		return cfg, func() {}
866	}
867
868	ginkgo.By("deploying csi gce-pd driver")
869	// Create secondary namespace which will be used for creating driver
870	driverNamespace := utils.CreateDriverNamespace(f)
871	driverns := driverNamespace.Name
872
873	cancelLogging := utils.StartPodLogs(f, driverNamespace)
874	// It would be safer to rename the gcePD driver, but that
875	// hasn't been done before either and attempts to do so now led to
876	// errors during driver registration, therefore it is disabled
877	// by passing a nil function below.
878	//
879	// These are the options which would have to be used:
880	// o := utils.PatchCSIOptions{
881	// 	OldDriverName:            g.driverInfo.Name,
882	// 	NewDriverName:            storageframework.GetUniqueDriverName(g),
883	// 	DriverContainerName:      "gce-driver",
884	// 	ProvisionerContainerName: "csi-external-provisioner",
885	// }
886	createGCESecrets(f.ClientSet, driverns)
887
888	manifests := []string{
889		"test/e2e/testing-manifests/storage-csi/external-attacher/rbac.yaml",
890		"test/e2e/testing-manifests/storage-csi/external-provisioner/rbac.yaml",
891		"test/e2e/testing-manifests/storage-csi/gce-pd/csi-controller-rbac.yaml",
892		"test/e2e/testing-manifests/storage-csi/gce-pd/node_ds.yaml",
893		"test/e2e/testing-manifests/storage-csi/gce-pd/controller_ss.yaml",
894	}
895
896	cleanup, err := utils.CreateFromManifests(f, driverNamespace, nil, manifests...)
897	if err != nil {
898		framework.Failf("deploying csi gce-pd driver: %v", err)
899	}
900
901	if err = WaitForCSIDriverRegistrationOnAllNodes(GCEPDCSIDriverName, f.ClientSet); err != nil {
902		framework.Failf("waiting for csi driver node registration on: %v", err)
903	}
904
905	cleanupFunc := generateDriverCleanupFunc(
906		f,
907		"gce-pd",
908		testns,
909		driverns,
910		cleanup,
911		cancelLogging)
912
913	return &storageframework.PerTestConfig{
914		Driver:          g,
915		Prefix:          "gcepd",
916		Framework:       f,
917		DriverNamespace: driverNamespace,
918	}, cleanupFunc
919}
920
921// WaitForCSIDriverRegistrationOnAllNodes waits for the CSINode object to be updated
922// with the given driver on all schedulable nodes.
923func WaitForCSIDriverRegistrationOnAllNodes(driverName string, cs clientset.Interface) error {
924	nodes, err := e2enode.GetReadySchedulableNodes(cs)
925	if err != nil {
926		return err
927	}
928	for _, node := range nodes.Items {
929		if err := WaitForCSIDriverRegistrationOnNode(node.Name, driverName, cs); err != nil {
930			return err
931		}
932	}
933	return nil
934}
935
936// WaitForCSIDriverRegistrationOnNode waits for the CSINode object generated by the node-registrar on a certain node
937func WaitForCSIDriverRegistrationOnNode(nodeName string, driverName string, cs clientset.Interface) error {
938	framework.Logf("waiting for CSIDriver %v to register on node %v", driverName, nodeName)
939
940	// About 8.6 minutes timeout
941	backoff := wait.Backoff{
942		Duration: 2 * time.Second,
943		Factor:   1.5,
944		Steps:    12,
945	}
946
947	waitErr := wait.ExponentialBackoff(backoff, func() (bool, error) {
948		csiNode, err := cs.StorageV1().CSINodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
949		if err != nil && !apierrors.IsNotFound(err) {
950			return false, err
951		}
952		for _, driver := range csiNode.Spec.Drivers {
953			if driver.Name == driverName {
954				return true, nil
955			}
956		}
957		return false, nil
958	})
959	if waitErr != nil {
960		return fmt.Errorf("error waiting for CSI driver %s registration on node %s: %v", driverName, nodeName, waitErr)
961	}
962	return nil
963}
964
// tryFunc runs f and converts a panic raised by f into an error.
//
// It returns nil when f is nil or when f returns normally. When f panics,
// the panic is recovered and its value is returned formatted as an error.
//
// The result is named on purpose: a deferred function can only change what
// the caller receives after a panic by assigning to a named result.
func tryFunc(f func()) (err error) {
	if f == nil {
		return nil
	}
	defer func() {
		if recovered := recover(); recovered != nil {
			err = fmt.Errorf("%v", recovered)
		}
	}()
	f()
	return nil
}
978
979func generateDriverCleanupFunc(
980	f *framework.Framework,
981	driverName, testns, driverns string,
982	driverCleanup, cancelLogging func()) func() {
983
984	cleanupHandle := new(framework.CleanupActionHandle)
985
986	// Cleanup CSI driver and namespaces. This function needs to be idempotent and can be
987	// concurrently called from defer (or AfterEach) and AfterSuite action hooks.
988	cleanupFunc := func() {
989		ginkgo.By(fmt.Sprintf("deleting the test namespace: %s", testns))
990		// Delete the primary namespace but it's okay to fail here because this namespace will
991		// also be deleted by framework.Aftereach hook
992		tryFunc(func() { f.DeleteNamespace(testns) })
993
994		ginkgo.By(fmt.Sprintf("uninstalling csi %s driver", driverName))
995		tryFunc(driverCleanup)
996		tryFunc(cancelLogging)
997
998		ginkgo.By(fmt.Sprintf("deleting the driver namespace: %s", driverns))
999		tryFunc(func() { f.DeleteNamespace(driverns) })
1000		// cleanup function has already ran and hence we don't need to run it again.
1001		// We do this as very last action because in-case defer(or AfterEach) races
1002		// with AfterSuite and test routine gets killed then this block still
1003		// runs in AfterSuite
1004		framework.RemoveCleanupAction(*cleanupHandle)
1005	}
1006
1007	*cleanupHandle = framework.AddCleanupAction(cleanupFunc)
1008	return cleanupFunc
1009}
1010