1/*
2Copyright 2017 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package csi
18
19import (
20	"context"
21	"errors"
22	"fmt"
23	"os"
24	"path/filepath"
25	"strings"
26	"time"
27
28	"k8s.io/klog/v2"
29
30	authenticationv1 "k8s.io/api/authentication/v1"
31	api "k8s.io/api/core/v1"
32	storage "k8s.io/api/storage/v1"
33	apierrors "k8s.io/apimachinery/pkg/api/errors"
34	meta "k8s.io/apimachinery/pkg/apis/meta/v1"
35	"k8s.io/apimachinery/pkg/types"
36	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
37	utilversion "k8s.io/apimachinery/pkg/util/version"
38	"k8s.io/apimachinery/pkg/util/wait"
39	utilfeature "k8s.io/apiserver/pkg/util/feature"
40	clientset "k8s.io/client-go/kubernetes"
41	storagelisters "k8s.io/client-go/listers/storage/v1"
42	csitranslationplugins "k8s.io/csi-translation-lib/plugins"
43	"k8s.io/kubernetes/pkg/features"
44	"k8s.io/kubernetes/pkg/volume"
45	"k8s.io/kubernetes/pkg/volume/csi/nodeinfomanager"
46	volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
47)
48
const (
	// CSIPluginName is the name of the in-tree CSI Plugin
	CSIPluginName = "kubernetes.io/csi"

	// csiTimeout bounds the context used for the NodeGetInfo call made while
	// registering a driver.
	csiTimeout      = 2 * time.Minute
	// volNameSep separates the driver name from the volume handle in the
	// name returned by GetVolumeName.
	volNameSep      = "^"
	// volDataFileName is the name of the file in which per-volume data is
	// saved at mounter/mapper creation and loaded again for teardown.
	volDataFileName = "vol_data.json"
	// fsTypeBlockName is not referenced in this part of the file.
	// NOTE(review): presumably the fsType marker for raw block volumes — confirm.
	fsTypeBlockName = "block"

	// CsiResyncPeriod is default resync period duration
	// TODO: increase to something useful
	CsiResyncPeriod = time.Minute
)
62
// csiPlugin is the in-tree volume plugin that delegates volume operations to
// CSI drivers. All fields are populated by Init.
type csiPlugin struct {
	// host is the volume host passed to Init; nil until Init is called.
	host                      volume.VolumeHost
	// csiDriverLister is taken from the host when it is an
	// AttachDetachVolumeHost or a KubeletVolumeHost; it may be nil.
	csiDriverLister           storagelisters.CSIDriverLister
	// serviceAccountTokenGetter is only set when the host is a KubeletVolumeHost.
	serviceAccountTokenGetter func(namespace, name string, tr *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error)
	// volumeAttachmentLister is taken from an AttachDetachVolumeHost and is
	// explicitly nil when running on a KubeletVolumeHost.
	volumeAttachmentLister    storagelisters.VolumeAttachmentLister
}
69
70// ProbeVolumePlugins returns implemented plugins
71func ProbeVolumePlugins() []volume.VolumePlugin {
72	p := &csiPlugin{
73		host: nil,
74	}
75	return []volume.VolumePlugin{p}
76}
77
// volume.VolumePlugin methods
// Compile-time check that csiPlugin implements volume.VolumePlugin.
var _ volume.VolumePlugin = &csiPlugin{}

// RegistrationHandler is the handler which is fed to the pluginwatcher API.
type RegistrationHandler struct {
}

// TODO (verult) consider using a struct instead of global variables
// csiDrivers keeps track of all CSI drivers registered on the node and their
// corresponding sockets.
var csiDrivers = &DriversStore{}

// nim is the package-level node info manager. It is assigned in
// csiPlugin.Init and used by RegisterPlugin and initializeCSINode.
var nim nodeinfomanager.Interface

// PluginHandler is the plugin registration handler interface passed to the
// pluginwatcher module in kubelet
var PluginHandler = &RegistrationHandler{}
95
96// ValidatePlugin is called by kubelet's plugin watcher upon detection
97// of a new registration socket opened by CSI Driver registrar side car.
98func (h *RegistrationHandler) ValidatePlugin(pluginName string, endpoint string, versions []string) error {
99	klog.Infof(log("Trying to validate a new CSI Driver with name: %s endpoint: %s versions: %s",
100		pluginName, endpoint, strings.Join(versions, ",")))
101
102	_, err := h.validateVersions("ValidatePlugin", pluginName, endpoint, versions)
103	if err != nil {
104		return fmt.Errorf("validation failed for CSI Driver %s at endpoint %s: %v", pluginName, endpoint, err)
105	}
106
107	return err
108}
109
110// RegisterPlugin is called when a plugin can be registered
111func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string, versions []string) error {
112	klog.Infof(log("Register new plugin with name: %s at endpoint: %s", pluginName, endpoint))
113
114	highestSupportedVersion, err := h.validateVersions("RegisterPlugin", pluginName, endpoint, versions)
115	if err != nil {
116		return err
117	}
118
119	// Storing endpoint of newly registered CSI driver into the map, where CSI driver name will be the key
120	// all other CSI components will be able to get the actual socket of CSI drivers by its name.
121	csiDrivers.Set(pluginName, Driver{
122		endpoint:                endpoint,
123		highestSupportedVersion: highestSupportedVersion,
124	})
125
126	// Get node info from the driver.
127	csi, err := newCsiDriverClient(csiDriverName(pluginName))
128	if err != nil {
129		return err
130	}
131
132	ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
133	defer cancel()
134
135	driverNodeID, maxVolumePerNode, accessibleTopology, err := csi.NodeGetInfo(ctx)
136	if err != nil {
137		if unregErr := unregisterDriver(pluginName); unregErr != nil {
138			klog.Error(log("registrationHandler.RegisterPlugin failed to unregister plugin due to previous error: %v", unregErr))
139		}
140		return err
141	}
142
143	err = nim.InstallCSIDriver(pluginName, driverNodeID, maxVolumePerNode, accessibleTopology)
144	if err != nil {
145		if unregErr := unregisterDriver(pluginName); unregErr != nil {
146			klog.Error(log("registrationHandler.RegisterPlugin failed to unregister plugin due to previous error: %v", unregErr))
147		}
148		return err
149	}
150
151	return nil
152}
153
154func (h *RegistrationHandler) validateVersions(callerName, pluginName string, endpoint string, versions []string) (*utilversion.Version, error) {
155	if len(versions) == 0 {
156		return nil, errors.New(log("%s for CSI driver %q failed. Plugin returned an empty list for supported versions", callerName, pluginName))
157	}
158
159	// Validate version
160	newDriverHighestVersion, err := highestSupportedVersion(versions)
161	if err != nil {
162		return nil, errors.New(log("%s for CSI driver %q failed. None of the versions specified %q are supported. err=%v", callerName, pluginName, versions, err))
163	}
164
165	existingDriver, driverExists := csiDrivers.Get(pluginName)
166	if driverExists {
167		if !existingDriver.highestSupportedVersion.LessThan(newDriverHighestVersion) {
168			return nil, errors.New(log("%s for CSI driver %q failed. Another driver with the same name is already registered with a higher supported version: %q", callerName, pluginName, existingDriver.highestSupportedVersion))
169		}
170	}
171
172	return newDriverHighestVersion, nil
173}
174
175// DeRegisterPlugin is called when a plugin removed its socket, signaling
176// it is no longer available
177func (h *RegistrationHandler) DeRegisterPlugin(pluginName string) {
178	klog.Info(log("registrationHandler.DeRegisterPlugin request for plugin %s", pluginName))
179	if err := unregisterDriver(pluginName); err != nil {
180		klog.Error(log("registrationHandler.DeRegisterPlugin failed: %v", err))
181	}
182}
183
// Init stores the volume host on the plugin, wires up the listers and the
// service account token getter exposed by the host, creates the package-level
// node info manager (nim) and, when the CSIMigration feature gate is enabled,
// starts CSINode initialization. It returns an error only when
// initializeCSINode fails.
func (p *csiPlugin) Init(host volume.VolumeHost) error {
	p.host = host

	csiClient := host.GetKubeClient()
	if csiClient == nil {
		klog.Warning(log("kubeclient not set, assuming standalone kubelet"))
	} else {
		// set CSIDriverLister and volumeAttachmentLister
		adcHost, ok := host.(volume.AttachDetachVolumeHost)
		if ok {
			p.csiDriverLister = adcHost.CSIDriverLister()
			if p.csiDriverLister == nil {
				klog.Error(log("CSIDriverLister not found on AttachDetachVolumeHost"))
			}
			p.volumeAttachmentLister = adcHost.VolumeAttachmentLister()
			if p.volumeAttachmentLister == nil {
				klog.Error(log("VolumeAttachmentLister not found on AttachDetachVolumeHost"))
			}
		}
		// A host that is also a KubeletVolumeHost overrides the lister chosen
		// above and clears the volumeAttachmentLister.
		kletHost, ok := host.(volume.KubeletVolumeHost)
		if ok {
			p.csiDriverLister = kletHost.CSIDriverLister()
			if p.csiDriverLister == nil {
				klog.Error(log("CSIDriverLister not found on KubeletVolumeHost"))
			}
			p.serviceAccountTokenGetter = host.GetServiceAccountTokenFunc()
			if p.serviceAccountTokenGetter == nil {
				klog.Error(log("ServiceAccountTokenGetter not found on KubeletVolumeHost"))
			}
			// We don't run the volumeAttachmentLister in the kubelet context
			p.volumeAttachmentLister = nil
		}
	}

	// Maps each in-tree plugin name to a function reporting whether its CSI
	// migration is enabled: both the global CSIMigration gate and the
	// plugin-specific gate must be on. The gates are evaluated on each call.
	var migratedPlugins = map[string](func() bool){
		csitranslationplugins.GCEPDInTreePluginName: func() bool {
			return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) && utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationGCE)
		},
		csitranslationplugins.AWSEBSInTreePluginName: func() bool {
			return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) && utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationAWS)
		},
		csitranslationplugins.CinderInTreePluginName: func() bool {
			return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) && utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationOpenStack)
		},
		csitranslationplugins.AzureDiskInTreePluginName: func() bool {
			return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) && utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationAzureDisk)
		},
		csitranslationplugins.AzureFileInTreePluginName: func() bool {
			return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) && utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationAzureFile)
		},
		csitranslationplugins.VSphereInTreePluginName: func() bool {
			return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) && utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationvSphere)
		},
	}

	// Initializing the label management channels
	// nim must be assigned before initializeCSINode runs, because the
	// goroutine it starts uses nim.
	nim = nodeinfomanager.NewNodeInfoManager(host.GetNodeName(), host, migratedPlugins)

	if utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) {
		// This function prevents Kubelet from posting Ready status until CSINode
		// is both installed and initialized
		if err := initializeCSINode(host); err != nil {
			return errors.New(log("failed to initialize CSINode: %v", err))
		}
	}

	return nil
}
252
// initializeCSINode starts asynchronous initialization of the CSINode object.
//
// It does nothing when host is not a KubeletVolumeHost or has no kube client.
// Otherwise it records a kubelet error ("CSINode is not yet initialized") and
// launches a goroutine that waits for the API server, then retries
// nim.InitializeCSINodeWithAnnotation with exponential backoff. The kubelet
// error is cleared on success; if waiting or all retries fail, the process is
// terminated through klog.Fatalf. Every visible return path returns nil.
func initializeCSINode(host volume.VolumeHost) error {
	kvh, ok := host.(volume.KubeletVolumeHost)
	if !ok {
		klog.V(4).Info("Cast from VolumeHost to KubeletVolumeHost failed. Skipping CSINode initialization, not running on kubelet")
		return nil
	}
	kubeClient := host.GetKubeClient()
	if kubeClient == nil {
		// Kubelet running in standalone mode. Skip CSINode initialization
		klog.Warning("Skipping CSINode initialization, kubelet running in standalone mode")
		return nil
	}

	// Set before the goroutine starts so the error is in place as soon as
	// this function returns.
	kvh.SetKubeletError(errors.New("CSINode is not yet initialized"))

	go func() {
		defer utilruntime.HandleCrash()

		// First wait indefinitely to talk to Kube APIServer
		nodeName := host.GetNodeName()
		err := waitForAPIServerForever(kubeClient, nodeName)
		if err != nil {
			klog.Fatalf("Failed to initialize CSINode while waiting for API server to report ok: %v", err)
		}

		// Backoff parameters tuned to retry over 140 seconds. Will fail and restart the Kubelet
		// after max retry steps.
		initBackoff := wait.Backoff{
			Steps:    6,
			Duration: 15 * time.Millisecond,
			Factor:   6.0,
			Jitter:   0.1,
		}
		err = wait.ExponentialBackoff(initBackoff, func() (bool, error) {
			klog.V(4).Infof("Initializing migrated drivers on CSINode")
			err := nim.InitializeCSINodeWithAnnotation()
			if err != nil {
				// Record the failure but return a nil error so the backoff
				// keeps retrying instead of aborting.
				kvh.SetKubeletError(fmt.Errorf("failed to initialize CSINode: %v", err))
				klog.Errorf("Failed to initialize CSINode: %v", err)
				return false, nil
			}

			// Successfully initialized drivers, allow Kubelet to post Ready
			kvh.SetKubeletError(nil)
			return true, nil
		})
		if err != nil {
			// 2 releases after CSIMigration and all CSIMigrationX (where X is a volume plugin)
			// are permanently enabled the apiserver/controllers can assume that the kubelet is
			// using CSI for all Migrated volume plugins. Then all the CSINode initialization
			// code can be dropped from Kubelet.
			// Kill the Kubelet process and allow it to restart to retry initialization
			klog.Fatalf("Failed to initialize CSINode after retrying: %v", err)
		}
	}()
	return nil
}
310
// GetPluginName returns the name of the CSI plugin, CSIPluginName.
func (p *csiPlugin) GetPluginName() string {
	return CSIPluginName
}
314
315// GetvolumeName returns a concatenated string of CSIVolumeSource.Driver<volNameSe>CSIVolumeSource.VolumeHandle
316// That string value is used in Detach() to extract driver name and volumeName.
317func (p *csiPlugin) GetVolumeName(spec *volume.Spec) (string, error) {
318	csi, err := getPVSourceFromSpec(spec)
319	if err != nil {
320		return "", errors.New(log("plugin.GetVolumeName failed to extract volume source from spec: %v", err))
321	}
322
323	// return driverName<separator>volumeHandle
324	return fmt.Sprintf("%s%s%s", csi.Driver, volNameSep, csi.VolumeHandle), nil
325}
326
327func (p *csiPlugin) CanSupport(spec *volume.Spec) bool {
328	// TODO (vladimirvivien) CanSupport should also take into account
329	// the availability/registration of specified Driver in the volume source
330	if spec == nil {
331		return false
332	}
333	if utilfeature.DefaultFeatureGate.Enabled(features.CSIInlineVolume) {
334		return (spec.PersistentVolume != nil && spec.PersistentVolume.Spec.CSI != nil) ||
335			(spec.Volume != nil && spec.Volume.CSI != nil)
336	}
337
338	return spec.PersistentVolume != nil && spec.PersistentVolume.Spec.CSI != nil
339}
340
341func (p *csiPlugin) RequiresRemount(spec *volume.Spec) bool {
342	if p.csiDriverLister == nil {
343		return false
344	}
345	driverName, err := GetCSIDriverName(spec)
346	if err != nil {
347		klog.V(5).Info(log("Failed to mark %q as republish required, err: %v", spec.Name(), err))
348		return false
349	}
350	csiDriver, err := p.csiDriverLister.Get(driverName)
351	if err != nil {
352		klog.V(5).Info(log("Failed to mark %q as republish required, err: %v", spec.Name(), err))
353		return false
354	}
355	return *csiDriver.Spec.RequiresRepublish
356}
357
358func (p *csiPlugin) NewMounter(
359	spec *volume.Spec,
360	pod *api.Pod,
361	_ volume.VolumeOptions) (volume.Mounter, error) {
362
363	volSrc, pvSrc, err := getSourceFromSpec(spec)
364	if err != nil {
365		return nil, err
366	}
367
368	var (
369		driverName   string
370		volumeHandle string
371		readOnly     bool
372	)
373
374	switch {
375	case volSrc != nil && utilfeature.DefaultFeatureGate.Enabled(features.CSIInlineVolume):
376		volumeHandle = makeVolumeHandle(string(pod.UID), spec.Name())
377		driverName = volSrc.Driver
378		if volSrc.ReadOnly != nil {
379			readOnly = *volSrc.ReadOnly
380		}
381	case pvSrc != nil:
382		driverName = pvSrc.Driver
383		volumeHandle = pvSrc.VolumeHandle
384		readOnly = spec.ReadOnly
385	default:
386		return nil, errors.New(log("volume source not found in volume.Spec"))
387	}
388
389	volumeLifecycleMode, err := p.getVolumeLifecycleMode(spec)
390	if err != nil {
391		return nil, err
392	}
393
394	// Check CSIDriver.Spec.Mode to ensure that the CSI driver
395	// supports the current volumeLifecycleMode.
396	if err := p.supportsVolumeLifecycleMode(driverName, volumeLifecycleMode); err != nil {
397		return nil, err
398	}
399
400	fsGroupPolicy, err := p.getFSGroupPolicy(driverName)
401	if err != nil {
402		return nil, err
403	}
404
405	k8s := p.host.GetKubeClient()
406	if k8s == nil {
407		return nil, errors.New(log("failed to get a kubernetes client"))
408	}
409
410	kvh, ok := p.host.(volume.KubeletVolumeHost)
411	if !ok {
412		return nil, errors.New(log("cast from VolumeHost to KubeletVolumeHost failed"))
413	}
414
415	mounter := &csiMountMgr{
416		plugin:              p,
417		k8s:                 k8s,
418		spec:                spec,
419		pod:                 pod,
420		podUID:              pod.UID,
421		driverName:          csiDriverName(driverName),
422		volumeLifecycleMode: volumeLifecycleMode,
423		fsGroupPolicy:       fsGroupPolicy,
424		volumeID:            volumeHandle,
425		specVolumeID:        spec.Name(),
426		readOnly:            readOnly,
427		kubeVolHost:         kvh,
428	}
429	mounter.csiClientGetter.driverName = csiDriverName(driverName)
430
431	// Save volume info in pod dir
432	dir := mounter.GetPath()
433	dataDir := filepath.Dir(dir) // dropoff /mount at end
434
435	if err := os.MkdirAll(dataDir, 0750); err != nil {
436		return nil, errors.New(log("failed to create dir %#v:  %v", dataDir, err))
437	}
438	klog.V(4).Info(log("created path successfully [%s]", dataDir))
439
440	mounter.MetricsProvider = NewMetricsCsi(volumeHandle, dir, csiDriverName(driverName))
441
442	// persist volume info data for teardown
443	node := string(p.host.GetNodeName())
444	volData := map[string]string{
445		volDataKey.specVolID:           spec.Name(),
446		volDataKey.volHandle:           volumeHandle,
447		volDataKey.driverName:          driverName,
448		volDataKey.nodeName:            node,
449		volDataKey.volumeLifecycleMode: string(volumeLifecycleMode),
450	}
451
452	attachID := getAttachmentName(volumeHandle, driverName, node)
453	volData[volDataKey.attachmentID] = attachID
454
455	err = saveVolumeData(dataDir, volDataFileName, volData)
456	defer func() {
457		// Only if there was an error and volume operation was considered
458		// finished, we should remove the directory.
459		if err != nil && volumetypes.IsOperationFinishedError(err) {
460			// attempt to cleanup volume mount dir.
461			if err = removeMountDir(p, dir); err != nil {
462				klog.Error(log("attacher.MountDevice failed to remove mount dir after error [%s]: %v", dir, err))
463			}
464		}
465	}()
466
467	if err != nil {
468		errorMsg := log("csi.NewMounter failed to save volume info data: %v", err)
469		klog.Error(errorMsg)
470
471		return nil, errors.New(errorMsg)
472	}
473
474	klog.V(4).Info(log("mounter created successfully"))
475
476	return mounter, nil
477}
478
479func (p *csiPlugin) NewUnmounter(specName string, podUID types.UID) (volume.Unmounter, error) {
480	klog.V(4).Infof(log("setting up unmounter for [name=%v, podUID=%v]", specName, podUID))
481
482	kvh, ok := p.host.(volume.KubeletVolumeHost)
483	if !ok {
484		return nil, errors.New(log("cast from VolumeHost to KubeletVolumeHost failed"))
485	}
486
487	unmounter := &csiMountMgr{
488		plugin:       p,
489		podUID:       podUID,
490		specVolumeID: specName,
491		kubeVolHost:  kvh,
492	}
493
494	// load volume info from file
495	dir := unmounter.GetPath()
496	dataDir := filepath.Dir(dir) // dropoff /mount at end
497	data, err := loadVolumeData(dataDir, volDataFileName)
498	if err != nil {
499		return nil, errors.New(log("unmounter failed to load volume data file [%s]: %v", dir, err))
500	}
501	unmounter.driverName = csiDriverName(data[volDataKey.driverName])
502	unmounter.volumeID = data[volDataKey.volHandle]
503	unmounter.csiClientGetter.driverName = unmounter.driverName
504
505	return unmounter, nil
506}
507
508func (p *csiPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
509	klog.V(4).Info(log("plugin.ConstructVolumeSpec [pv.Name=%v, path=%v]", volumeName, mountPath))
510
511	volData, err := loadVolumeData(mountPath, volDataFileName)
512	if err != nil {
513		return nil, errors.New(log("plugin.ConstructVolumeSpec failed loading volume data using [%s]: %v", mountPath, err))
514	}
515
516	klog.V(4).Info(log("plugin.ConstructVolumeSpec extracted [%#v]", volData))
517
518	var spec *volume.Spec
519	inlineEnabled := utilfeature.DefaultFeatureGate.Enabled(features.CSIInlineVolume)
520
521	// If inlineEnabled is true and mode is VolumeLifecycleEphemeral,
522	// use constructVolSourceSpec to construct volume source spec.
523	// If inlineEnabled is false or mode is VolumeLifecyclePersistent,
524	// use constructPVSourceSpec to construct volume construct pv source spec.
525	if inlineEnabled && storage.VolumeLifecycleMode(volData[volDataKey.volumeLifecycleMode]) == storage.VolumeLifecycleEphemeral {
526		spec = p.constructVolSourceSpec(volData[volDataKey.specVolID], volData[volDataKey.driverName])
527		return spec, nil
528	}
529	spec = p.constructPVSourceSpec(volData[volDataKey.specVolID], volData[volDataKey.driverName], volData[volDataKey.volHandle])
530
531	return spec, nil
532}
533
534// constructVolSourceSpec constructs volume.Spec with CSIVolumeSource
535func (p *csiPlugin) constructVolSourceSpec(volSpecName, driverName string) *volume.Spec {
536	vol := &api.Volume{
537		Name: volSpecName,
538		VolumeSource: api.VolumeSource{
539			CSI: &api.CSIVolumeSource{
540				Driver: driverName,
541			},
542		},
543	}
544	return volume.NewSpecFromVolume(vol)
545}
546
547//constructPVSourceSpec constructs volume.Spec with CSIPersistentVolumeSource
548func (p *csiPlugin) constructPVSourceSpec(volSpecName, driverName, volumeHandle string) *volume.Spec {
549	fsMode := api.PersistentVolumeFilesystem
550	pv := &api.PersistentVolume{
551		ObjectMeta: meta.ObjectMeta{
552			Name: volSpecName,
553		},
554		Spec: api.PersistentVolumeSpec{
555			PersistentVolumeSource: api.PersistentVolumeSource{
556				CSI: &api.CSIPersistentVolumeSource{
557					Driver:       driverName,
558					VolumeHandle: volumeHandle,
559				},
560			},
561			VolumeMode: &fsMode,
562		},
563	}
564	return volume.NewSpecFromPersistentVolume(pv, false)
565}
566
// SupportsMountOption always reports true; see the notes below for why this
// is not probed from the driver.
func (p *csiPlugin) SupportsMountOption() bool {
	// TODO (vladimirvivien) use CSI VolumeCapability.MountVolume.mount_flags
	// to probe for the result for this method
	// (bswartz) Until the CSI spec supports probing, our only option is to
	// make plugins register their support for mount options or lack thereof
	// directly with kubernetes.
	return true
}
575
// SupportsBulkVolumeVerification always reports false.
func (p *csiPlugin) SupportsBulkVolumeVerification() bool {
	return false
}
579
// volume.AttachableVolumePlugin methods
// Compile-time check that csiPlugin implements volume.AttachableVolumePlugin.
var _ volume.AttachableVolumePlugin = &csiPlugin{}

// Compile-time check that csiPlugin implements volume.DeviceMountableVolumePlugin.
var _ volume.DeviceMountableVolumePlugin = &csiPlugin{}
584
// NewAttacher returns the value produced by newAttacherDetacher as a
// volume.Attacher.
func (p *csiPlugin) NewAttacher() (volume.Attacher, error) {
	return p.newAttacherDetacher()
}
588
// NewDeviceMounter returns the attacher created by NewAttacher as a
// volume.DeviceMounter.
func (p *csiPlugin) NewDeviceMounter() (volume.DeviceMounter, error) {
	return p.NewAttacher()
}
592
// NewDetacher returns the value produced by newAttacherDetacher as a
// volume.Detacher.
func (p *csiPlugin) NewDetacher() (volume.Detacher, error) {
	return p.newAttacherDetacher()
}
596
597func (p *csiPlugin) CanAttach(spec *volume.Spec) (bool, error) {
598	inlineEnabled := utilfeature.DefaultFeatureGate.Enabled(features.CSIInlineVolume)
599	if inlineEnabled {
600		volumeLifecycleMode, err := p.getVolumeLifecycleMode(spec)
601		if err != nil {
602			return false, err
603		}
604
605		if volumeLifecycleMode == storage.VolumeLifecycleEphemeral {
606			klog.V(5).Info(log("plugin.CanAttach = false, ephemeral mode detected for spec %v", spec.Name()))
607			return false, nil
608		}
609	}
610
611	pvSrc, err := getCSISourceFromSpec(spec)
612	if err != nil {
613		return false, err
614	}
615
616	driverName := pvSrc.Driver
617
618	skipAttach, err := p.skipAttach(driverName)
619	if err != nil {
620		return false, err
621	}
622
623	return !skipAttach, nil
624}
625
626// CanDeviceMount returns true if the spec supports device mount
627func (p *csiPlugin) CanDeviceMount(spec *volume.Spec) (bool, error) {
628	inlineEnabled := utilfeature.DefaultFeatureGate.Enabled(features.CSIInlineVolume)
629	if !inlineEnabled {
630		// No need to check anything, we assume it is a persistent volume.
631		return true, nil
632	}
633
634	volumeLifecycleMode, err := p.getVolumeLifecycleMode(spec)
635	if err != nil {
636		return false, err
637	}
638
639	if volumeLifecycleMode == storage.VolumeLifecycleEphemeral {
640		klog.V(5).Info(log("plugin.CanDeviceMount skipped ephemeral mode detected for spec %v", spec.Name()))
641		return false, nil
642	}
643
644	// Persistent volumes support device mount.
645	return true, nil
646}
647
// NewDeviceUnmounter returns the detacher created by NewDetacher as a
// volume.DeviceUnmounter.
func (p *csiPlugin) NewDeviceUnmounter() (volume.DeviceUnmounter, error) {
	return p.NewDetacher()
}
651
652func (p *csiPlugin) GetDeviceMountRefs(deviceMountPath string) ([]string, error) {
653	m := p.host.GetMounter(p.GetPluginName())
654	return m.GetMountRefs(deviceMountPath)
655}
656
// BlockVolumePlugin methods
// Compile-time check that csiPlugin implements volume.BlockVolumePlugin.
var _ volume.BlockVolumePlugin = &csiPlugin{}
659
660func (p *csiPlugin) NewBlockVolumeMapper(spec *volume.Spec, podRef *api.Pod, opts volume.VolumeOptions) (volume.BlockVolumeMapper, error) {
661	pvSource, err := getCSISourceFromSpec(spec)
662	if err != nil {
663		return nil, err
664	}
665	readOnly, err := getReadOnlyFromSpec(spec)
666	if err != nil {
667		return nil, err
668	}
669
670	klog.V(4).Info(log("setting up block mapper for [volume=%v,driver=%v]", pvSource.VolumeHandle, pvSource.Driver))
671
672	k8s := p.host.GetKubeClient()
673	if k8s == nil {
674		return nil, errors.New(log("failed to get a kubernetes client"))
675	}
676
677	mapper := &csiBlockMapper{
678		k8s:        k8s,
679		plugin:     p,
680		volumeID:   pvSource.VolumeHandle,
681		driverName: csiDriverName(pvSource.Driver),
682		readOnly:   readOnly,
683		spec:       spec,
684		specName:   spec.Name(),
685		pod:        podRef,
686		podUID:     podRef.UID,
687	}
688	mapper.csiClientGetter.driverName = csiDriverName(pvSource.Driver)
689
690	// Save volume info in pod dir
691	dataDir := getVolumeDeviceDataDir(spec.Name(), p.host)
692
693	if err := os.MkdirAll(dataDir, 0750); err != nil {
694		return nil, errors.New(log("failed to create data dir %s:  %v", dataDir, err))
695	}
696	klog.V(4).Info(log("created path successfully [%s]", dataDir))
697
698	blockPath, err := mapper.GetGlobalMapPath(spec)
699	if err != nil {
700		return nil, errors.New(log("failed to get device path: %v", err))
701	}
702
703	mapper.MetricsProvider = NewMetricsCsi(pvSource.VolumeHandle, blockPath+"/"+string(podRef.UID), csiDriverName(pvSource.Driver))
704
705	// persist volume info data for teardown
706	node := string(p.host.GetNodeName())
707	attachID := getAttachmentName(pvSource.VolumeHandle, pvSource.Driver, node)
708	volData := map[string]string{
709		volDataKey.specVolID:    spec.Name(),
710		volDataKey.volHandle:    pvSource.VolumeHandle,
711		volDataKey.driverName:   pvSource.Driver,
712		volDataKey.nodeName:     node,
713		volDataKey.attachmentID: attachID,
714	}
715
716	err = saveVolumeData(dataDir, volDataFileName, volData)
717	defer func() {
718		// Only if there was an error and volume operation was considered
719		// finished, we should remove the directory.
720		if err != nil && volumetypes.IsOperationFinishedError(err) {
721			// attempt to cleanup volume mount dir.
722			if err = removeMountDir(p, dataDir); err != nil {
723				klog.Error(log("attacher.MountDevice failed to remove mount dir after error [%s]: %v", dataDir, err))
724			}
725		}
726	}()
727	if err != nil {
728		errorMsg := log("csi.NewBlockVolumeMapper failed to save volume info data: %v", err)
729		klog.Error(errorMsg)
730		return nil, errors.New(errorMsg)
731	}
732
733	return mapper, nil
734}
735
736func (p *csiPlugin) NewBlockVolumeUnmapper(volName string, podUID types.UID) (volume.BlockVolumeUnmapper, error) {
737	klog.V(4).Infof(log("setting up block unmapper for [Spec=%v, podUID=%v]", volName, podUID))
738	unmapper := &csiBlockMapper{
739		plugin:   p,
740		podUID:   podUID,
741		specName: volName,
742	}
743
744	// load volume info from file
745	dataDir := getVolumeDeviceDataDir(unmapper.specName, p.host)
746	data, err := loadVolumeData(dataDir, volDataFileName)
747	if err != nil {
748		return nil, errors.New(log("unmapper failed to load volume data file [%s]: %v", dataDir, err))
749	}
750	unmapper.driverName = csiDriverName(data[volDataKey.driverName])
751	unmapper.volumeID = data[volDataKey.volHandle]
752	unmapper.csiClientGetter.driverName = unmapper.driverName
753
754	return unmapper, nil
755}
756
757func (p *csiPlugin) ConstructBlockVolumeSpec(podUID types.UID, specVolName, mapPath string) (*volume.Spec, error) {
758	klog.V(4).Infof("plugin.ConstructBlockVolumeSpec [podUID=%s, specVolName=%s, path=%s]", string(podUID), specVolName, mapPath)
759
760	dataDir := getVolumeDeviceDataDir(specVolName, p.host)
761	volData, err := loadVolumeData(dataDir, volDataFileName)
762	if err != nil {
763		return nil, errors.New(log("plugin.ConstructBlockVolumeSpec failed loading volume data using [%s]: %v", mapPath, err))
764	}
765
766	klog.V(4).Info(log("plugin.ConstructBlockVolumeSpec extracted [%#v]", volData))
767
768	blockMode := api.PersistentVolumeBlock
769	pv := &api.PersistentVolume{
770		ObjectMeta: meta.ObjectMeta{
771			Name: volData[volDataKey.specVolID],
772		},
773		Spec: api.PersistentVolumeSpec{
774			PersistentVolumeSource: api.PersistentVolumeSource{
775				CSI: &api.CSIPersistentVolumeSource{
776					Driver:       volData[volDataKey.driverName],
777					VolumeHandle: volData[volDataKey.volHandle],
778				},
779			},
780			VolumeMode: &blockMode,
781		},
782	}
783
784	return volume.NewSpecFromPersistentVolume(pv, false), nil
785}
786
787// skipAttach looks up CSIDriver object associated with driver name
788// to determine if driver requires attachment volume operation
789func (p *csiPlugin) skipAttach(driver string) (bool, error) {
790	kletHost, ok := p.host.(volume.KubeletVolumeHost)
791	if ok {
792		if err := kletHost.WaitForCacheSync(); err != nil {
793			return false, err
794		}
795	}
796
797	if p.csiDriverLister == nil {
798		return false, errors.New("CSIDriver lister does not exist")
799	}
800	csiDriver, err := p.csiDriverLister.Get(driver)
801	if err != nil {
802		if apierrors.IsNotFound(err) {
803			// Don't skip attach if CSIDriver does not exist
804			return false, nil
805		}
806		return false, err
807	}
808	if csiDriver.Spec.AttachRequired != nil && *csiDriver.Spec.AttachRequired == false {
809		return true, nil
810	}
811	return false, nil
812}
813
814// supportsVolumeMode checks whether the CSI driver supports a volume in the given mode.
815// An error indicates that it isn't supported and explains why.
816func (p *csiPlugin) supportsVolumeLifecycleMode(driver string, volumeMode storage.VolumeLifecycleMode) error {
817	if !utilfeature.DefaultFeatureGate.Enabled(features.CSIInlineVolume) {
818		// Feature disabled, therefore only "persistent" volumes are supported.
819		if volumeMode != storage.VolumeLifecyclePersistent {
820			return fmt.Errorf("CSIInlineVolume feature not enabled, %q volumes not supported", volumeMode)
821		}
822		return nil
823	}
824
825	// Retrieve CSIDriver. It's not an error if that isn't
826	// possible (we don't have the lister if CSIDriverRegistry is
827	// disabled) or the driver isn't found (CSIDriver is
828	// optional), but then only persistent volumes are supported.
829	var csiDriver *storage.CSIDriver
830	if p.csiDriverLister != nil {
831		kletHost, ok := p.host.(volume.KubeletVolumeHost)
832		if ok {
833			if err := kletHost.WaitForCacheSync(); err != nil {
834				return err
835			}
836		}
837
838		c, err := p.csiDriverLister.Get(driver)
839		if err != nil && !apierrors.IsNotFound(err) {
840			// Some internal error.
841			return err
842		}
843		csiDriver = c
844	}
845
846	// The right response depends on whether we have information
847	// about the driver and the volume mode.
848	switch {
849	case csiDriver == nil && volumeMode == storage.VolumeLifecyclePersistent:
850		// No information, but that's okay for persistent volumes (and only those).
851		return nil
852	case csiDriver == nil:
853		return fmt.Errorf("volume mode %q not supported by driver %s (no CSIDriver object)", volumeMode, driver)
854	case containsVolumeMode(csiDriver.Spec.VolumeLifecycleModes, volumeMode):
855		// Explicitly listed.
856		return nil
857	default:
858		return fmt.Errorf("volume mode %q not supported by driver %s (only supports %q)", volumeMode, driver, csiDriver.Spec.VolumeLifecycleModes)
859	}
860}
861
862// containsVolumeMode checks whether the given volume mode is listed.
863func containsVolumeMode(modes []storage.VolumeLifecycleMode, mode storage.VolumeLifecycleMode) bool {
864	for _, m := range modes {
865		if m == mode {
866			return true
867		}
868	}
869	return false
870}
871
872// getVolumeLifecycleMode returns the mode for the specified spec: {persistent|ephemeral}.
873// 1) If mode cannot be determined, it will default to "persistent".
874// 2) If Mode cannot be resolved to either {persistent | ephemeral}, an error is returned
875// See https://github.com/kubernetes/enhancements/blob/master/keps/sig-storage/596-csi-inline-volumes/README.md
876func (p *csiPlugin) getVolumeLifecycleMode(spec *volume.Spec) (storage.VolumeLifecycleMode, error) {
877	// 1) if volume.Spec.Volume.CSI != nil -> mode is ephemeral
878	// 2) if volume.Spec.PersistentVolume.Spec.CSI != nil -> persistent
879	volSrc, _, err := getSourceFromSpec(spec)
880	if err != nil {
881		return "", err
882	}
883
884	if volSrc != nil && utilfeature.DefaultFeatureGate.Enabled(features.CSIInlineVolume) {
885		return storage.VolumeLifecycleEphemeral, nil
886	}
887	return storage.VolumeLifecyclePersistent, nil
888}
889
890// getFSGroupPolicy returns if the CSI driver supports a volume in the given mode.
891// An error indicates that it isn't supported and explains why.
892func (p *csiPlugin) getFSGroupPolicy(driver string) (storage.FSGroupPolicy, error) {
893	if !utilfeature.DefaultFeatureGate.Enabled(features.CSIVolumeFSGroupPolicy) {
894		// feature is disabled, default to ReadWriteOnceWithFSTypeFSGroupPolicy
895		return storage.ReadWriteOnceWithFSTypeFSGroupPolicy, nil
896	}
897
898	// Retrieve CSIDriver. It's not an error if that isn't
899	// possible (we don't have the lister if CSIDriverRegistry is
900	// disabled) or the driver isn't found (CSIDriver is
901	// optional)
902	var csiDriver *storage.CSIDriver
903	if p.csiDriverLister != nil {
904		kletHost, ok := p.host.(volume.KubeletVolumeHost)
905		if ok {
906			if err := kletHost.WaitForCacheSync(); err != nil {
907				return storage.ReadWriteOnceWithFSTypeFSGroupPolicy, err
908			}
909		}
910
911		c, err := p.csiDriverLister.Get(driver)
912		if err != nil && !apierrors.IsNotFound(err) {
913			// Some internal error.
914			return storage.ReadWriteOnceWithFSTypeFSGroupPolicy, err
915		}
916		csiDriver = c
917	}
918
919	// If the csiDriver isn't defined, return the default behavior
920	if csiDriver == nil {
921		return storage.ReadWriteOnceWithFSTypeFSGroupPolicy, nil
922	}
923	// If the csiDriver exists but the fsGroupPolicy isn't defined, return an error
924	if csiDriver.Spec.FSGroupPolicy == nil || *csiDriver.Spec.FSGroupPolicy == "" {
925		return storage.ReadWriteOnceWithFSTypeFSGroupPolicy, errors.New(log("expected valid fsGroupPolicy, received nil value or empty string"))
926	}
927	return *csiDriver.Spec.FSGroupPolicy, nil
928}
929
930func (p *csiPlugin) getPublishContext(client clientset.Interface, handle, driver, nodeName string) (map[string]string, error) {
931	skip, err := p.skipAttach(driver)
932	if err != nil {
933		return nil, err
934	}
935	if skip {
936		return nil, nil
937	}
938
939	attachID := getAttachmentName(handle, driver, nodeName)
940
941	// search for attachment by VolumeAttachment.Spec.Source.PersistentVolumeName
942	attachment, err := client.StorageV1().VolumeAttachments().Get(context.TODO(), attachID, meta.GetOptions{})
943	if err != nil {
944		return nil, err // This err already has enough context ("VolumeAttachment xyz not found")
945	}
946
947	if attachment == nil {
948		err = errors.New("no existing VolumeAttachment found")
949		return nil, err
950	}
951	return attachment.Status.AttachmentMetadata, nil
952}
953
954func (p *csiPlugin) newAttacherDetacher() (*csiAttacher, error) {
955	k8s := p.host.GetKubeClient()
956	if k8s == nil {
957		return nil, errors.New(log("unable to get kubernetes client from host"))
958	}
959
960	return &csiAttacher{
961		plugin:       p,
962		k8s:          k8s,
963		watchTimeout: csiTimeout,
964	}, nil
965}
966
967// podInfoEnabled  check CSIDriver enabled pod info flag
968func (p *csiPlugin) podInfoEnabled(driverName string) (bool, error) {
969	kletHost, ok := p.host.(volume.KubeletVolumeHost)
970	if ok {
971		kletHost.WaitForCacheSync()
972	}
973
974	if p.csiDriverLister == nil {
975		return false, fmt.Errorf("CSIDriverLister not found")
976	}
977
978	csiDriver, err := p.csiDriverLister.Get(driverName)
979	if err != nil {
980		if apierrors.IsNotFound(err) {
981			klog.V(4).Infof(log("CSIDriver %q not found, not adding pod information", driverName))
982			return false, nil
983		}
984		return false, err
985	}
986
987	// if PodInfoOnMount is not set or false we do not set pod attributes
988	if csiDriver.Spec.PodInfoOnMount == nil || *csiDriver.Spec.PodInfoOnMount == false {
989		klog.V(4).Infof(log("CSIDriver %q does not require pod information", driverName))
990		return false, nil
991	}
992	return true, nil
993}
994
995func unregisterDriver(driverName string) error {
996	csiDrivers.Delete(driverName)
997
998	if err := nim.UninstallCSIDriver(driverName); err != nil {
999		return errors.New(log("Error uninstalling CSI driver: %v", err))
1000	}
1001
1002	return nil
1003}
1004
1005// Return the highest supported version
1006func highestSupportedVersion(versions []string) (*utilversion.Version, error) {
1007	if len(versions) == 0 {
1008		return nil, errors.New(log("CSI driver reporting empty array for supported versions"))
1009	}
1010
1011	var highestSupportedVersion *utilversion.Version
1012	var theErr error
1013	for i := len(versions) - 1; i >= 0; i-- {
1014		currentHighestVer, err := utilversion.ParseGeneric(versions[i])
1015		if err != nil {
1016			theErr = err
1017			continue
1018		}
1019		if currentHighestVer.Major() > 1 {
1020			// CSI currently only has version 0.x and 1.x (see https://github.com/container-storage-interface/spec/releases).
1021			// Therefore any driver claiming version 2.x+ is ignored as an unsupported versions.
1022			// Future 1.x versions of CSI are supposed to be backwards compatible so this version of Kubernetes will work with any 1.x driver
1023			// (or 0.x), but it may not work with 2.x drivers (because 2.x does not have to be backwards compatible with 1.x).
1024			continue
1025		}
1026		if highestSupportedVersion == nil || highestSupportedVersion.LessThan(currentHighestVer) {
1027			highestSupportedVersion = currentHighestVer
1028		}
1029	}
1030
1031	if highestSupportedVersion == nil {
1032		return nil, fmt.Errorf("could not find a highest supported version from versions (%v) reported by this driver: %v", versions, theErr)
1033	}
1034
1035	if highestSupportedVersion.Major() != 1 {
1036		// CSI v0.x is no longer supported as of Kubernetes v1.17 in
1037		// accordance with deprecation policy set out in Kubernetes v1.13
1038		return nil, fmt.Errorf("highest supported version reported by driver is %v, must be v1.x", highestSupportedVersion)
1039	}
1040	return highestSupportedVersion, nil
1041}
1042
1043// waitForAPIServerForever waits forever to get a CSINode instance as a proxy
1044// for a healthy APIServer
1045func waitForAPIServerForever(client clientset.Interface, nodeName types.NodeName) error {
1046	var lastErr error
1047	err := wait.PollImmediateInfinite(time.Second, func() (bool, error) {
1048		// Get a CSINode from API server to make sure 1) kubelet can reach API server
1049		// and 2) it has enough permissions. Kubelet may have restricted permissions
1050		// when it's bootstrapping TLS.
1051		// https://kubernetes.io/docs/reference/command-line-tools-reference/kubelet-tls-bootstrapping/
1052		_, lastErr = client.StorageV1().CSINodes().Get(context.TODO(), string(nodeName), meta.GetOptions{})
1053		if lastErr == nil || apierrors.IsNotFound(lastErr) {
1054			// API server contacted
1055			return true, nil
1056		}
1057		klog.V(2).Infof("Failed to contact API server when waiting for CSINode publishing: %s", lastErr)
1058		return false, nil
1059	})
1060	if err != nil {
1061		// In theory this is unreachable, but just in case:
1062		return fmt.Errorf("%v: %v", err, lastErr)
1063	}
1064
1065	return nil
1066}
1067