/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package operationexecutor implements interfaces that enable execution of
// attach, detach, mount, and unmount operations with a
// nestedpendingoperations so that more than one operation is never triggered
// on the same volume for the same pod.
package operationexecutor

import (
	"fmt"
	"time"

	"k8s.io/klog/v2"
	"k8s.io/mount-utils"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/kubernetes/pkg/volume"
	"k8s.io/kubernetes/pkg/volume/util"
	"k8s.io/kubernetes/pkg/volume/util/hostutil"
	"k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations"
	volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
	"k8s.io/kubernetes/pkg/volume/util/volumepathhandler"
)

// OperationExecutor defines a set of operations for attaching, detaching,
// mounting, or unmounting a volume that are executed with a
// NewNestedPendingOperations which prevents more than one operation from being
// triggered on the same volume.
//
// These operations should be idempotent (for example, AttachVolume should
// still succeed if the volume is already attached to the node, etc.). However,
// they depend on the volume plugins to implement this behavior.
//
// Once an operation completes successfully, the actualStateOfWorld is updated
// to indicate the volume is attached/detached/mounted/unmounted.
//
// If the OperationExecutor fails to start the operation because, for example,
// an operation with the same UniqueVolumeName is already pending, a non-nil
// error is returned.
//
// Once the operation is started, since it is executed asynchronously,
// errors are simply logged and the goroutine is terminated without updating
// actualStateOfWorld (callers are responsible for retrying as needed).
//
// Some of these operations may result in calls to the API server; callers are
// responsible for rate limiting on errors.
type OperationExecutor interface {
	// AttachVolume attaches the volume to the node specified in volumeToAttach.
	// It then updates the actual state of the world to reflect that.
	AttachVolume(volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error

	// VerifyVolumesAreAttachedPerNode verifies whether the given volumes are
	// still attached to the node. If any volume is not attached right now, it
	// updates the actual state of the world to reflect that.
	// Note that this operation may run concurrently with other attach/detach
	// operations. In theory (but very unlikely in practice), a race among
	// these operations might mark a volume as detached even though it is
	// attached; the reconciler can correct this in a short period of time.
	VerifyVolumesAreAttachedPerNode(AttachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error

	// VerifyVolumesAreAttached verifies the volumes in use across the entire
	// cluster and whether they are still attached to their nodes. If any
	// volume is not attached right now, it updates the actual state of the
	// world to reflect that.
	VerifyVolumesAreAttached(volumesToVerify map[types.NodeName][]AttachedVolume, actualStateOfWorld ActualStateOfWorldAttacherUpdater)

	// DetachVolume detaches the volume from the node specified in
	// volumeToDetach, and updates the actual state of the world to reflect
	// that. If verifySafeToDetach is set, a call is made to fetch the node
	// object and it is used to verify that the volume does not exist in the
	// Node's Status.VolumesInUse list (the operation fails with an error if it
	// does).
	DetachVolume(volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error

	// MountVolume, for a volume with 'Filesystem' volumeMode, mounts the
	// volume to the pod specified in volumeToMount.
	// Specifically it will:
	// * Wait for the device to finish attaching (for attachable volumes only).
	// * Mount device to global mount path (for attachable volumes only).
	// * Update actual state of world to reflect volume is globally mounted (for
	//   attachable volumes only).
	// * Mount the volume to the pod specific path.
	// * Update actual state of world to reflect volume is mounted to the pod
	//   path.
	// The parameter "isRemount" is informational and used to adjust logging
	// verbosity. An initial mount is more log-worthy than a remount, for
	// example.
	//
	// For 'Block' volumeMode, this method creates a symbolic link to
	// the volume from both the pod specified in volumeToMount and global map path.
	// Specifically it will:
	// * Wait for the device to finish attaching (for attachable volumes only).
	// * Update actual state of world to reflect volume is globally mounted/mapped.
	// * Map volume to global map path using symbolic link.
	// * Map the volume to the pod device map path using symbolic link.
	// * Update actual state of world to reflect volume is mounted/mapped to the pod path.
	MountVolume(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, isRemount bool) error

	// UnmountVolume, for a volume with 'Filesystem' volumeMode, unmounts the
	// volume from the pod specified in volumeToUnmount and updates the actual
	// state of the world to reflect that.
	//
	// For 'Block' volumeMode, this method unmaps the symbolic link to the
	// volume from both the pod device map path in volumeToUnmount and the
	// global map path, and then updates the actual state of the world to
	// reflect that.
	UnmountVolume(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, podsDir string) error

	// UnmountDevice, for a volume with 'Filesystem' volumeMode, unmounts the
	// volume's global mount path from the device (for attachable volumes
	// only), freeing it for detach. It then updates the actual state of the
	// world to reflect that.
	//
	// For 'Block' volumeMode, this method checks the number of symbolic links
	// under the global map path. If the number of references is zero, it
	// removes the global map path directory and frees the volume for detach.
	// It then updates the actual state of the world to reflect that.
	UnmountDevice(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, hostutil hostutil.HostUtils) error

	// VerifyControllerAttachedVolume checks if the specified volume is present
	// in the specified node's AttachedVolumes Status field. It uses kubeClient
	// to fetch the node object.
	// If the volume is found, the actual state of the world is updated to mark
	// the volume as attached.
	// If the volume does not implement the attacher interface, it is assumed to
	// be attached and the actual state of the world is updated accordingly.
	// If the volume is not found or there is an error (fetching the node
	// object, for example) then an error is returned which triggers exponential
	// back off on retries.
	VerifyControllerAttachedVolume(volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error

	// IsOperationPending returns true if an operation for the given volumeName
	// and one of podName or nodeName is pending, otherwise it returns false.
	IsOperationPending(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName, nodeName types.NodeName) bool
	// ExpandInUseVolume resizes the volume's file system to the expected size
	// without unmounting the volume.
	ExpandInUseVolume(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) error
	// ReconstructVolumeOperation constructs a new volumeSpec, created by the
	// plugin, and returns it.
	ReconstructVolumeOperation(volumeMode v1.PersistentVolumeMode, plugin volume.VolumePlugin, mapperPlugin volume.BlockVolumePlugin, uid types.UID, podName volumetypes.UniquePodName, volumeSpecName string, volumePath string, pluginName string) (*volume.Spec, error)
	// CheckVolumeExistenceOperation checks volume existence.
	CheckVolumeExistenceOperation(volumeSpec *volume.Spec, mountPath, volumeName string, mounter mount.Interface, uniqueVolumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName, podUID types.UID, attachable volume.AttachableVolumePlugin) (bool, error)
}

// NewOperationExecutor returns a new instance of OperationExecutor.
153func NewOperationExecutor( 154 operationGenerator OperationGenerator) OperationExecutor { 155 156 return &operationExecutor{ 157 pendingOperations: nestedpendingoperations.NewNestedPendingOperations( 158 true /* exponentialBackOffOnError */), 159 operationGenerator: operationGenerator, 160 } 161} 162 163// MarkVolumeOpts is an struct to pass arguments to MountVolume functions 164type MarkVolumeOpts struct { 165 PodName volumetypes.UniquePodName 166 PodUID types.UID 167 VolumeName v1.UniqueVolumeName 168 Mounter volume.Mounter 169 BlockVolumeMapper volume.BlockVolumeMapper 170 OuterVolumeSpecName string 171 VolumeGidVolume string 172 VolumeSpec *volume.Spec 173 VolumeMountState VolumeMountState 174} 175 176// ActualStateOfWorldMounterUpdater defines a set of operations updating the actual 177// state of the world cache after successful mount/unmount. 178type ActualStateOfWorldMounterUpdater interface { 179 // Marks the specified volume as mounted to the specified pod 180 MarkVolumeAsMounted(markVolumeOpts MarkVolumeOpts) error 181 182 // Marks the specified volume as unmounted from the specified pod 183 MarkVolumeAsUnmounted(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName) error 184 185 // MarkVolumeMountAsUncertain marks state of volume mount for the pod uncertain 186 MarkVolumeMountAsUncertain(markVolumeOpts MarkVolumeOpts) error 187 188 // Marks the specified volume as having been globally mounted. 189 MarkDeviceAsMounted(volumeName v1.UniqueVolumeName, devicePath, deviceMountPath string) error 190 191 // MarkDeviceAsUncertain marks device state in global mount path as uncertain 192 MarkDeviceAsUncertain(volumeName v1.UniqueVolumeName, devicePath, deviceMountPath string) error 193 194 // Marks the specified volume as having its global mount unmounted. 195 MarkDeviceAsUnmounted(volumeName v1.UniqueVolumeName) error 196 197 // Marks the specified volume's file system resize request is finished. 
198 MarkVolumeAsResized(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName) error 199 200 // GetDeviceMountState returns mount state of the device in global path 201 GetDeviceMountState(volumeName v1.UniqueVolumeName) DeviceMountState 202 203 // GetVolumeMountState returns mount state of the volume for the Pod 204 GetVolumeMountState(volumName v1.UniqueVolumeName, podName volumetypes.UniquePodName) VolumeMountState 205 206 // IsVolumeMountedElsewhere returns whether the supplied volume is mounted in a Pod other than the supplied one 207 IsVolumeMountedElsewhere(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) bool 208 209 // MarkForInUseExpansionError marks the volume to have in-use error during expansion. 210 // volume expansion must not be retried for this volume 211 MarkForInUseExpansionError(volumeName v1.UniqueVolumeName) 212} 213 214// ActualStateOfWorldAttacherUpdater defines a set of operations updating the 215// actual state of the world cache after successful attach/detach/mount/unmount. 216type ActualStateOfWorldAttacherUpdater interface { 217 // Marks the specified volume as attached to the specified node. If the 218 // volume name is supplied, that volume name will be used. If not, the 219 // volume name is computed using the result from querying the plugin. 220 // 221 // TODO: in the future, we should be able to remove the volumeName 222 // argument to this method -- since it is used only for attachable 223 // volumes. See issue 29695. 224 MarkVolumeAsAttached(volumeName v1.UniqueVolumeName, volumeSpec *volume.Spec, nodeName types.NodeName, devicePath string) error 225 226 // Marks the specified volume as *possibly* attached to the specified node. 227 // If an attach operation fails, the attach/detach controller does not know for certain if the volume is attached or not. 228 // If the volume name is supplied, that volume name will be used. 
If not, the 229 // volume name is computed using the result from querying the plugin. 230 MarkVolumeAsUncertain(volumeName v1.UniqueVolumeName, volumeSpec *volume.Spec, nodeName types.NodeName) error 231 232 // Marks the specified volume as detached from the specified node 233 MarkVolumeAsDetached(volumeName v1.UniqueVolumeName, nodeName types.NodeName) 234 235 // Marks desire to detach the specified volume (remove the volume from the node's 236 // volumesToReportAsAttached list) 237 RemoveVolumeFromReportAsAttached(volumeName v1.UniqueVolumeName, nodeName types.NodeName) error 238 239 // Unmarks the desire to detach for the specified volume (add the volume back to 240 // the node's volumesToReportAsAttached list) 241 AddVolumeToReportAsAttached(volumeName v1.UniqueVolumeName, nodeName types.NodeName) 242} 243 244// VolumeLogger defines a set of operations for generating volume-related logging and error msgs 245type VolumeLogger interface { 246 // Creates a detailed msg that can be used in logs 247 // The msg format follows the pattern "<prefixMsg> <volume details> <suffixMsg>", 248 // where each implementation provides the volume details 249 GenerateMsgDetailed(prefixMsg, suffixMsg string) (detailedMsg string) 250 251 // Creates a detailed error that can be used in logs. 252 // The msg format follows the pattern "<prefixMsg> <volume details>: <err> ", 253 GenerateErrorDetailed(prefixMsg string, err error) (detailedErr error) 254 255 // Creates a simple msg that is user friendly and a detailed msg that can be used in logs 256 // The msg format follows the pattern "<prefixMsg> <volume details> <suffixMsg>", 257 // where each implementation provides the volume details 258 GenerateMsg(prefixMsg, suffixMsg string) (simpleMsg, detailedMsg string) 259 260 // Creates a simple error that is user friendly and a detailed error that can be used in logs. 
261 // The msg format follows the pattern "<prefixMsg> <volume details>: <err> ", 262 GenerateError(prefixMsg string, err error) (simpleErr, detailedErr error) 263} 264 265// Generates an error string with the format ": <err>" if err exists 266func errSuffix(err error) string { 267 errStr := "" 268 if err != nil { 269 errStr = fmt.Sprintf(": %v", err) 270 } 271 return errStr 272} 273 274// Generate a detailed error msg for logs 275func generateVolumeMsgDetailed(prefixMsg, suffixMsg, volumeName, details string) (detailedMsg string) { 276 return fmt.Sprintf("%v for volume %q %v %v", prefixMsg, volumeName, details, suffixMsg) 277} 278 279// Generate a simplified error msg for events and a detailed error msg for logs 280func generateVolumeMsg(prefixMsg, suffixMsg, volumeName, details string) (simpleMsg, detailedMsg string) { 281 simpleMsg = fmt.Sprintf("%v for volume %q %v", prefixMsg, volumeName, suffixMsg) 282 return simpleMsg, generateVolumeMsgDetailed(prefixMsg, suffixMsg, volumeName, details) 283} 284 285// VolumeToAttach represents a volume that should be attached to a node. 286type VolumeToAttach struct { 287 // MultiAttachErrorReported indicates whether the multi-attach error has been reported for the given volume. 288 // It is used to prevent reporting the error from being reported more than once for a given volume. 289 MultiAttachErrorReported bool 290 291 // VolumeName is the unique identifier for the volume that should be 292 // attached. 293 VolumeName v1.UniqueVolumeName 294 295 // VolumeSpec is a volume spec containing the specification for the volume 296 // that should be attached. 297 VolumeSpec *volume.Spec 298 299 // NodeName is the identifier for the node that the volume should be 300 // attached to. 301 NodeName types.NodeName 302 303 // scheduledPods is a map containing the set of pods that reference this 304 // volume and are scheduled to the underlying node. 
The key in the map is 305 // the name of the pod and the value is a pod object containing more 306 // information about the pod. 307 ScheduledPods []*v1.Pod 308} 309 310// GenerateMsgDetailed returns detailed msgs for volumes to attach 311func (volume *VolumeToAttach) GenerateMsgDetailed(prefixMsg, suffixMsg string) (detailedMsg string) { 312 detailedStr := fmt.Sprintf("(UniqueName: %q) from node %q", volume.VolumeName, volume.NodeName) 313 volumeSpecName := "nil" 314 if volume.VolumeSpec != nil { 315 volumeSpecName = volume.VolumeSpec.Name() 316 } 317 return generateVolumeMsgDetailed(prefixMsg, suffixMsg, volumeSpecName, detailedStr) 318} 319 320// GenerateMsg returns simple and detailed msgs for volumes to attach 321func (volume *VolumeToAttach) GenerateMsg(prefixMsg, suffixMsg string) (simpleMsg, detailedMsg string) { 322 detailedStr := fmt.Sprintf("(UniqueName: %q) from node %q", volume.VolumeName, volume.NodeName) 323 volumeSpecName := "nil" 324 if volume.VolumeSpec != nil { 325 volumeSpecName = volume.VolumeSpec.Name() 326 } 327 return generateVolumeMsg(prefixMsg, suffixMsg, volumeSpecName, detailedStr) 328} 329 330// GenerateErrorDetailed returns detailed errors for volumes to attach 331func (volume *VolumeToAttach) GenerateErrorDetailed(prefixMsg string, err error) (detailedErr error) { 332 return fmt.Errorf(volume.GenerateMsgDetailed(prefixMsg, errSuffix(err))) 333} 334 335// GenerateError returns simple and detailed errors for volumes to attach 336func (volume *VolumeToAttach) GenerateError(prefixMsg string, err error) (simpleErr, detailedErr error) { 337 simpleMsg, detailedMsg := volume.GenerateMsg(prefixMsg, errSuffix(err)) 338 return fmt.Errorf(simpleMsg), fmt.Errorf(detailedMsg) 339} 340 341// VolumeToMount represents a volume that should be attached to this node and 342// mounted to the PodName. 343type VolumeToMount struct { 344 // VolumeName is the unique identifier for the volume that should be 345 // mounted. 
346 VolumeName v1.UniqueVolumeName 347 348 // PodName is the unique identifier for the pod that the volume should be 349 // mounted to after it is attached. 350 PodName volumetypes.UniquePodName 351 352 // VolumeSpec is a volume spec containing the specification for the volume 353 // that should be mounted. Used to create NewMounter. Used to generate 354 // InnerVolumeSpecName. 355 VolumeSpec *volume.Spec 356 357 // outerVolumeSpecName is the podSpec.Volume[x].Name of the volume. If the 358 // volume was referenced through a persistent volume claim, this contains 359 // the podSpec.Volume[x].Name of the persistent volume claim. 360 OuterVolumeSpecName string 361 362 // Pod to mount the volume to. Used to create NewMounter. 363 Pod *v1.Pod 364 365 // PluginIsAttachable indicates that the plugin for this volume implements 366 // the volume.Attacher interface 367 PluginIsAttachable bool 368 369 // PluginIsDeviceMountable indicates that the plugin for this volume implements 370 // the volume.DeviceMounter interface 371 PluginIsDeviceMountable bool 372 373 // VolumeGidValue contains the value of the GID annotation, if present. 374 VolumeGidValue string 375 376 // DevicePath contains the path on the node where the volume is attached. 377 // For non-attachable volumes this is empty. 378 DevicePath string 379 380 // ReportedInUse indicates that the volume was successfully added to the 381 // VolumesInUse field in the node's status. 382 ReportedInUse bool 383 384 // DesiredSizeLimit indicates the desired upper bound on the size of the volume 385 // (if so implemented) 386 DesiredSizeLimit *resource.Quantity 387} 388 389// DeviceMountState represents device mount state in a global path. 
390type DeviceMountState string 391 392const ( 393 // DeviceGloballyMounted means device has been globally mounted successfully 394 DeviceGloballyMounted DeviceMountState = "DeviceGloballyMounted" 395 396 // DeviceMountUncertain means device may not be mounted but a mount operation may be 397 // in-progress which can cause device mount to succeed. 398 DeviceMountUncertain DeviceMountState = "DeviceMountUncertain" 399 400 // DeviceNotMounted means device has not been mounted globally. 401 DeviceNotMounted DeviceMountState = "DeviceNotMounted" 402) 403 404// VolumeMountState represents volume mount state in a path local to the pod. 405type VolumeMountState string 406 407const ( 408 // VolumeMounted means volume has been mounted in pod's local path 409 VolumeMounted VolumeMountState = "VolumeMounted" 410 411 // VolumeMountUncertain means volume may or may not be mounted in pods' local path 412 VolumeMountUncertain VolumeMountState = "VolumeMountUncertain" 413 414 // VolumeNotMounted means volume has not be mounted in pod's local path 415 VolumeNotMounted VolumeMountState = "VolumeNotMounted" 416) 417 418// GenerateMsgDetailed returns detailed msgs for volumes to mount 419func (volume *VolumeToMount) GenerateMsgDetailed(prefixMsg, suffixMsg string) (detailedMsg string) { 420 detailedStr := fmt.Sprintf("(UniqueName: %q) pod %q (UID: %q)", volume.VolumeName, volume.Pod.Name, volume.Pod.UID) 421 volumeSpecName := "nil" 422 if volume.VolumeSpec != nil { 423 volumeSpecName = volume.VolumeSpec.Name() 424 } 425 return generateVolumeMsgDetailed(prefixMsg, suffixMsg, volumeSpecName, detailedStr) 426} 427 428// GenerateMsg returns simple and detailed msgs for volumes to mount 429func (volume *VolumeToMount) GenerateMsg(prefixMsg, suffixMsg string) (simpleMsg, detailedMsg string) { 430 detailedStr := fmt.Sprintf("(UniqueName: %q) pod %q (UID: %q)", volume.VolumeName, volume.Pod.Name, volume.Pod.UID) 431 volumeSpecName := "nil" 432 if volume.VolumeSpec != nil { 433 volumeSpecName 
= volume.VolumeSpec.Name() 434 } 435 return generateVolumeMsg(prefixMsg, suffixMsg, volumeSpecName, detailedStr) 436} 437 438// GenerateErrorDetailed returns detailed errors for volumes to mount 439func (volume *VolumeToMount) GenerateErrorDetailed(prefixMsg string, err error) (detailedErr error) { 440 return fmt.Errorf(volume.GenerateMsgDetailed(prefixMsg, errSuffix(err))) 441} 442 443// GenerateError returns simple and detailed errors for volumes to mount 444func (volume *VolumeToMount) GenerateError(prefixMsg string, err error) (simpleErr, detailedErr error) { 445 simpleMsg, detailedMsg := volume.GenerateMsg(prefixMsg, errSuffix(err)) 446 return fmt.Errorf(simpleMsg), fmt.Errorf(detailedMsg) 447} 448 449// AttachedVolume represents a volume that is attached to a node. 450type AttachedVolume struct { 451 // VolumeName is the unique identifier for the volume that is attached. 452 VolumeName v1.UniqueVolumeName 453 454 // VolumeSpec is the volume spec containing the specification for the 455 // volume that is attached. 456 VolumeSpec *volume.Spec 457 458 // NodeName is the identifier for the node that the volume is attached to. 459 NodeName types.NodeName 460 461 // PluginIsAttachable indicates that the plugin for this volume implements 462 // the volume.Attacher interface 463 PluginIsAttachable bool 464 465 // DevicePath contains the path on the node where the volume is attached. 466 // For non-attachable volumes this is empty. 467 DevicePath string 468 469 // DeviceMountPath contains the path on the node where the device should 470 // be mounted after it is attached. 471 DeviceMountPath string 472 473 // PluginName is the Unescaped Qualified name of the volume plugin used to 474 // attach and mount this volume. 
475 PluginName string 476} 477 478// GenerateMsgDetailed returns detailed msgs for attached volumes 479func (volume *AttachedVolume) GenerateMsgDetailed(prefixMsg, suffixMsg string) (detailedMsg string) { 480 detailedStr := fmt.Sprintf("(UniqueName: %q) on node %q", volume.VolumeName, volume.NodeName) 481 volumeSpecName := "nil" 482 if volume.VolumeSpec != nil { 483 volumeSpecName = volume.VolumeSpec.Name() 484 } 485 return generateVolumeMsgDetailed(prefixMsg, suffixMsg, volumeSpecName, detailedStr) 486} 487 488// GenerateMsg returns simple and detailed msgs for attached volumes 489func (volume *AttachedVolume) GenerateMsg(prefixMsg, suffixMsg string) (simpleMsg, detailedMsg string) { 490 detailedStr := fmt.Sprintf("(UniqueName: %q) on node %q", volume.VolumeName, volume.NodeName) 491 volumeSpecName := "nil" 492 if volume.VolumeSpec != nil { 493 volumeSpecName = volume.VolumeSpec.Name() 494 } 495 return generateVolumeMsg(prefixMsg, suffixMsg, volumeSpecName, detailedStr) 496} 497 498// GenerateErrorDetailed returns detailed errors for attached volumes 499func (volume *AttachedVolume) GenerateErrorDetailed(prefixMsg string, err error) (detailedErr error) { 500 return fmt.Errorf(volume.GenerateMsgDetailed(prefixMsg, errSuffix(err))) 501} 502 503// GenerateError returns simple and detailed errors for attached volumes 504func (volume *AttachedVolume) GenerateError(prefixMsg string, err error) (simpleErr, detailedErr error) { 505 simpleMsg, detailedMsg := volume.GenerateMsg(prefixMsg, errSuffix(err)) 506 return fmt.Errorf(simpleMsg), fmt.Errorf(detailedMsg) 507} 508 509// MountedVolume represents a volume that has successfully been mounted to a pod. 510type MountedVolume struct { 511 // PodName is the unique identifier of the pod mounted to. 512 PodName volumetypes.UniquePodName 513 514 // VolumeName is the unique identifier of the volume mounted to the pod. 515 VolumeName v1.UniqueVolumeName 516 517 // InnerVolumeSpecName is the volume.Spec.Name() of the volume. 
If the 518 // volume was referenced through a persistent volume claims, this contains 519 // the name of the bound persistent volume object. 520 // It is the name that plugins use in their pod mount path, i.e. 521 // /var/lib/kubelet/pods/{podUID}/volumes/{escapeQualifiedPluginName}/{innerVolumeSpecName}/ 522 // PVC example, 523 // apiVersion: v1 524 // kind: PersistentVolume 525 // metadata: 526 // name: pv0003 <- InnerVolumeSpecName 527 // spec: 528 // capacity: 529 // storage: 5Gi 530 // accessModes: 531 // - ReadWriteOnce 532 // persistentVolumeReclaimPolicy: Recycle 533 // nfs: 534 // path: /tmp 535 // server: 172.17.0.2 536 // Non-PVC example: 537 // apiVersion: v1 538 // kind: Pod 539 // metadata: 540 // name: test-pd 541 // spec: 542 // containers: 543 // - image: k8s.gcr.io/test-webserver 544 // name: test-container 545 // volumeMounts: 546 // - mountPath: /test-pd 547 // name: test-volume 548 // volumes: 549 // - name: test-volume <- InnerVolumeSpecName 550 // gcePersistentDisk: 551 // pdName: my-data-disk 552 // fsType: ext4 553 InnerVolumeSpecName string 554 555 // outerVolumeSpecName is the podSpec.Volume[x].Name of the volume. If the 556 // volume was referenced through a persistent volume claim, this contains 557 // the podSpec.Volume[x].Name of the persistent volume claim. 
558 // PVC example: 559 // kind: Pod 560 // apiVersion: v1 561 // metadata: 562 // name: mypod 563 // spec: 564 // containers: 565 // - name: myfrontend 566 // image: dockerfile/nginx 567 // volumeMounts: 568 // - mountPath: "/var/www/html" 569 // name: mypd 570 // volumes: 571 // - name: mypd <- OuterVolumeSpecName 572 // persistentVolumeClaim: 573 // claimName: myclaim 574 // Non-PVC example: 575 // apiVersion: v1 576 // kind: Pod 577 // metadata: 578 // name: test-pd 579 // spec: 580 // containers: 581 // - image: k8s.gcr.io/test-webserver 582 // name: test-container 583 // volumeMounts: 584 // - mountPath: /test-pd 585 // name: test-volume 586 // volumes: 587 // - name: test-volume <- OuterVolumeSpecName 588 // gcePersistentDisk: 589 // pdName: my-data-disk 590 // fsType: ext4 591 OuterVolumeSpecName string 592 593 // PluginName is the "Unescaped Qualified" name of the volume plugin used to 594 // mount and unmount this volume. It can be used to fetch the volume plugin 595 // to unmount with, on demand. It is also the name that plugins use, though 596 // escaped, in their pod mount path, i.e. 597 // /var/lib/kubelet/pods/{podUID}/volumes/{escapeQualifiedPluginName}/{outerVolumeSpecName}/ 598 PluginName string 599 600 // PodUID is the UID of the pod mounted to. It is also the string used by 601 // plugins in their pod mount path, i.e. 602 // /var/lib/kubelet/pods/{podUID}/volumes/{escapeQualifiedPluginName}/{outerVolumeSpecName}/ 603 PodUID types.UID 604 605 // Mounter is the volume mounter used to mount this volume. It is required 606 // by kubelet to create container.VolumeMap. 607 // Mounter is only required for file system volumes and not required for block volumes. 608 Mounter volume.Mounter 609 610 // BlockVolumeMapper is the volume mapper used to map this volume. It is required 611 // by kubelet to create container.VolumeMap. 612 // BlockVolumeMapper is only required for block volumes and not required for file system volumes. 
613 BlockVolumeMapper volume.BlockVolumeMapper 614 615 // VolumeGidValue contains the value of the GID annotation, if present. 616 VolumeGidValue string 617 618 // VolumeSpec is a volume spec containing the specification for the volume 619 // that should be mounted. 620 VolumeSpec *volume.Spec 621 622 // DeviceMountPath contains the path on the node where the device should 623 // be mounted after it is attached. 624 DeviceMountPath string 625} 626 627// GenerateMsgDetailed returns detailed msgs for mounted volumes 628func (volume *MountedVolume) GenerateMsgDetailed(prefixMsg, suffixMsg string) (detailedMsg string) { 629 detailedStr := fmt.Sprintf("(UniqueName: %q) pod %q (UID: %q)", volume.VolumeName, volume.PodName, volume.PodUID) 630 return generateVolumeMsgDetailed(prefixMsg, suffixMsg, volume.OuterVolumeSpecName, detailedStr) 631} 632 633// GenerateMsg returns simple and detailed msgs for mounted volumes 634func (volume *MountedVolume) GenerateMsg(prefixMsg, suffixMsg string) (simpleMsg, detailedMsg string) { 635 detailedStr := fmt.Sprintf("(UniqueName: %q) pod %q (UID: %q)", volume.VolumeName, volume.PodName, volume.PodUID) 636 return generateVolumeMsg(prefixMsg, suffixMsg, volume.OuterVolumeSpecName, detailedStr) 637} 638 639// GenerateErrorDetailed returns simple and detailed errors for mounted volumes 640func (volume *MountedVolume) GenerateErrorDetailed(prefixMsg string, err error) (detailedErr error) { 641 return fmt.Errorf(volume.GenerateMsgDetailed(prefixMsg, errSuffix(err))) 642} 643 644// GenerateError returns simple and detailed errors for mounted volumes 645func (volume *MountedVolume) GenerateError(prefixMsg string, err error) (simpleErr, detailedErr error) { 646 simpleMsg, detailedMsg := volume.GenerateMsg(prefixMsg, errSuffix(err)) 647 return fmt.Errorf(simpleMsg), fmt.Errorf(detailedMsg) 648} 649 650type operationExecutor struct { 651 // pendingOperations keeps track of pending attach and detach operations so 652 // multiple operations are not 
started on the same volume 653 pendingOperations nestedpendingoperations.NestedPendingOperations 654 655 // operationGenerator is an interface that provides implementations for 656 // generating volume function 657 operationGenerator OperationGenerator 658} 659 660func (oe *operationExecutor) IsOperationPending( 661 volumeName v1.UniqueVolumeName, 662 podName volumetypes.UniquePodName, 663 nodeName types.NodeName) bool { 664 return oe.pendingOperations.IsOperationPending(volumeName, podName, nodeName) 665} 666 667func (oe *operationExecutor) AttachVolume( 668 volumeToAttach VolumeToAttach, 669 actualStateOfWorld ActualStateOfWorldAttacherUpdater) error { 670 generatedOperations := 671 oe.operationGenerator.GenerateAttachVolumeFunc(volumeToAttach, actualStateOfWorld) 672 673 if util.IsMultiAttachAllowed(volumeToAttach.VolumeSpec) { 674 return oe.pendingOperations.Run( 675 volumeToAttach.VolumeName, "" /* podName */, volumeToAttach.NodeName, generatedOperations) 676 } 677 678 return oe.pendingOperations.Run( 679 volumeToAttach.VolumeName, "" /* podName */, "" /* nodeName */, generatedOperations) 680} 681 682func (oe *operationExecutor) DetachVolume( 683 volumeToDetach AttachedVolume, 684 verifySafeToDetach bool, 685 actualStateOfWorld ActualStateOfWorldAttacherUpdater) error { 686 generatedOperations, err := 687 oe.operationGenerator.GenerateDetachVolumeFunc(volumeToDetach, verifySafeToDetach, actualStateOfWorld) 688 if err != nil { 689 return err 690 } 691 692 if util.IsMultiAttachAllowed(volumeToDetach.VolumeSpec) { 693 return oe.pendingOperations.Run( 694 volumeToDetach.VolumeName, "" /* podName */, volumeToDetach.NodeName, generatedOperations) 695 } 696 return oe.pendingOperations.Run( 697 volumeToDetach.VolumeName, "" /* podName */, "" /* nodeName */, generatedOperations) 698 699} 700 701func (oe *operationExecutor) VerifyVolumesAreAttached( 702 attachedVolumes map[types.NodeName][]AttachedVolume, 703 actualStateOfWorld ActualStateOfWorldAttacherUpdater) { 704 
705 // A map of plugin names and nodes on which they exist with volumes they manage 706 bulkVerifyPluginsByNode := make(map[string]map[types.NodeName][]*volume.Spec) 707 volumeSpecMapByPlugin := make(map[string]map[*volume.Spec]v1.UniqueVolumeName) 708 709 for node, nodeAttachedVolumes := range attachedVolumes { 710 needIndividualVerifyVolumes := []AttachedVolume{} 711 for _, volumeAttached := range nodeAttachedVolumes { 712 if volumeAttached.VolumeSpec == nil { 713 klog.Errorf("VerifyVolumesAreAttached: nil spec for volume %s", volumeAttached.VolumeName) 714 continue 715 } 716 717 volumePlugin, err := 718 oe.operationGenerator.GetVolumePluginMgr().FindPluginBySpec(volumeAttached.VolumeSpec) 719 if err != nil { 720 klog.Errorf( 721 "VolumesAreAttached.FindPluginBySpec failed for volume %q (spec.Name: %q) on node %q with error: %v", 722 volumeAttached.VolumeName, 723 volumeAttached.VolumeSpec.Name(), 724 volumeAttached.NodeName, 725 err) 726 continue 727 } 728 if volumePlugin == nil { 729 // should never happen since FindPluginBySpec always returns error if volumePlugin = nil 730 klog.Errorf( 731 "Failed to find volume plugin for volume %q (spec.Name: %q) on node %q", 732 volumeAttached.VolumeName, 733 volumeAttached.VolumeSpec.Name(), 734 volumeAttached.NodeName) 735 continue 736 } 737 738 pluginName := volumePlugin.GetPluginName() 739 740 if volumePlugin.SupportsBulkVolumeVerification() { 741 pluginNodes, pluginNodesExist := bulkVerifyPluginsByNode[pluginName] 742 743 if !pluginNodesExist { 744 pluginNodes = make(map[types.NodeName][]*volume.Spec) 745 } 746 747 volumeSpecList, nodeExists := pluginNodes[node] 748 if !nodeExists { 749 volumeSpecList = []*volume.Spec{} 750 } 751 volumeSpecList = append(volumeSpecList, volumeAttached.VolumeSpec) 752 pluginNodes[node] = volumeSpecList 753 754 bulkVerifyPluginsByNode[pluginName] = pluginNodes 755 volumeSpecMap, mapExists := volumeSpecMapByPlugin[pluginName] 756 757 if !mapExists { 758 volumeSpecMap = 
make(map[*volume.Spec]v1.UniqueVolumeName) 759 } 760 volumeSpecMap[volumeAttached.VolumeSpec] = volumeAttached.VolumeName 761 volumeSpecMapByPlugin[pluginName] = volumeSpecMap 762 continue 763 } 764 // If node doesn't support Bulk volume polling it is best to poll individually 765 needIndividualVerifyVolumes = append(needIndividualVerifyVolumes, volumeAttached) 766 } 767 nodeError := oe.VerifyVolumesAreAttachedPerNode(needIndividualVerifyVolumes, node, actualStateOfWorld) 768 if nodeError != nil { 769 klog.Errorf("VerifyVolumesAreAttached failed for volumes %v, node %q with error %v", needIndividualVerifyVolumes, node, nodeError) 770 } 771 } 772 773 for pluginName, pluginNodeVolumes := range bulkVerifyPluginsByNode { 774 generatedOperations, err := oe.operationGenerator.GenerateBulkVolumeVerifyFunc( 775 pluginNodeVolumes, 776 pluginName, 777 volumeSpecMapByPlugin[pluginName], 778 actualStateOfWorld) 779 if err != nil { 780 klog.Errorf("BulkVerifyVolumes.GenerateBulkVolumeVerifyFunc error bulk verifying volumes for plugin %q with %v", pluginName, err) 781 } 782 783 // Ugly hack to ensure - we don't do parallel bulk polling of same volume plugin 784 uniquePluginName := v1.UniqueVolumeName(pluginName) 785 err = oe.pendingOperations.Run(uniquePluginName, "" /* Pod Name */, "" /* nodeName */, generatedOperations) 786 if err != nil { 787 klog.Errorf("BulkVerifyVolumes.Run Error bulk volume verification for plugin %q with %v", pluginName, err) 788 } 789 } 790} 791 792func (oe *operationExecutor) VerifyVolumesAreAttachedPerNode( 793 attachedVolumes []AttachedVolume, 794 nodeName types.NodeName, 795 actualStateOfWorld ActualStateOfWorldAttacherUpdater) error { 796 generatedOperations, err := 797 oe.operationGenerator.GenerateVolumesAreAttachedFunc(attachedVolumes, nodeName, actualStateOfWorld) 798 if err != nil { 799 return err 800 } 801 802 // Give an empty UniqueVolumeName so that this operation could be executed concurrently. 
803 return oe.pendingOperations.Run("" /* volumeName */, "" /* podName */, "" /* nodeName */, generatedOperations) 804} 805 806func (oe *operationExecutor) MountVolume( 807 waitForAttachTimeout time.Duration, 808 volumeToMount VolumeToMount, 809 actualStateOfWorld ActualStateOfWorldMounterUpdater, 810 isRemount bool) error { 811 fsVolume, err := util.CheckVolumeModeFilesystem(volumeToMount.VolumeSpec) 812 if err != nil { 813 return err 814 } 815 var generatedOperations volumetypes.GeneratedOperations 816 if fsVolume { 817 // Filesystem volume case 818 // Mount/remount a volume when a volume is attached 819 generatedOperations = oe.operationGenerator.GenerateMountVolumeFunc( 820 waitForAttachTimeout, volumeToMount, actualStateOfWorld, isRemount) 821 822 } else { 823 // Block volume case 824 // Creates a map to device if a volume is attached 825 generatedOperations, err = oe.operationGenerator.GenerateMapVolumeFunc( 826 waitForAttachTimeout, volumeToMount, actualStateOfWorld) 827 } 828 if err != nil { 829 return err 830 } 831 // Avoid executing mount/map from multiple pods referencing the 832 // same volume in parallel 833 podName := nestedpendingoperations.EmptyUniquePodName 834 835 // TODO: remove this -- not necessary 836 if !volumeToMount.PluginIsAttachable && !volumeToMount.PluginIsDeviceMountable { 837 // volume plugins which are Non-attachable and Non-deviceMountable can execute mount for multiple pods 838 // referencing the same volume in parallel 839 podName = util.GetUniquePodName(volumeToMount.Pod) 840 } 841 842 // TODO mount_device 843 return oe.pendingOperations.Run( 844 volumeToMount.VolumeName, podName, "" /* nodeName */, generatedOperations) 845} 846 847func (oe *operationExecutor) UnmountVolume( 848 volumeToUnmount MountedVolume, 849 actualStateOfWorld ActualStateOfWorldMounterUpdater, 850 podsDir string) error { 851 fsVolume, err := util.CheckVolumeModeFilesystem(volumeToUnmount.VolumeSpec) 852 if err != nil { 853 return err 854 } 855 var 
generatedOperations volumetypes.GeneratedOperations 856 if fsVolume { 857 // Filesystem volume case 858 // Unmount a volume if a volume is mounted 859 generatedOperations, err = oe.operationGenerator.GenerateUnmountVolumeFunc( 860 volumeToUnmount, actualStateOfWorld, podsDir) 861 } else { 862 // Block volume case 863 // Unmap a volume if a volume is mapped 864 generatedOperations, err = oe.operationGenerator.GenerateUnmapVolumeFunc( 865 volumeToUnmount, actualStateOfWorld) 866 } 867 if err != nil { 868 return err 869 } 870 // All volume plugins can execute unmount/unmap for multiple pods referencing the 871 // same volume in parallel 872 podName := volumetypes.UniquePodName(volumeToUnmount.PodUID) 873 874 return oe.pendingOperations.Run( 875 volumeToUnmount.VolumeName, podName, "" /* nodeName */, generatedOperations) 876} 877 878func (oe *operationExecutor) UnmountDevice( 879 deviceToDetach AttachedVolume, 880 actualStateOfWorld ActualStateOfWorldMounterUpdater, 881 hostutil hostutil.HostUtils) error { 882 fsVolume, err := util.CheckVolumeModeFilesystem(deviceToDetach.VolumeSpec) 883 if err != nil { 884 return err 885 } 886 var generatedOperations volumetypes.GeneratedOperations 887 if fsVolume { 888 // Filesystem volume case 889 // Unmount and detach a device if a volume isn't referenced 890 generatedOperations, err = oe.operationGenerator.GenerateUnmountDeviceFunc( 891 deviceToDetach, actualStateOfWorld, hostutil) 892 } else { 893 // Block volume case 894 // Detach a device and remove loopback if a volume isn't referenced 895 generatedOperations, err = oe.operationGenerator.GenerateUnmapDeviceFunc( 896 deviceToDetach, actualStateOfWorld, hostutil) 897 } 898 if err != nil { 899 return err 900 } 901 // Avoid executing unmount/unmap device from multiple pods referencing 902 // the same volume in parallel 903 podName := nestedpendingoperations.EmptyUniquePodName 904 905 return oe.pendingOperations.Run( 906 deviceToDetach.VolumeName, podName, "" /* nodeName */, 
generatedOperations) 907} 908 909func (oe *operationExecutor) ExpandInUseVolume(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) error { 910 generatedOperations, err := oe.operationGenerator.GenerateExpandInUseVolumeFunc(volumeToMount, actualStateOfWorld) 911 if err != nil { 912 return err 913 } 914 return oe.pendingOperations.Run(volumeToMount.VolumeName, "", "" /* nodeName */, generatedOperations) 915} 916 917func (oe *operationExecutor) VerifyControllerAttachedVolume( 918 volumeToMount VolumeToMount, 919 nodeName types.NodeName, 920 actualStateOfWorld ActualStateOfWorldAttacherUpdater) error { 921 generatedOperations, err := 922 oe.operationGenerator.GenerateVerifyControllerAttachedVolumeFunc(volumeToMount, nodeName, actualStateOfWorld) 923 if err != nil { 924 return err 925 } 926 927 return oe.pendingOperations.Run( 928 volumeToMount.VolumeName, "" /* podName */, "" /* nodeName */, generatedOperations) 929} 930 931// ReconstructVolumeOperation return a func to create volumeSpec from mount path 932func (oe *operationExecutor) ReconstructVolumeOperation( 933 volumeMode v1.PersistentVolumeMode, 934 plugin volume.VolumePlugin, 935 mapperPlugin volume.BlockVolumePlugin, 936 uid types.UID, 937 podName volumetypes.UniquePodName, 938 volumeSpecName string, 939 volumePath string, 940 pluginName string) (*volume.Spec, error) { 941 942 // Filesystem Volume case 943 if volumeMode == v1.PersistentVolumeFilesystem { 944 // Create volumeSpec from mount path 945 klog.V(5).Infof("Starting operationExecutor.ReconstructVolumepodName") 946 volumeSpec, err := plugin.ConstructVolumeSpec(volumeSpecName, volumePath) 947 if err != nil { 948 return nil, err 949 } 950 return volumeSpec, nil 951 } 952 953 // Block Volume case 954 // Create volumeSpec from mount path 955 klog.V(5).Infof("Starting operationExecutor.ReconstructVolume") 956 957 // volumePath contains volumeName on the path. 
In the case of block volume, {volumeName} is symbolic link 958 // corresponding to raw block device. 959 // ex. volumePath: pods/{podUid}}/{DefaultKubeletVolumeDevicesDirName}/{escapeQualifiedPluginName}/{volumeName} 960 volumeSpec, err := mapperPlugin.ConstructBlockVolumeSpec(uid, volumeSpecName, volumePath) 961 if err != nil { 962 return nil, err 963 } 964 return volumeSpec, nil 965} 966 967// CheckVolumeExistenceOperation checks mount path directory if volume still exists 968func (oe *operationExecutor) CheckVolumeExistenceOperation( 969 volumeSpec *volume.Spec, 970 mountPath, volumeName string, 971 mounter mount.Interface, 972 uniqueVolumeName v1.UniqueVolumeName, 973 podName volumetypes.UniquePodName, 974 podUID types.UID, 975 attachable volume.AttachableVolumePlugin) (bool, error) { 976 fsVolume, err := util.CheckVolumeModeFilesystem(volumeSpec) 977 if err != nil { 978 return false, err 979 } 980 981 // Filesystem Volume case 982 // For attachable volume case, check mount path directory if volume is still existing and mounted. 983 // Return true if volume is mounted. 984 if fsVolume { 985 if attachable != nil { 986 var isNotMount bool 987 var mountCheckErr error 988 if mounter == nil { 989 return false, fmt.Errorf("mounter was not set for a filesystem volume") 990 } 991 if isNotMount, mountCheckErr = mounter.IsLikelyNotMountPoint(mountPath); mountCheckErr != nil { 992 return false, fmt.Errorf("could not check whether the volume %q (spec.Name: %q) pod %q (UID: %q) is mounted with: %v", 993 uniqueVolumeName, 994 volumeName, 995 podName, 996 podUID, 997 mountCheckErr) 998 } 999 return !isNotMount, nil 1000 } 1001 // For non-attachable volume case, skip check and return true without mount point check 1002 // since plugins may not have volume mount point. 1003 return true, nil 1004 } 1005 1006 // Block Volume case 1007 // Check mount path directory if volume still exists, then return true if volume 1008 // is there. 
Either plugin is attachable or non-attachable, the plugin should 1009 // have symbolic link associated to raw block device under pod device map 1010 // if volume exists. 1011 blkutil := volumepathhandler.NewBlockVolumePathHandler() 1012 var islinkExist bool 1013 var checkErr error 1014 if islinkExist, checkErr = blkutil.IsSymlinkExist(mountPath); checkErr != nil { 1015 return false, fmt.Errorf("could not check whether the block volume %q (spec.Name: %q) pod %q (UID: %q) is mapped to: %v", 1016 uniqueVolumeName, 1017 volumeName, 1018 podName, 1019 podUID, 1020 checkErr) 1021 } 1022 return islinkExist, nil 1023} 1024