1/*
2Copyright 2016 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17// Package operationexecutor implements interfaces that enable execution of
18// attach, detach, mount, and unmount operations with a
19// nestedpendingoperations so that more than one operation is never triggered
20// on the same volume for the same pod.
21package operationexecutor
22
23import (
24	"fmt"
25	"time"
26
27	"k8s.io/klog/v2"
28	"k8s.io/mount-utils"
29
30	v1 "k8s.io/api/core/v1"
31	"k8s.io/apimachinery/pkg/api/resource"
32	"k8s.io/apimachinery/pkg/types"
33	"k8s.io/kubernetes/pkg/volume"
34	"k8s.io/kubernetes/pkg/volume/util"
35	"k8s.io/kubernetes/pkg/volume/util/hostutil"
36	"k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations"
37	volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
38	"k8s.io/kubernetes/pkg/volume/util/volumepathhandler"
39)
40
// OperationExecutor defines a set of operations for attaching, detaching,
// mounting, or unmounting a volume that are executed with a
// NestedPendingOperations, which prevents more than one operation from being
// triggered on the same volume.
//
// These operations should be idempotent (for example, AttachVolume should
// still succeed if the volume is already attached to the node, etc.). However,
// they depend on the volume plugins to implement this behavior.
//
// Once an operation completes successfully, the actualStateOfWorld is updated
// to indicate the volume is attached/detached/mounted/unmounted.
//
// If the OperationExecutor fails to start the operation because, for example,
// an operation with the same UniqueVolumeName is already pending, a non-nil
// error is returned.
//
// Once the operation is started, since it is executed asynchronously,
// errors are simply logged and the goroutine is terminated without updating
// actualStateOfWorld (callers are responsible for retrying as needed).
//
// Some of these operations may result in calls to the API server; callers are
// responsible for rate limiting on errors.
type OperationExecutor interface {
	// AttachVolume attaches the volume to the node specified in volumeToAttach.
	// It then updates the actual state of the world to reflect that.
	AttachVolume(volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error

	// VerifyVolumesAreAttachedPerNode verifies whether the given list of
	// volumes is still attached to the node. If any volume is not attached
	// right now, it updates the actual state of the world to reflect that.
	// Note that this operation may run concurrently with other attach/detach
	// operations. In theory (but very unlikely in practice), a race condition
	// among these operations might mark a volume as detached even if it is
	// attached, but the reconciler can correct this in a short period of time.
	VerifyVolumesAreAttachedPerNode(AttachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error

	// VerifyVolumesAreAttached verifies whether the volumes being used in the
	// entire cluster are still attached to their nodes. If any volume is not
	// attached right now, it updates the actual state of the world to reflect
	// that.
	VerifyVolumesAreAttached(volumesToVerify map[types.NodeName][]AttachedVolume, actualStateOfWorld ActualStateOfWorldAttacherUpdater)

	// DetachVolume detaches the volume from the node specified in
	// volumeToDetach, and updates the actual state of the world to reflect
	// that. If verifySafeToDetach is set, a call is made to fetch the node
	// object and it is used to verify that the volume does not exist in the
	// Node's Status.VolumesInUse list (the operation fails with an error if it
	// does).
	DetachVolume(volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error

	// MountVolume mounts the volume to the pod specified in volumeToMount if
	// the volume has 'Filesystem' volumeMode.
	// Specifically it will:
	// * Wait for the device to finish attaching (for attachable volumes only).
	// * Mount device to global mount path (for attachable volumes only).
	// * Update actual state of world to reflect volume is globally mounted (for
	//   attachable volumes only).
	// * Mount the volume to the pod specific path.
	// * Update actual state of world to reflect volume is mounted to the pod
	//   path.
	// The parameter "isRemount" is informational and used to adjust logging
	// verbosity. An initial mount is more log-worthy than a remount, for
	// example.
	//
	// For 'Block' volumeMode, this method creates a symbolic link to
	// the volume from both the pod specified in volumeToMount and global map path.
	// Specifically it will:
	// * Wait for the device to finish attaching (for attachable volumes only).
	// * Update actual state of world to reflect volume is globally mounted/mapped.
	// * Map volume to global map path using symbolic link.
	// * Map the volume to the pod device map path using symbolic link.
	// * Update actual state of world to reflect volume is mounted/mapped to the pod path.
	MountVolume(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, isRemount bool) error

	// UnmountVolume unmounts the volume from the pod specified in
	// volumeToUnmount and updates the actual state of the world to reflect
	// that, if the volume has 'Filesystem' volumeMode.
	//
	// For 'Block' volumeMode, this method unmaps the symbolic link to the
	// volume from both the pod device map path in volumeToUnmount and the
	// global map path, and then updates the actual state of the world to
	// reflect that.
	UnmountVolume(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, podsDir string) error

	// UnmountDevice unmounts the volume's global mount path from the device
	// (for attachable volumes only), freeing it for detach, if the volume has
	// 'Filesystem' volumeMode. It then updates the actual state of the world
	// to reflect that.
	//
	// For 'Block' volumeMode, this method checks the number of symbolic links
	// under the global map path. If the number of references is zero, it
	// removes the global map path directory and frees the volume for detach.
	// It then updates the actual state of the world to reflect that.
	UnmountDevice(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, hostutil hostutil.HostUtils) error

	// VerifyControllerAttachedVolume checks if the specified volume is present
	// in the specified node's AttachedVolumes Status field. It uses kubeClient
	// to fetch the node object.
	// If the volume is found, the actual state of the world is updated to mark
	// the volume as attached.
	// If the volume does not implement the attacher interface, it is assumed to
	// be attached and the actual state of the world is updated accordingly.
	// If the volume is not found or there is an error (fetching the node
	// object, for example) then an error is returned which triggers exponential
	// back off on retries.
	VerifyControllerAttachedVolume(volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error

	// IsOperationPending returns true if an operation for the given volumeName
	// and one of podName or nodeName is pending; otherwise it returns false.
	IsOperationPending(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName, nodeName types.NodeName) bool
	// ExpandInUseVolume resizes the volume's file system to the expected size
	// without unmounting the volume.
	ExpandInUseVolume(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) error
	// ReconstructVolumeOperation constructs a new volume spec, created by the
	// plugin, and returns it.
	ReconstructVolumeOperation(volumeMode v1.PersistentVolumeMode, plugin volume.VolumePlugin, mapperPlugin volume.BlockVolumePlugin, uid types.UID, podName volumetypes.UniquePodName, volumeSpecName string, volumePath string, pluginName string) (*volume.Spec, error)
	// CheckVolumeExistenceOperation checks volume existence.
	CheckVolumeExistenceOperation(volumeSpec *volume.Spec, mountPath, volumeName string, mounter mount.Interface, uniqueVolumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName, podUID types.UID, attachable volume.AttachableVolumePlugin) (bool, error)
}
151
152// NewOperationExecutor returns a new instance of OperationExecutor.
153func NewOperationExecutor(
154	operationGenerator OperationGenerator) OperationExecutor {
155
156	return &operationExecutor{
157		pendingOperations: nestedpendingoperations.NewNestedPendingOperations(
158			true /* exponentialBackOffOnError */),
159		operationGenerator: operationGenerator,
160	}
161}
162
// MarkVolumeOpts is a struct used to pass arguments to the
// ActualStateOfWorldMounterUpdater methods that record a volume's mount state
// (MarkVolumeAsMounted and MarkVolumeMountAsUncertain).
//
// Most fields share their names and types with the fields of MountedVolume;
// see the documentation there for their meaning.
// NOTE(review): VolumeGidVolume presumably carries the GID annotation value
// that other types in this file call VolumeGidValue — confirm.
type MarkVolumeOpts struct {
	PodName             volumetypes.UniquePodName
	PodUID              types.UID
	VolumeName          v1.UniqueVolumeName
	Mounter             volume.Mounter
	BlockVolumeMapper   volume.BlockVolumeMapper
	OuterVolumeSpecName string
	VolumeGidVolume     string
	VolumeSpec          *volume.Spec
	VolumeMountState    VolumeMountState
}
175
// ActualStateOfWorldMounterUpdater defines a set of operations updating the actual
// state of the world cache after successful mount/unmount.
type ActualStateOfWorldMounterUpdater interface {
	// MarkVolumeAsMounted marks the specified volume as mounted to the
	// specified pod.
	MarkVolumeAsMounted(markVolumeOpts MarkVolumeOpts) error

	// MarkVolumeAsUnmounted marks the specified volume as unmounted from the
	// specified pod.
	MarkVolumeAsUnmounted(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName) error

	// MarkVolumeMountAsUncertain marks the state of the volume mount for the
	// pod as uncertain.
	MarkVolumeMountAsUncertain(markVolumeOpts MarkVolumeOpts) error

	// MarkDeviceAsMounted marks the specified volume as having been globally
	// mounted.
	MarkDeviceAsMounted(volumeName v1.UniqueVolumeName, devicePath, deviceMountPath string) error

	// MarkDeviceAsUncertain marks the device state in the global mount path as
	// uncertain.
	MarkDeviceAsUncertain(volumeName v1.UniqueVolumeName, devicePath, deviceMountPath string) error

	// MarkDeviceAsUnmounted marks the specified volume as having its global
	// mount unmounted.
	MarkDeviceAsUnmounted(volumeName v1.UniqueVolumeName) error

	// MarkVolumeAsResized marks the file system resize request of the
	// specified volume as finished.
	MarkVolumeAsResized(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName) error

	// GetDeviceMountState returns the mount state of the device in the global
	// path.
	GetDeviceMountState(volumeName v1.UniqueVolumeName) DeviceMountState

	// GetVolumeMountState returns the mount state of the volume for the pod.
	GetVolumeMountState(volumName v1.UniqueVolumeName, podName volumetypes.UniquePodName) VolumeMountState

	// IsVolumeMountedElsewhere returns whether the supplied volume is mounted
	// in a pod other than the supplied one.
	IsVolumeMountedElsewhere(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) bool

	// MarkForInUseExpansionError marks the volume as having hit an in-use
	// error during expansion. Volume expansion must not be retried for this
	// volume.
	MarkForInUseExpansionError(volumeName v1.UniqueVolumeName)
}
213
// ActualStateOfWorldAttacherUpdater defines a set of operations updating the
// actual state of the world cache after successful attach/detach/mount/unmount.
type ActualStateOfWorldAttacherUpdater interface {
	// MarkVolumeAsAttached marks the specified volume as attached to the
	// specified node. If the volume name is supplied, that volume name will be
	// used. If not, the volume name is computed using the result from querying
	// the plugin.
	//
	// TODO: in the future, we should be able to remove the volumeName
	// argument to this method -- since it is used only for attachable
	// volumes.  See issue 29695.
	MarkVolumeAsAttached(volumeName v1.UniqueVolumeName, volumeSpec *volume.Spec, nodeName types.NodeName, devicePath string) error

	// MarkVolumeAsUncertain marks the specified volume as *possibly* attached
	// to the specified node. If an attach operation fails, the attach/detach
	// controller does not know for certain if the volume is attached or not.
	// If the volume name is supplied, that volume name will be used. If not,
	// the volume name is computed using the result from querying the plugin.
	MarkVolumeAsUncertain(volumeName v1.UniqueVolumeName, volumeSpec *volume.Spec, nodeName types.NodeName) error

	// MarkVolumeAsDetached marks the specified volume as detached from the
	// specified node.
	MarkVolumeAsDetached(volumeName v1.UniqueVolumeName, nodeName types.NodeName)

	// RemoveVolumeFromReportAsAttached marks the desire to detach the
	// specified volume (removes the volume from the node's
	// volumesToReportAsAttached list).
	RemoveVolumeFromReportAsAttached(volumeName v1.UniqueVolumeName, nodeName types.NodeName) error

	// AddVolumeToReportAsAttached unmarks the desire to detach the specified
	// volume (adds the volume back to the node's volumesToReportAsAttached
	// list).
	AddVolumeToReportAsAttached(volumeName v1.UniqueVolumeName, nodeName types.NodeName)
}
243
// VolumeLogger defines a set of operations for generating volume-related
// logging and error messages.
type VolumeLogger interface {
	// GenerateMsgDetailed creates a detailed message that can be used in logs.
	// The message follows the pattern
	// "<prefixMsg> <volume details> <suffixMsg>", where each implementation
	// provides the volume details.
	GenerateMsgDetailed(prefixMsg, suffixMsg string) (detailedMsg string)

	// GenerateErrorDetailed creates a detailed error that can be used in logs.
	// The message follows the pattern "<prefixMsg> <volume details>: <err> ".
	GenerateErrorDetailed(prefixMsg string, err error) (detailedErr error)

	// GenerateMsg creates a simple message that is user friendly and a
	// detailed message that can be used in logs. The message follows the
	// pattern "<prefixMsg> <volume details> <suffixMsg>", where each
	// implementation provides the volume details.
	GenerateMsg(prefixMsg, suffixMsg string) (simpleMsg, detailedMsg string)

	// GenerateError creates a simple error that is user friendly and a
	// detailed error that can be used in logs. The message follows the
	// pattern "<prefixMsg> <volume details>: <err> ".
	GenerateError(prefixMsg string, err error) (simpleErr, detailedErr error)
}
264
// errSuffix renders err as a suffix of the form ": <err>". It returns the
// empty string when err is nil.
func errSuffix(err error) string {
	if err == nil {
		return ""
	}
	return fmt.Sprintf(": %v", err)
}
273
// generateVolumeMsgDetailed assembles the detailed message used in logs. The
// result has the shape
// "<prefixMsg> for volume <quoted volumeName> <details> <suffixMsg>".
func generateVolumeMsgDetailed(prefixMsg, suffixMsg, volumeName, details string) (detailedMsg string) {
	detailedMsg = fmt.Sprintf("%v for volume %q %v %v", prefixMsg, volumeName, details, suffixMsg)
	return detailedMsg
}
278
279// Generate a simplified error msg for events and a detailed error msg for logs
280func generateVolumeMsg(prefixMsg, suffixMsg, volumeName, details string) (simpleMsg, detailedMsg string) {
281	simpleMsg = fmt.Sprintf("%v for volume %q %v", prefixMsg, volumeName, suffixMsg)
282	return simpleMsg, generateVolumeMsgDetailed(prefixMsg, suffixMsg, volumeName, details)
283}
284
// VolumeToAttach represents a volume that should be attached to a node.
type VolumeToAttach struct {
	// MultiAttachErrorReported indicates whether the multi-attach error has
	// been reported for the given volume. It is used to prevent the error from
	// being reported more than once for a given volume.
	MultiAttachErrorReported bool

	// VolumeName is the unique identifier for the volume that should be
	// attached.
	VolumeName v1.UniqueVolumeName

	// VolumeSpec is a volume spec containing the specification for the volume
	// that should be attached.
	VolumeSpec *volume.Spec

	// NodeName is the identifier for the node that the volume should be
	// attached to.
	NodeName types.NodeName

	// ScheduledPods is the list of pods that reference this volume and are
	// scheduled to the underlying node. Each entry is a pod object containing
	// more information about the pod.
	ScheduledPods []*v1.Pod
}
309
310// GenerateMsgDetailed returns detailed msgs for volumes to attach
311func (volume *VolumeToAttach) GenerateMsgDetailed(prefixMsg, suffixMsg string) (detailedMsg string) {
312	detailedStr := fmt.Sprintf("(UniqueName: %q) from node %q", volume.VolumeName, volume.NodeName)
313	volumeSpecName := "nil"
314	if volume.VolumeSpec != nil {
315		volumeSpecName = volume.VolumeSpec.Name()
316	}
317	return generateVolumeMsgDetailed(prefixMsg, suffixMsg, volumeSpecName, detailedStr)
318}
319
320// GenerateMsg returns simple and detailed msgs for volumes to attach
321func (volume *VolumeToAttach) GenerateMsg(prefixMsg, suffixMsg string) (simpleMsg, detailedMsg string) {
322	detailedStr := fmt.Sprintf("(UniqueName: %q) from node %q", volume.VolumeName, volume.NodeName)
323	volumeSpecName := "nil"
324	if volume.VolumeSpec != nil {
325		volumeSpecName = volume.VolumeSpec.Name()
326	}
327	return generateVolumeMsg(prefixMsg, suffixMsg, volumeSpecName, detailedStr)
328}
329
330// GenerateErrorDetailed returns detailed errors for volumes to attach
331func (volume *VolumeToAttach) GenerateErrorDetailed(prefixMsg string, err error) (detailedErr error) {
332	return fmt.Errorf(volume.GenerateMsgDetailed(prefixMsg, errSuffix(err)))
333}
334
335// GenerateError returns simple and detailed errors for volumes to attach
336func (volume *VolumeToAttach) GenerateError(prefixMsg string, err error) (simpleErr, detailedErr error) {
337	simpleMsg, detailedMsg := volume.GenerateMsg(prefixMsg, errSuffix(err))
338	return fmt.Errorf(simpleMsg), fmt.Errorf(detailedMsg)
339}
340
// VolumeToMount represents a volume that should be attached to this node and
// mounted to the PodName.
type VolumeToMount struct {
	// VolumeName is the unique identifier for the volume that should be
	// mounted.
	VolumeName v1.UniqueVolumeName

	// PodName is the unique identifier for the pod that the volume should be
	// mounted to after it is attached.
	PodName volumetypes.UniquePodName

	// VolumeSpec is a volume spec containing the specification for the volume
	// that should be mounted. Used to create NewMounter. Used to generate
	// InnerVolumeSpecName.
	VolumeSpec *volume.Spec

	// OuterVolumeSpecName is the podSpec.Volume[x].Name of the volume. If the
	// volume was referenced through a persistent volume claim, this contains
	// the podSpec.Volume[x].Name of the persistent volume claim.
	OuterVolumeSpecName string

	// Pod is the pod to mount the volume to. Used to create NewMounter.
	Pod *v1.Pod

	// PluginIsAttachable indicates that the plugin for this volume implements
	// the volume.Attacher interface.
	PluginIsAttachable bool

	// PluginIsDeviceMountable indicates that the plugin for this volume
	// implements the volume.DeviceMounter interface.
	PluginIsDeviceMountable bool

	// VolumeGidValue contains the value of the GID annotation, if present.
	VolumeGidValue string

	// DevicePath contains the path on the node where the volume is attached.
	// For non-attachable volumes this is empty.
	DevicePath string

	// ReportedInUse indicates that the volume was successfully added to the
	// VolumesInUse field in the node's status.
	ReportedInUse bool

	// DesiredSizeLimit indicates the desired upper bound on the size of the
	// volume (if so implemented).
	DesiredSizeLimit *resource.Quantity
}
388
// DeviceMountState represents the mount state of a device in a global path.
type DeviceMountState string

