1/*
2Copyright 2017 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package csi
18
19import (
20	"crypto/sha256"
21	"encoding/json"
22	"errors"
23	"fmt"
24	"os"
25	"path/filepath"
26
27	"k8s.io/klog/v2"
28
29	authenticationv1 "k8s.io/api/authentication/v1"
30	api "k8s.io/api/core/v1"
31	storage "k8s.io/api/storage/v1"
32	apierrors "k8s.io/apimachinery/pkg/api/errors"
33	"k8s.io/apimachinery/pkg/types"
34	utilfeature "k8s.io/apiserver/pkg/util/feature"
35	"k8s.io/client-go/kubernetes"
36	"k8s.io/kubernetes/pkg/features"
37	"k8s.io/kubernetes/pkg/volume"
38	"k8s.io/kubernetes/pkg/volume/util"
39	volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
40	"k8s.io/mount-utils"
41	utilstrings "k8s.io/utils/strings"
42)
43
// TODO(vladimirvivien): move this in a central loc later
var (
	// volDataKey holds the key names used for the per-volume data entries.
	volDataKey = struct {
		specVolID           string
		volHandle           string
		driverName          string
		nodeName            string
		attachmentID        string
		volumeLifecycleMode string
	}{
		specVolID:           "specVolID",
		volHandle:           "volumeHandle",
		driverName:          "driverName",
		nodeName:            "nodeName",
		attachmentID:        "attachmentID",
		volumeLifecycleMode: "volumeLifecycleMode",
	}
)
62
// csiMountMgr implements volume.Volume, volume.Mounter and volume.Unmounter
// for CSI volumes by issuing NodePublishVolume / NodeUnpublishVolume calls.
//
// Field notes:
//   - csiClientGetter provides the CSI client used for the node RPCs.
//   - k8s is used to look up the publish context and NodePublishSecretRef secrets.
//   - volumeLifecycleMode must be Ephemeral for inline volume sources and
//     Persistent for PersistentVolume sources; SetUpAt rejects anything else.
//   - fsGroupPolicy is consulted by supportsFSGroup to decide whether kubelet
//     applies fsGroup ownership itself.
//   - volumeID is the CSI volume handle passed to NodePublish/NodeUnpublish.
//   - specVolumeID names the per-pod target directory (see getTargetPath).
//   - supportsSELinux is populated by SetUpAt after a successful publish and
//     reported by GetAttributes.
//   - publishContext caches the publish context; SetUpAt fetches it when nil.
//   - kubeVolHost supplies the host utilities used for the SELinux check.
type csiMountMgr struct {
	csiClientGetter
	k8s                 kubernetes.Interface
	plugin              *csiPlugin
	driverName          csiDriverName
	volumeLifecycleMode storage.VolumeLifecycleMode
	fsGroupPolicy       storage.FSGroupPolicy
	volumeID            string
	specVolumeID        string
	readOnly            bool
	supportsSELinux     bool
	spec                *volume.Spec
	pod                 *api.Pod
	podUID              types.UID
	publishContext      map[string]string
	kubeVolHost         volume.KubeletVolumeHost
	volume.MetricsProvider
}
81
82// volume.Volume methods
83var _ volume.Volume = &csiMountMgr{}
84
85func (c *csiMountMgr) GetPath() string {
86	dir := GetCSIMounterPath(filepath.Join(getTargetPath(c.podUID, c.specVolumeID, c.plugin.host)))
87	klog.V(4).Info(log("mounter.GetPath generated [%s]", dir))
88	return dir
89}
90
91func getTargetPath(uid types.UID, specVolumeID string, host volume.VolumeHost) string {
92	specVolID := utilstrings.EscapeQualifiedName(specVolumeID)
93	return host.GetPodVolumeDir(uid, utilstrings.EscapeQualifiedName(CSIPluginName), specVolID)
94}
95
96// volume.Mounter methods
97var _ volume.Mounter = &csiMountMgr{}
98
99func (c *csiMountMgr) CanMount() error {
100	return nil
101}
102
103func (c *csiMountMgr) SetUp(mounterArgs volume.MounterArgs) error {
104	return c.SetUpAt(c.GetPath(), mounterArgs)
105}
106
// SetUpAt publishes the volume at dir by issuing a CSI NodePublishVolume call.
//
// Inline volume sources require the CSIInlineVolume feature gate and the
// Ephemeral lifecycle mode. PersistentVolume sources require the Persistent
// lifecycle mode. For them, SetUpAt resolves the staging path (when the node
// plugin advertises STAGE_UNSTAGE_VOLUME) and the publish context.
//
// Secrets referenced by NodePublishSecretRef are added to the call. Pod info
// (when enabled for the driver) and service account token attributes are
// merged into the volume attributes.
//
// After a successful publish, kubelet applies fsGroup ownership itself, unless
// DelegateFSGroupToCSIDriver is enabled and the driver advertises
// VOLUME_MOUNT_GROUP. In that case the fsGroup is handed to the driver through
// NodePublishVolume.
func (c *csiMountMgr) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
	// Info (not Infof): the message is already formatted and may contain '%'.
	klog.V(4).Info(log("Mounter.SetUpAt(%s)", dir))

	csi, err := c.csiClientGetter.Get()
	if err != nil {
		return volumetypes.NewTransientOperationFailure(log("mounter.SetUpAt failed to get CSI client: %v", err))
	}
	ctx, cancel := createCSIOperationContext(c.spec, csiTimeout)
	defer cancel()

	volSrc, pvSrc, err := getSourceFromSpec(c.spec)
	if err != nil {
		return errors.New(log("mounter.SetupAt failed to get CSI persistent source: %v", err))
	}

	driverName := c.driverName
	volumeHandle := c.volumeID
	readOnly := c.readOnly
	accessMode := api.ReadWriteOnce

	var (
		fsType             string
		volAttribs         map[string]string
		nodePublishSecrets map[string]string
		publishContext     map[string]string
		mountOptions       []string
		deviceMountPath    string
		secretRef          *api.SecretReference
	)

	switch {
	case volSrc != nil:
		if !utilfeature.DefaultFeatureGate.Enabled(features.CSIInlineVolume) {
			return fmt.Errorf("CSIInlineVolume feature required")
		}
		if c.volumeLifecycleMode != storage.VolumeLifecycleEphemeral {
			return fmt.Errorf("unexpected volume mode: %s", c.volumeLifecycleMode)
		}
		if volSrc.FSType != nil {
			fsType = *volSrc.FSType
		}

		volAttribs = volSrc.VolumeAttributes

		if volSrc.NodePublishSecretRef != nil {
			secretName := volSrc.NodePublishSecretRef.Name
			ns := c.pod.Namespace
			secretRef = &api.SecretReference{Name: secretName, Namespace: ns}
		}
	case pvSrc != nil:
		if c.volumeLifecycleMode != storage.VolumeLifecyclePersistent {
			return fmt.Errorf("unexpected driver mode: %s", c.volumeLifecycleMode)
		}

		fsType = pvSrc.FSType

		volAttribs = pvSrc.VolumeAttributes

		if pvSrc.NodePublishSecretRef != nil {
			secretRef = pvSrc.NodePublishSecretRef
		}

		// TODO (vladimirvivien) implement better AccessModes mapping between k8s and CSI
		// Check the length, not nil-ness: indexing an empty, non-nil slice panics.
		if len(c.spec.PersistentVolume.Spec.AccessModes) > 0 {
			accessMode = c.spec.PersistentVolume.Spec.AccessModes[0]
		}

		mountOptions = c.spec.PersistentVolume.Spec.MountOptions

		// Check for STAGE_UNSTAGE_VOLUME set and populate deviceMountPath if so
		stageUnstageSet, err := csi.NodeSupportsStageUnstage(ctx)
		if err != nil {
			return errors.New(log("mounter.SetUpAt failed to check for STAGE_UNSTAGE_VOLUME capability: %v", err))
		}

		if stageUnstageSet {
			deviceMountPath, err = makeDeviceMountPath(c.plugin, c.spec)
			if err != nil {
				return errors.New(log("mounter.SetUpAt failed to make device mount path: %v", err))
			}
		}

		// Fetch the publish context only if it is not already cached on the
		// mounter: search for the attachment by
		// VolumeAttachment.Spec.Source.PersistentVolumeName.
		if c.publishContext == nil {
			nodeName := string(c.plugin.host.GetNodeName())
			c.publishContext, err = c.plugin.getPublishContext(c.k8s, volumeHandle, string(driverName), nodeName)
			if err != nil {
				// we could have a transient error associated with fetching publish context
				return volumetypes.NewTransientOperationFailure(log("mounter.SetUpAt failed to fetch publishContext: %v", err))
			}
		}
		// Always forward the (possibly cached) publish context. Assigning it
		// only inside the branch above would send nil to NodePublish whenever
		// the context was already cached.
		publishContext = c.publishContext

	default:
		return fmt.Errorf("volume source not found in volume.Spec")
	}

	// create target_dir before call to NodePublish
	parentDir := filepath.Dir(dir)
	if err := os.MkdirAll(parentDir, 0750); err != nil {
		return errors.New(log("mounter.SetUpAt failed to create dir %#v:  %v", parentDir, err))
	}
	klog.V(4).Info(log("created target path successfully [%s]", parentDir))

	nodePublishSecrets = map[string]string{}
	if secretRef != nil {
		nodePublishSecrets, err = getCredentialsFromSecret(c.k8s, secretRef)
		if err != nil {
			return volumetypes.NewTransientOperationFailure(fmt.Sprintf("fetching NodePublishSecretRef %s/%s failed: %v",
				secretRef.Namespace, secretRef.Name, err))
		}
	}

	// Inject pod information into volume_attributes
	podInfoEnabled, err := c.plugin.podInfoEnabled(string(c.driverName))
	if err != nil {
		return volumetypes.NewTransientOperationFailure(log("mounter.SetUpAt failed to assemble volume attributes: %v", err))
	}
	if podInfoEnabled {
		volAttribs = mergeMap(volAttribs, getPodInfoAttrs(c.pod, c.volumeLifecycleMode))
	}

	// Inject pod service account token into volume attributes
	serviceAccountTokenAttrs, err := c.podServiceAccountTokenAttrs()
	if err != nil {
		return volumetypes.NewTransientOperationFailure(log("mounter.SetUpAt failed to get service account token attributes: %v", err))
	}
	volAttribs = mergeMap(volAttribs, serviceAccountTokenAttrs)

	driverSupportsCSIVolumeMountGroup := false
	var nodePublishFSGroupArg *int64
	if utilfeature.DefaultFeatureGate.Enabled(features.DelegateFSGroupToCSIDriver) {
		driverSupportsCSIVolumeMountGroup, err = csi.NodeSupportsVolumeMountGroup(ctx)
		if err != nil {
			return volumetypes.NewTransientOperationFailure(log("mounter.SetUpAt failed to determine if the node service has VOLUME_MOUNT_GROUP capability: %v", err))
		}

		if driverSupportsCSIVolumeMountGroup {
			klog.V(3).Infof("Driver %s supports applying FSGroup (has VOLUME_MOUNT_GROUP node capability). Delegating FSGroup application to the driver through NodePublishVolume.", c.driverName)
			nodePublishFSGroupArg = mounterArgs.FsGroup
		}
	}

	err = csi.NodePublishVolume(
		ctx,
		volumeHandle,
		readOnly,
		deviceMountPath,
		dir,
		accessMode,
		publishContext,
		volAttribs,
		nodePublishSecrets,
		fsType,
		mountOptions,
		nodePublishFSGroupArg,
	)

	if err != nil {
		// If operation finished with error then we can remove the mount directory.
		if volumetypes.IsOperationFinishedError(err) {
			if removeMountDirErr := removeMountDir(c.plugin, dir); removeMountDirErr != nil {
				klog.Error(log("mounter.SetupAt failed to remove mount dir after a NodePublish() error [%s]: %v", dir, removeMountDirErr))
			}
		}
		return err
	}

	c.supportsSELinux, err = c.kubeVolHost.GetHostUtil().GetSELinuxSupport(dir)
	if err != nil {
		klog.V(2).Info(log("error checking for SELinux support: %s", err))
	}

	if !driverSupportsCSIVolumeMountGroup && c.supportsFSGroup(fsType, mounterArgs.FsGroup, c.fsGroupPolicy) {
		// Driver doesn't support applying FSGroup. Kubelet must apply it instead.

		// fullPluginName helps to distinguish different driver from csi plugin
		err := volume.SetVolumeOwnership(c, mounterArgs.FsGroup, mounterArgs.FSGroupChangePolicy, util.FSGroupCompleteHook(c.plugin, c.spec))
		if err != nil {
			// At this point mount operation is successful:
			//   1. Since volume can not be used by the pod because of invalid permissions, we must return error
			//   2. Since mount is successful, we must record volume as mounted in uncertain state, so it can be
			//      cleaned up.
			return volumetypes.NewUncertainProgressError(fmt.Sprintf("applyFSGroup failed for vol %s: %v", c.volumeID, err))
		}
		// supportsFSGroup returned true, so mounterArgs.FsGroup is non-nil here.
		klog.V(4).Info(log("mounter.SetupAt fsGroup [%d] applied successfully to %s", *mounterArgs.FsGroup, c.volumeID))
	}

	klog.V(4).Info(log("mounter.SetUp successfully requested NodePublish [%s]", dir))
	return nil
}
300
// podServiceAccountTokenAttrs requests one service account token per entry
// in the CSIDriver's Spec.TokenRequests. Each token is bound to the pod.
//
// The token statuses are returned as a JSON object keyed by audience, stored
// under the "csi.storage.k8s.io/serviceAccount.tokens" attribute.
//
// It returns (nil, nil) when the CSIDriver object does not exist or requests
// no tokens. It returns an error when no token getter is configured, when
// the lister or token request fails, or when the statuses cannot be encoded.
func (c *csiMountMgr) podServiceAccountTokenAttrs() (map[string]string, error) {
	if c.plugin.serviceAccountTokenGetter == nil {
		return nil, errors.New("ServiceAccountTokenGetter is nil")
	}

	csiDriver, err := c.plugin.csiDriverLister.Get(string(c.driverName))
	if err != nil {
		if apierrors.IsNotFound(err) {
			// Info (not Infof): the message is already formatted.
			klog.V(5).Info(log("CSIDriver %q not found, not adding service account token information", c.driverName))
			return nil, nil
		}
		return nil, err
	}

	if len(csiDriver.Spec.TokenRequests) == 0 {
		return nil, nil
	}

	outputs := make(map[string]authenticationv1.TokenRequestStatus, len(csiDriver.Spec.TokenRequests))
	for _, tokenRequest := range csiDriver.Spec.TokenRequests {
		audience := tokenRequest.Audience
		audiences := []string{audience}
		// An empty audience means "no explicit audience" in the request.
		if audience == "" {
			audiences = []string{}
		}
		tr, err := c.plugin.serviceAccountTokenGetter(c.pod.Namespace, c.pod.Spec.ServiceAccountName, &authenticationv1.TokenRequest{
			Spec: authenticationv1.TokenRequestSpec{
				Audiences:         audiences,
				ExpirationSeconds: tokenRequest.ExpirationSeconds,
				BoundObjectRef: &authenticationv1.BoundObjectReference{
					APIVersion: "v1",
					Kind:       "Pod",
					Name:       c.pod.Name,
					UID:        c.pod.UID,
				},
			},
		})
		if err != nil {
			return nil, err
		}

		outputs[audience] = tr.Status
	}

	tokens, err := json.Marshal(outputs)
	if err != nil {
		return nil, fmt.Errorf("failed to encode service account tokens for CSIDriver %q: %w", c.driverName, err)
	}
	klog.V(4).Info(log("Fetched service account token attrs for CSIDriver %q", c.driverName))
	return map[string]string{
		"csi.storage.k8s.io/serviceAccount.tokens": string(tokens),
	}, nil
}
351
352func (c *csiMountMgr) GetAttributes() volume.Attributes {
353	return volume.Attributes{
354		ReadOnly:        c.readOnly,
355		Managed:         !c.readOnly,
356		SupportsSELinux: c.supportsSELinux,
357	}
358}
359
360// volume.Unmounter methods
361var _ volume.Unmounter = &csiMountMgr{}
362
363func (c *csiMountMgr) TearDown() error {
364	return c.TearDownAt(c.GetPath())
365}
// TearDownAt unpublishes the volume at dir via CSI NodeUnpublishVolume. It
// then removes the mount directory, the volume data file and the volume
// directory (see removeMountDir).
func (c *csiMountMgr) TearDownAt(dir string) error {
	// Info (not Infof): the message is already formatted and may contain '%'.
	klog.V(4).Info(log("Unmounter.TearDown(%s)", dir))

	volID := c.volumeID
	csi, err := c.csiClientGetter.Get()
	if err != nil {
		return errors.New(log("mounter.TearDownAt failed to get CSI client: %v", err))
	}

	// Could not get spec info on whether this is a migrated operation because c.spec is nil
	ctx, cancel := createCSIOperationContext(c.spec, csiTimeout)
	defer cancel()

	if err := csi.NodeUnpublishVolume(ctx, volID, dir); err != nil {
		return errors.New(log("mounter.TearDownAt failed: %v", err))
	}

	// Deprecation: Removal of target_path provided in the NodePublish RPC call
	// (in this case location `dir`) MUST be done by the CSI plugin according
	// to the spec. This will no longer be done directly as part of TearDown
	// by the kubelet in the future. Kubelet will only be responsible for
	// removal of json data files it creates and parent directories.
	if err := removeMountDir(c.plugin, dir); err != nil {
		return errors.New(log("mounter.TearDownAt failed to clean mount dir [%s]: %v", dir, err))
	}
	klog.V(4).Info(log("mounter.TearDownAt successfully unmounted dir [%s]", dir))

	return nil
}
395
396func (c *csiMountMgr) supportsFSGroup(fsType string, fsGroup *int64, driverPolicy storage.FSGroupPolicy) bool {
397	if fsGroup == nil || driverPolicy == storage.NoneFSGroupPolicy || c.readOnly {
398		return false
399	}
400
401	if driverPolicy == storage.FileFSGroupPolicy {
402		return true
403	}
404
405	if fsType == "" {
406		klog.V(4).Info(log("mounter.SetupAt WARNING: skipping fsGroup, fsType not provided"))
407		return false
408	}
409
410	if c.spec.PersistentVolume == nil {
411		klog.V(4).Info(log("mounter.SetupAt Warning: skipping fsGroup permission change, no access mode available. The volume may only be accessible to root users."))
412		return false
413	}
414	if c.spec.PersistentVolume.Spec.AccessModes == nil {
415		klog.V(4).Info(log("mounter.SetupAt WARNING: skipping fsGroup, access modes not provided"))
416		return false
417	}
418	if !hasReadWriteOnce(c.spec.PersistentVolume.Spec.AccessModes) {
419		klog.V(4).Info(log("mounter.SetupAt WARNING: skipping fsGroup, only support ReadWriteOnce access mode"))
420		return false
421	}
422	return true
423}
424
// isDirMounted reports whether dir is (likely) a mount point, i.e. the
// negation of the IsLikelyNotMountPoint result. A not-exist error is not
// treated as a failure; any other error is logged and returned.
func isDirMounted(plug *csiPlugin, dir string) (bool, error) {
	notMnt, err := plug.host.GetMounter(plug.GetPluginName()).IsLikelyNotMountPoint(dir)
	switch {
	case err == nil, os.IsNotExist(err):
		return !notMnt, nil
	default:
		klog.Error(log("isDirMounted IsLikelyNotMountPoint test failed for dir [%v]", dir))
		return false, err
	}
}
435
436func isCorruptedDir(dir string) bool {
437	_, pathErr := mount.PathExists(dir)
438	return pathErr != nil && mount.IsCorruptedMnt(pathErr)
439}
440
// removeMountDir cleans up after a volume when mountPath is no longer mounted.
// It deletes mountPath, then the volume data file and the volume directory in
// its parent. Paths that are already gone are ignored. If mountPath is still
// a mount point, nothing is removed.
func removeMountDir(plug *csiPlugin, mountPath string) error {
	klog.V(4).Info(log("removing mount path [%s]", mountPath))

	mounted, err := isDirMounted(plug, mountPath)
	if err != nil {
		return err
	}
	if mounted {
		// Never delete a directory that is still a mount point.
		return nil
	}

	klog.V(4).Info(log("dir not mounted, deleting it [%s]", mountPath))
	if rmErr := os.Remove(mountPath); rmErr != nil && !os.IsNotExist(rmErr) {
		return errors.New(log("failed to remove dir [%s]: %v", mountPath, rmErr))
	}

	// The volume data file and the volume directory live one level up.
	volDir := filepath.Dir(mountPath)
	dataFilePath := filepath.Join(volDir, volDataFileName)
	klog.V(4).Info(log("also deleting volume info data file [%s]", dataFilePath))
	if rmErr := os.Remove(dataFilePath); rmErr != nil && !os.IsNotExist(rmErr) {
		return errors.New(log("failed to delete volume data file [%s]: %v", dataFilePath, rmErr))
	}

	klog.V(4).Info(log("deleting volume path [%s]", volDir))
	if rmErr := os.Remove(volDir); rmErr != nil && !os.IsNotExist(rmErr) {
		return errors.New(log("failed to delete volume path [%s]: %v", volDir, rmErr))
	}
	return nil
}
469
// makeVolumeHandle derives a volume handle from a pod UID and a volume source
// name. The handle is "csi-" followed by the lowercase hex SHA-256 digest of
// the two strings concatenated without a separator.
func makeVolumeHandle(podUID, volSourceSpecName string) string {
	digest := sha256.Sum256([]byte(podUID + volSourceSpecName))
	return fmt.Sprintf("csi-%x", digest)
}
475
// mergeMap returns a new map containing every entry of first, overlaid with
// every entry of second. On duplicate keys the value from second wins.
//
// Neither input is modified. SetUpAt passes the volume source's
// VolumeAttributes as first, and that map belongs to the volume spec / pod
// object. Writing into it in place would leak injected attributes into those
// objects. The result is nil only when both inputs are nil.
func mergeMap(first, second map[string]string) map[string]string {
	if first == nil && second == nil {
		return nil
	}
	merged := make(map[string]string, len(first)+len(second))
	for k, v := range first {
		merged[k] = v
	}
	for k, v := range second {
		merged[k] = v
	}
	return merged
}
485