const (
	// DeviceGloballyMounted means the device has been globally mounted
	// successfully.
	DeviceGloballyMounted DeviceMountState = "DeviceGloballyMounted"

	// DeviceMountUncertain means the device may not be mounted, but a mount
	// operation may be in progress which can cause the device mount to
	// succeed.
	DeviceMountUncertain DeviceMountState = "DeviceMountUncertain"

	// DeviceNotMounted means the device has not been mounted globally.
	DeviceNotMounted DeviceMountState = "DeviceNotMounted"
)
403
// VolumeMountState represents the mount state of a volume in a path local to
// the pod.
type VolumeMountState string

const (
	// VolumeMounted means the volume has been mounted in the pod's local path.
	VolumeMounted VolumeMountState = "VolumeMounted"

	// VolumeMountUncertain means the volume may or may not be mounted in the
	// pod's local path.
	VolumeMountUncertain VolumeMountState = "VolumeMountUncertain"

	// VolumeNotMounted means the volume has not been mounted in the pod's
	// local path.
	VolumeNotMounted VolumeMountState = "VolumeNotMounted"
)
417
418// GenerateMsgDetailed returns detailed msgs for volumes to mount
419func (volume *VolumeToMount) GenerateMsgDetailed(prefixMsg, suffixMsg string) (detailedMsg string) {
420	detailedStr := fmt.Sprintf("(UniqueName: %q) pod %q (UID: %q)", volume.VolumeName, volume.Pod.Name, volume.Pod.UID)
421	volumeSpecName := "nil"
422	if volume.VolumeSpec != nil {
423		volumeSpecName = volume.VolumeSpec.Name()
424	}
425	return generateVolumeMsgDetailed(prefixMsg, suffixMsg, volumeSpecName, detailedStr)
426}
427
428// GenerateMsg returns simple and detailed msgs for volumes to mount
429func (volume *VolumeToMount) GenerateMsg(prefixMsg, suffixMsg string) (simpleMsg, detailedMsg string) {
430	detailedStr := fmt.Sprintf("(UniqueName: %q) pod %q (UID: %q)", volume.VolumeName, volume.Pod.Name, volume.Pod.UID)
431	volumeSpecName := "nil"
432	if volume.VolumeSpec != nil {
433		volumeSpecName = volume.VolumeSpec.Name()
434	}
435	return generateVolumeMsg(prefixMsg, suffixMsg, volumeSpecName, detailedStr)
436}
437
438// GenerateErrorDetailed returns detailed errors for volumes to mount
439func (volume *VolumeToMount) GenerateErrorDetailed(prefixMsg string, err error) (detailedErr error) {
440	return fmt.Errorf(volume.GenerateMsgDetailed(prefixMsg, errSuffix(err)))
441}
442
443// GenerateError returns simple and detailed errors for volumes to mount
444func (volume *VolumeToMount) GenerateError(prefixMsg string, err error) (simpleErr, detailedErr error) {
445	simpleMsg, detailedMsg := volume.GenerateMsg(prefixMsg, errSuffix(err))
446	return fmt.Errorf(simpleMsg), fmt.Errorf(detailedMsg)
447}
448
// AttachedVolume represents a volume that is attached to a node.
type AttachedVolume struct {
	// VolumeName is the unique identifier for the volume that is attached.
	VolumeName v1.UniqueVolumeName

	// VolumeSpec is the volume spec containing the specification for the
	// volume that is attached.
	VolumeSpec *volume.Spec

	// NodeName is the identifier for the node that the volume is attached to.
	NodeName types.NodeName

	// PluginIsAttachable indicates that the plugin for this volume implements
	// the volume.Attacher interface.
	PluginIsAttachable bool

	// DevicePath contains the path on the node where the volume is attached.
	// For non-attachable volumes this is empty.
	DevicePath string

	// DeviceMountPath contains the path on the node where the device should
	// be mounted after it is attached.
	DeviceMountPath string

	// PluginName is the "Unescaped Qualified" name of the volume plugin used
	// to attach and mount this volume.
	PluginName string
}
477
478// GenerateMsgDetailed returns detailed msgs for attached volumes
479func (volume *AttachedVolume) GenerateMsgDetailed(prefixMsg, suffixMsg string) (detailedMsg string) {
480	detailedStr := fmt.Sprintf("(UniqueName: %q) on node %q", volume.VolumeName, volume.NodeName)
481	volumeSpecName := "nil"
482	if volume.VolumeSpec != nil {
483		volumeSpecName = volume.VolumeSpec.Name()
484	}
485	return generateVolumeMsgDetailed(prefixMsg, suffixMsg, volumeSpecName, detailedStr)
486}
487
488// GenerateMsg returns simple and detailed msgs for attached volumes
489func (volume *AttachedVolume) GenerateMsg(prefixMsg, suffixMsg string) (simpleMsg, detailedMsg string) {
490	detailedStr := fmt.Sprintf("(UniqueName: %q) on node %q", volume.VolumeName, volume.NodeName)
491	volumeSpecName := "nil"
492	if volume.VolumeSpec != nil {
493		volumeSpecName = volume.VolumeSpec.Name()
494	}
495	return generateVolumeMsg(prefixMsg, suffixMsg, volumeSpecName, detailedStr)
496}
497
498// GenerateErrorDetailed returns detailed errors for attached volumes
499func (volume *AttachedVolume) GenerateErrorDetailed(prefixMsg string, err error) (detailedErr error) {
500	return fmt.Errorf(volume.GenerateMsgDetailed(prefixMsg, errSuffix(err)))
501}
502
503// GenerateError returns simple and detailed errors for attached volumes
504func (volume *AttachedVolume) GenerateError(prefixMsg string, err error) (simpleErr, detailedErr error) {
505	simpleMsg, detailedMsg := volume.GenerateMsg(prefixMsg, errSuffix(err))
506	return fmt.Errorf(simpleMsg), fmt.Errorf(detailedMsg)
507}
508
// MountedVolume represents a volume that has successfully been mounted to a pod.
type MountedVolume struct {
	// PodName is the unique identifier of the pod mounted to.
	PodName volumetypes.UniquePodName

	// VolumeName is the unique identifier of the volume mounted to the pod.
	VolumeName v1.UniqueVolumeName

	// InnerVolumeSpecName is the volume.Spec.Name() of the volume. If the
	// volume was referenced through a persistent volume claim, this contains
	// the name of the bound persistent volume object.
	// It is the name that plugins use in their pod mount path, i.e.
	// /var/lib/kubelet/pods/{podUID}/volumes/{escapeQualifiedPluginName}/{innerVolumeSpecName}/
	// PVC example:
	//   apiVersion: v1
	//   kind: PersistentVolume
	//   metadata:
	//     name: pv0003                 <- InnerVolumeSpecName
	//   spec:
	//     capacity:
	//       storage: 5Gi
	//     accessModes:
	//       - ReadWriteOnce
	//     persistentVolumeReclaimPolicy: Recycle
	//     nfs:
	//       path: /tmp
	//       server: 172.17.0.2
	// Non-PVC example:
	//   apiVersion: v1
	//   kind: Pod
	//   metadata:
	//     name: test-pd
	//   spec:
	//     containers:
	//     - image: k8s.gcr.io/test-webserver
	//       name: test-container
	//       volumeMounts:
	//       - mountPath: /test-pd
	//         name: test-volume
	//     volumes:
	//     - name: test-volume          <- InnerVolumeSpecName
	//       gcePersistentDisk:
	//         pdName: my-data-disk
	//         fsType: ext4
	InnerVolumeSpecName string

	// OuterVolumeSpecName is the podSpec.Volume[x].Name of the volume. If the
	// volume was referenced through a persistent volume claim, this contains
	// the podSpec.Volume[x].Name of the persistent volume claim.
	// PVC example:
	//   kind: Pod
	//   apiVersion: v1
	//   metadata:
	//     name: mypod
	//   spec:
	//     containers:
	//       - name: myfrontend
	//         image: dockerfile/nginx
	//         volumeMounts:
	//         - mountPath: "/var/www/html"
	//           name: mypd
	//     volumes:
	//       - name: mypd               <- OuterVolumeSpecName
	//         persistentVolumeClaim:
	//           claimName: myclaim
	// Non-PVC example:
	//   apiVersion: v1
	//   kind: Pod
	//   metadata:
	//     name: test-pd
	//   spec:
	//     containers:
	//     - image: k8s.gcr.io/test-webserver
	//       name: test-container
	//       volumeMounts:
	//       - mountPath: /test-pd
	//         name: test-volume
	//     volumes:
	//     - name: test-volume          <- OuterVolumeSpecName
	//       gcePersistentDisk:
	//         pdName: my-data-disk
	//         fsType: ext4
	OuterVolumeSpecName string

	// PluginName is the "Unescaped Qualified" name of the volume plugin used to
	// mount and unmount this volume. It can be used to fetch the volume plugin
	// to unmount with, on demand. It is also the name that plugins use, though
	// escaped, in their pod mount path, i.e.
	// /var/lib/kubelet/pods/{podUID}/volumes/{escapeQualifiedPluginName}/{outerVolumeSpecName}/
	// NOTE(review): the last path element is written {outerVolumeSpecName}
	// here but {innerVolumeSpecName} in the InnerVolumeSpecName comment —
	// confirm which one the mount path actually uses.
	PluginName string

	// PodUID is the UID of the pod mounted to. It is also the string used by
	// plugins in their pod mount path, i.e.
	// /var/lib/kubelet/pods/{podUID}/volumes/{escapeQualifiedPluginName}/{outerVolumeSpecName}/
	PodUID types.UID

	// Mounter is the volume mounter used to mount this volume. It is required
	// by kubelet to create container.VolumeMap.
	// Mounter is only required for file system volumes and not required for
	// block volumes.
	Mounter volume.Mounter

	// BlockVolumeMapper is the volume mapper used to map this volume. It is
	// required by kubelet to create container.VolumeMap.
	// BlockVolumeMapper is only required for block volumes and not required
	// for file system volumes.
	BlockVolumeMapper volume.BlockVolumeMapper

	// VolumeGidValue contains the value of the GID annotation, if present.
	VolumeGidValue string

	// VolumeSpec is a volume spec containing the specification for the volume
	// that should be mounted.
	VolumeSpec *volume.Spec

	// DeviceMountPath contains the path on the node where the device should
	// be mounted after it is attached.
	DeviceMountPath string
}
626
627// GenerateMsgDetailed returns detailed msgs for mounted volumes
628func (volume *MountedVolume) GenerateMsgDetailed(prefixMsg, suffixMsg string) (detailedMsg string) {
629	detailedStr := fmt.Sprintf("(UniqueName: %q) pod %q (UID: %q)", volume.VolumeName, volume.PodName, volume.PodUID)
630	return generateVolumeMsgDetailed(prefixMsg, suffixMsg, volume.OuterVolumeSpecName, detailedStr)
631}
632
633// GenerateMsg returns simple and detailed msgs for mounted volumes
634func (volume *MountedVolume) GenerateMsg(prefixMsg, suffixMsg string) (simpleMsg, detailedMsg string) {
635	detailedStr := fmt.Sprintf("(UniqueName: %q) pod %q (UID: %q)", volume.VolumeName, volume.PodName, volume.PodUID)
636	return generateVolumeMsg(prefixMsg, suffixMsg, volume.OuterVolumeSpecName, detailedStr)
637}
638
639// GenerateErrorDetailed returns simple and detailed errors for mounted volumes
640func (volume *MountedVolume) GenerateErrorDetailed(prefixMsg string, err error) (detailedErr error) {
641	return fmt.Errorf(volume.GenerateMsgDetailed(prefixMsg, errSuffix(err)))
642}
643
644// GenerateError returns simple and detailed errors for mounted volumes
645func (volume *MountedVolume) GenerateError(prefixMsg string, err error) (simpleErr, detailedErr error) {
646	simpleMsg, detailedMsg := volume.GenerateMsg(prefixMsg, errSuffix(err))
647	return fmt.Errorf(simpleMsg), fmt.Errorf(detailedMsg)
648}
649
// operationExecutor is the type returned by NewOperationExecutor as an
// OperationExecutor. It runs the operations produced by operationGenerator
// through pendingOperations.
type operationExecutor struct {
	// pendingOperations keeps track of pending attach and detach operations so
	// multiple operations are not started on the same volume
	pendingOperations nestedpendingoperations.NestedPendingOperations

	// operationGenerator is an interface that provides implementations for
	// generating volume function
	operationGenerator OperationGenerator
}
659
660func (oe *operationExecutor) IsOperationPending(
661	volumeName v1.UniqueVolumeName,
662	podName volumetypes.UniquePodName,
663	nodeName types.NodeName) bool {
664	return oe.pendingOperations.IsOperationPending(volumeName, podName, nodeName)
665}
666
667func (oe *operationExecutor) AttachVolume(
668	volumeToAttach VolumeToAttach,
669	actualStateOfWorld ActualStateOfWorldAttacherUpdater) error {
670	generatedOperations :=
671		oe.operationGenerator.GenerateAttachVolumeFunc(volumeToAttach, actualStateOfWorld)
672
673	if util.IsMultiAttachAllowed(volumeToAttach.VolumeSpec) {
674		return oe.pendingOperations.Run(
675			volumeToAttach.VolumeName, "" /* podName */, volumeToAttach.NodeName, generatedOperations)
676	}
677
678	return oe.pendingOperations.Run(
679		volumeToAttach.VolumeName, "" /* podName */, "" /* nodeName */, generatedOperations)
680}
681
682func (oe *operationExecutor) DetachVolume(
683	volumeToDetach AttachedVolume,
684	verifySafeToDetach bool,
685	actualStateOfWorld ActualStateOfWorldAttacherUpdater) error {
686	generatedOperations, err :=
687		oe.operationGenerator.GenerateDetachVolumeFunc(volumeToDetach, verifySafeToDetach, actualStateOfWorld)
688	if err != nil {
689		return err
690	}
691
692	if util.IsMultiAttachAllowed(volumeToDetach.VolumeSpec) {
693		return oe.pendingOperations.Run(
694			volumeToDetach.VolumeName, "" /* podName */, volumeToDetach.NodeName, generatedOperations)
695	}
696	return oe.pendingOperations.Run(
697		volumeToDetach.VolumeName, "" /* podName */, "" /* nodeName */, generatedOperations)
698
699}
700
701func (oe *operationExecutor) VerifyVolumesAreAttached(
702	attachedVolumes map[types.NodeName][]AttachedVolume,
703	actualStateOfWorld ActualStateOfWorldAttacherUpdater) {
704
705	// A map of plugin names and nodes on which they exist with volumes they manage
706	bulkVerifyPluginsByNode := make(map[string]map[types.NodeName][]*volume.Spec)
707	volumeSpecMapByPlugin := make(map[string]map[*volume.Spec]v1.UniqueVolumeName)
708
709	for node, nodeAttachedVolumes := range attachedVolumes {
710		needIndividualVerifyVolumes := []AttachedVolume{}
711		for _, volumeAttached := range nodeAttachedVolumes {
712			if volumeAttached.VolumeSpec == nil {
713				klog.Errorf("VerifyVolumesAreAttached: nil spec for volume %s", volumeAttached.VolumeName)
714				continue
715			}
716
717			volumePlugin, err :=
718				oe.operationGenerator.GetVolumePluginMgr().FindPluginBySpec(volumeAttached.VolumeSpec)
719			if err != nil {
720				klog.Errorf(
721					"VolumesAreAttached.FindPluginBySpec failed for volume %q (spec.Name: %q) on node %q with error: %v",
722					volumeAttached.VolumeName,
723					volumeAttached.VolumeSpec.Name(),
724					volumeAttached.NodeName,
725					err)
726				continue
727			}
728			if volumePlugin == nil {
729				// should never happen since FindPluginBySpec always returns error if volumePlugin = nil
730				klog.Errorf(
731					"Failed to find volume plugin for volume %q (spec.Name: %q) on node %q",
732					volumeAttached.VolumeName,
733					volumeAttached.VolumeSpec.Name(),
734					volumeAttached.NodeName)
735				continue
736			}
737
738			pluginName := volumePlugin.GetPluginName()
739
740			if volumePlugin.SupportsBulkVolumeVerification() {
741				pluginNodes, pluginNodesExist := bulkVerifyPluginsByNode[pluginName]
742
743				if !pluginNodesExist {
744					pluginNodes = make(map[types.NodeName][]*volume.Spec)
745				}
746
747				volumeSpecList, nodeExists := pluginNodes[node]
748				if !nodeExists {
749					volumeSpecList = []*volume.Spec{}
750				}
751				volumeSpecList = append(volumeSpecList, volumeAttached.VolumeSpec)
752				pluginNodes[node] = volumeSpecList
753
754				bulkVerifyPluginsByNode[pluginName] = pluginNodes
755				volumeSpecMap, mapExists := volumeSpecMapByPlugin[pluginName]
756
757				if !mapExists {
758					volumeSpecMap = make(map[*volume.Spec]v1.UniqueVolumeName)
759				}
760				volumeSpecMap[volumeAttached.VolumeSpec] = volumeAttached.VolumeName
761				volumeSpecMapByPlugin[pluginName] = volumeSpecMap
762				continue
763			}
764			// If node doesn't support Bulk volume polling it is best to poll individually
765			needIndividualVerifyVolumes = append(needIndividualVerifyVolumes, volumeAttached)
766		}
767		nodeError := oe.VerifyVolumesAreAttachedPerNode(needIndividualVerifyVolumes, node, actualStateOfWorld)
768		if nodeError != nil {
769			klog.Errorf("VerifyVolumesAreAttached failed for volumes %v, node %q with error %v", needIndividualVerifyVolumes, node, nodeError)
770		}
771	}
772
773	for pluginName, pluginNodeVolumes := range bulkVerifyPluginsByNode {
774		generatedOperations, err := oe.operationGenerator.GenerateBulkVolumeVerifyFunc(
775			pluginNodeVolumes,
776			pluginName,
777			volumeSpecMapByPlugin[pluginName],
778			actualStateOfWorld)
779		if err != nil {
780			klog.Errorf("BulkVerifyVolumes.GenerateBulkVolumeVerifyFunc error bulk verifying volumes for plugin %q with  %v", pluginName, err)
781		}
782
783		// Ugly hack to ensure - we don't do parallel bulk polling of same volume plugin
784		uniquePluginName := v1.UniqueVolumeName(pluginName)
785		err = oe.pendingOperations.Run(uniquePluginName, "" /* Pod Name */, "" /* nodeName */, generatedOperations)
786		if err != nil {
787			klog.Errorf("BulkVerifyVolumes.Run Error bulk volume verification for plugin %q  with %v", pluginName, err)
788		}
789	}
790}
791
792func (oe *operationExecutor) VerifyVolumesAreAttachedPerNode(
793	attachedVolumes []AttachedVolume,
794	nodeName types.NodeName,
795	actualStateOfWorld ActualStateOfWorldAttacherUpdater) error {
796	generatedOperations, err :=
797		oe.operationGenerator.GenerateVolumesAreAttachedFunc(attachedVolumes, nodeName, actualStateOfWorld)
798	if err != nil {
799		return err
800	}
801
802	// Give an empty UniqueVolumeName so that this operation could be executed concurrently.
803	return oe.pendingOperations.Run("" /* volumeName */, "" /* podName */, "" /* nodeName */, generatedOperations)
804}
805
806func (oe *operationExecutor) MountVolume(
807	waitForAttachTimeout time.Duration,
808	volumeToMount VolumeToMount,
809	actualStateOfWorld ActualStateOfWorldMounterUpdater,
810	isRemount bool) error {
811	fsVolume, err := util.CheckVolumeModeFilesystem(volumeToMount.VolumeSpec)
812	if err != nil {
813		return err
814	}
815	var generatedOperations volumetypes.GeneratedOperations
816	if fsVolume {
817		// Filesystem volume case
818		// Mount/remount a volume when a volume is attached
819		generatedOperations = oe.operationGenerator.GenerateMountVolumeFunc(
820			waitForAttachTimeout, volumeToMount, actualStateOfWorld, isRemount)
821
822	} else {
823		// Block volume case
824		// Creates a map to device if a volume is attached
825		generatedOperations, err = oe.operationGenerator.GenerateMapVolumeFunc(
826			waitForAttachTimeout, volumeToMount, actualStateOfWorld)
827	}
828	if err != nil {
829		return err
830	}
831	// Avoid executing mount/map from multiple pods referencing the
832	// same volume in parallel
833	podName := nestedpendingoperations.EmptyUniquePodName
834
835	// TODO: remove this -- not necessary
836	if !volumeToMount.PluginIsAttachable && !volumeToMount.PluginIsDeviceMountable {
837		// volume plugins which are Non-attachable and Non-deviceMountable can execute mount for multiple pods
838		// referencing the same volume in parallel
839		podName = util.GetUniquePodName(volumeToMount.Pod)
840	}
841
842	// TODO mount_device
843	return oe.pendingOperations.Run(
844		volumeToMount.VolumeName, podName, "" /* nodeName */, generatedOperations)
845}
846
847func (oe *operationExecutor) UnmountVolume(
848	volumeToUnmount MountedVolume,
849	actualStateOfWorld ActualStateOfWorldMounterUpdater,
850	podsDir string) error {
851	fsVolume, err := util.CheckVolumeModeFilesystem(volumeToUnmount.VolumeSpec)
852	if err != nil {
853		return err
854	}
855	var generatedOperations volumetypes.GeneratedOperations
856	if fsVolume {
857		// Filesystem volume case
858		// Unmount a volume if a volume is mounted
859		generatedOperations, err = oe.operationGenerator.GenerateUnmountVolumeFunc(
860			volumeToUnmount, actualStateOfWorld, podsDir)
861	} else {
862		// Block volume case
863		// Unmap a volume if a volume is mapped
864		generatedOperations, err = oe.operationGenerator.GenerateUnmapVolumeFunc(
865			volumeToUnmount, actualStateOfWorld)
866	}
867	if err != nil {
868		return err
869	}
870	// All volume plugins can execute unmount/unmap for multiple pods referencing the
871	// same volume in parallel
872	podName := volumetypes.UniquePodName(volumeToUnmount.PodUID)
873
874	return oe.pendingOperations.Run(
875		volumeToUnmount.VolumeName, podName, "" /* nodeName */, generatedOperations)
876}
877
878func (oe *operationExecutor) UnmountDevice(
879	deviceToDetach AttachedVolume,
880	actualStateOfWorld ActualStateOfWorldMounterUpdater,
881	hostutil hostutil.HostUtils) error {
882	fsVolume, err := util.CheckVolumeModeFilesystem(deviceToDetach.VolumeSpec)
883	if err != nil {
884		return err
885	}
886	var generatedOperations volumetypes.GeneratedOperations
887	if fsVolume {
888		// Filesystem volume case
889		// Unmount and detach a device if a volume isn't referenced
890		generatedOperations, err = oe.operationGenerator.GenerateUnmountDeviceFunc(
891			deviceToDetach, actualStateOfWorld, hostutil)
892	} else {
893		// Block volume case
894		// Detach a device and remove loopback if a volume isn't referenced
895		generatedOperations, err = oe.operationGenerator.GenerateUnmapDeviceFunc(
896			deviceToDetach, actualStateOfWorld, hostutil)
897	}
898	if err != nil {
899		return err
900	}
901	// Avoid executing unmount/unmap device from multiple pods referencing
902	// the same volume in parallel
903	podName := nestedpendingoperations.EmptyUniquePodName
904
905	return oe.pendingOperations.Run(
906		deviceToDetach.VolumeName, podName, "" /* nodeName */, generatedOperations)
907}
908
909func (oe *operationExecutor) ExpandInUseVolume(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) error {
910	generatedOperations, err := oe.operationGenerator.GenerateExpandInUseVolumeFunc(volumeToMount, actualStateOfWorld)
911	if err != nil {
912		return err
913	}
914	return oe.pendingOperations.Run(volumeToMount.VolumeName, "", "" /* nodeName */, generatedOperations)
915}
916
917func (oe *operationExecutor) VerifyControllerAttachedVolume(
918	volumeToMount VolumeToMount,
919	nodeName types.NodeName,
920	actualStateOfWorld ActualStateOfWorldAttacherUpdater) error {
921	generatedOperations, err :=
922		oe.operationGenerator.GenerateVerifyControllerAttachedVolumeFunc(volumeToMount, nodeName, actualStateOfWorld)
923	if err != nil {
924		return err
925	}
926
927	return oe.pendingOperations.Run(
928		volumeToMount.VolumeName, "" /* podName */, "" /* nodeName */, generatedOperations)
929}
930
931// ReconstructVolumeOperation return a func to create volumeSpec from mount path
932func (oe *operationExecutor) ReconstructVolumeOperation(
933	volumeMode v1.PersistentVolumeMode,
934	plugin volume.VolumePlugin,
935	mapperPlugin volume.BlockVolumePlugin,
936	uid types.UID,
937	podName volumetypes.UniquePodName,
938	volumeSpecName string,
939	volumePath string,
940	pluginName string) (*volume.Spec, error) {
941
942	// Filesystem Volume case
943	if volumeMode == v1.PersistentVolumeFilesystem {
944		// Create volumeSpec from mount path
945		klog.V(5).Infof("Starting operationExecutor.ReconstructVolumepodName")
946		volumeSpec, err := plugin.ConstructVolumeSpec(volumeSpecName, volumePath)
947		if err != nil {
948			return nil, err
949		}
950		return volumeSpec, nil
951	}
952
953	// Block Volume case
954	// Create volumeSpec from mount path
955	klog.V(5).Infof("Starting operationExecutor.ReconstructVolume")
956
957	// volumePath contains volumeName on the path. In the case of block volume, {volumeName} is symbolic link
958	// corresponding to raw block device.
959	// ex. volumePath: pods/{podUid}}/{DefaultKubeletVolumeDevicesDirName}/{escapeQualifiedPluginName}/{volumeName}
960	volumeSpec, err := mapperPlugin.ConstructBlockVolumeSpec(uid, volumeSpecName, volumePath)
961	if err != nil {
962		return nil, err
963	}
964	return volumeSpec, nil
965}
966
967// CheckVolumeExistenceOperation checks mount path directory if volume still exists
968func (oe *operationExecutor) CheckVolumeExistenceOperation(
969	volumeSpec *volume.Spec,
970	mountPath, volumeName string,
971	mounter mount.Interface,
972	uniqueVolumeName v1.UniqueVolumeName,
973	podName volumetypes.UniquePodName,
974	podUID types.UID,
975	attachable volume.AttachableVolumePlugin) (bool, error) {
976	fsVolume, err := util.CheckVolumeModeFilesystem(volumeSpec)
977	if err != nil {
978		return false, err
979	}
980
981	// Filesystem Volume case
982	// For attachable volume case, check mount path directory if volume is still existing and mounted.
983	// Return true if volume is mounted.
984	if fsVolume {
985		if attachable != nil {
986			var isNotMount bool
987			var mountCheckErr error
988			if mounter == nil {
989				return false, fmt.Errorf("mounter was not set for a filesystem volume")
990			}
991			if isNotMount, mountCheckErr = mounter.IsLikelyNotMountPoint(mountPath); mountCheckErr != nil {
992				return false, fmt.Errorf("could not check whether the volume %q (spec.Name: %q) pod %q (UID: %q) is mounted with: %v",
993					uniqueVolumeName,
994					volumeName,
995					podName,
996					podUID,
997					mountCheckErr)
998			}
999			return !isNotMount, nil
1000		}
1001		// For non-attachable volume case, skip check and return true without mount point check
1002		// since plugins may not have volume mount point.
1003		return true, nil
1004	}
1005
1006	// Block Volume case
1007	// Check mount path directory if volume still exists, then return true if volume
1008	// is there. Either plugin is attachable or non-attachable, the plugin should
1009	// have symbolic link associated to raw block device under pod device map
1010	// if volume exists.
1011	blkutil := volumepathhandler.NewBlockVolumePathHandler()
1012	var islinkExist bool
1013	var checkErr error
1014	if islinkExist, checkErr = blkutil.IsSymlinkExist(mountPath); checkErr != nil {
1015		return false, fmt.Errorf("could not check whether the block volume %q (spec.Name: %q) pod %q (UID: %q) is mapped to: %v",
1016			uniqueVolumeName,
1017			volumeName,
1018			podName,
1019			podUID,
1020			checkErr)
1021	}
1022	return islinkExist, nil
1023}
1024