1/* 2Copyright 2016 The Kubernetes Authors. 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package operationexecutor 18 19import ( 20 "context" 21 goerrors "errors" 22 "fmt" 23 "os" 24 "path/filepath" 25 "strings" 26 "time" 27 28 v1 "k8s.io/api/core/v1" 29 "k8s.io/apimachinery/pkg/api/errors" 30 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" 31 "k8s.io/apimachinery/pkg/types" 32 utilfeature "k8s.io/apiserver/pkg/util/feature" 33 clientset "k8s.io/client-go/kubernetes" 34 "k8s.io/client-go/tools/record" 35 volerr "k8s.io/cloud-provider/volume/errors" 36 csitrans "k8s.io/csi-translation-lib" 37 "k8s.io/klog/v2" 38 v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" 39 "k8s.io/kubernetes/pkg/features" 40 kevents "k8s.io/kubernetes/pkg/kubelet/events" 41 "k8s.io/kubernetes/pkg/volume" 42 "k8s.io/kubernetes/pkg/volume/util" 43 "k8s.io/kubernetes/pkg/volume/util/hostutil" 44 volumetypes "k8s.io/kubernetes/pkg/volume/util/types" 45 "k8s.io/kubernetes/pkg/volume/util/volumepathhandler" 46) 47 48const ( 49 unknownVolumePlugin string = "UnknownVolumePlugin" 50 unknownAttachableVolumePlugin string = "UnknownAttachableVolumePlugin" 51) 52 53// InTreeToCSITranslator contains methods required to check migratable status 54// and perform translations from InTree PVs and Inline to CSI 55type InTreeToCSITranslator interface { 56 IsPVMigratable(pv *v1.PersistentVolume) bool 57 IsInlineMigratable(vol *v1.Volume) bool 58 IsMigratableIntreePluginByName(inTreePluginName string) bool 59 GetInTreePluginNameFromSpec(pv *v1.PersistentVolume, vol *v1.Volume) (string, error) 60 GetCSINameFromInTreeName(pluginName string) (string, error) 61 TranslateInTreePVToCSI(pv *v1.PersistentVolume) (*v1.PersistentVolume, error) 62 TranslateInTreeInlineVolumeToCSI(volume *v1.Volume, podNamespace string) (*v1.PersistentVolume, error) 63} 64 65var _ OperationGenerator = &operationGenerator{} 66 67type operationGenerator struct { 68 // Used to fetch objects from the API server like Node in the 69 // VerifyControllerAttachedVolume operation. 70 kubeClient clientset.Interface 71 72 // volumePluginMgr is the volume plugin manager used to create volume 73 // plugin objects. 74 volumePluginMgr *volume.VolumePluginMgr 75 76 // recorder is used to record events in the API server 77 recorder record.EventRecorder 78 79 // checkNodeCapabilitiesBeforeMount, if set, enables the CanMount check, 80 // which verifies that the components (binaries, etc.) required to mount 81 // the volume are available on the underlying node before attempting mount. 82 checkNodeCapabilitiesBeforeMount bool 83 84 // blkUtil provides volume path related operations for block volume 85 blkUtil volumepathhandler.BlockVolumePathHandler 86 87 translator InTreeToCSITranslator 88} 89 90// NewOperationGenerator is returns instance of operationGenerator 91func NewOperationGenerator(kubeClient clientset.Interface, 92 volumePluginMgr *volume.VolumePluginMgr, 93 recorder record.EventRecorder, 94 checkNodeCapabilitiesBeforeMount bool, 95 blkUtil volumepathhandler.BlockVolumePathHandler) OperationGenerator { 96 97 return &operationGenerator{ 98 kubeClient: kubeClient, 99 volumePluginMgr: volumePluginMgr, 100 recorder: recorder, 101 checkNodeCapabilitiesBeforeMount: checkNodeCapabilitiesBeforeMount, 102 blkUtil: blkUtil, 103 translator: csitrans.New(), 104 } 105} 106 107// OperationGenerator interface that extracts out the functions from operation_executor to make it dependency injectable 108type OperationGenerator interface { 109 // Generates the MountVolume function needed to perform the mount of a volume plugin 110 GenerateMountVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater, isRemount bool) volumetypes.GeneratedOperations 111 112 // Generates the UnmountVolume function needed to perform the unmount of a volume plugin 113 GenerateUnmountVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, podsDir string) (volumetypes.GeneratedOperations, error) 114 115 // Generates the AttachVolume function needed to perform attach of a volume plugin 116 GenerateAttachVolumeFunc(volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) volumetypes.GeneratedOperations 117 118 // Generates the DetachVolume function needed to perform the detach of a volume plugin 119 GenerateDetachVolumeFunc(volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) 120 121 // Generates the VolumesAreAttached function needed to verify if volume plugins are attached 122 GenerateVolumesAreAttachedFunc(attachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) 123 124 // Generates the UnMountDevice function needed to perform the unmount of a device 125 GenerateUnmountDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter hostutil.HostUtils) (volumetypes.GeneratedOperations, error) 126 127 // Generates the function needed to check if the attach_detach controller has attached the volume plugin 128 GenerateVerifyControllerAttachedVolumeFunc(volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) 129 130 // Generates the MapVolume function needed to perform the map of a volume plugin 131 GenerateMapVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) 132 133 // Generates the UnmapVolume function needed to perform the unmap of a volume plugin 134 GenerateUnmapVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) 135 136 // Generates the UnmapDevice function needed to perform the unmap of a device 137 GenerateUnmapDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter hostutil.HostUtils) (volumetypes.GeneratedOperations, error) 138 139 // GetVolumePluginMgr returns volume plugin manager 140 GetVolumePluginMgr() *volume.VolumePluginMgr 141 142 // GetCSITranslator returns the CSI Translation Library 143 GetCSITranslator() InTreeToCSITranslator 144 145 GenerateBulkVolumeVerifyFunc( 146 map[types.NodeName][]*volume.Spec, 147 string, 148 map[*volume.Spec]v1.UniqueVolumeName, ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) 149 150 GenerateExpandVolumeFunc(*v1.PersistentVolumeClaim, *v1.PersistentVolume) (volumetypes.GeneratedOperations, error) 151 152 // Generates the volume file system resize function, which can resize volume's file system to expected size without unmounting the volume. 153 GenerateExpandInUseVolumeFunc(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) 154} 155 156func (og *operationGenerator) GenerateVolumesAreAttachedFunc( 157 attachedVolumes []AttachedVolume, 158 nodeName types.NodeName, 159 actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) { 160 // volumesPerPlugin maps from a volume plugin to a list of volume specs which belong 161 // to this type of plugin 162 volumesPerPlugin := make(map[string][]*volume.Spec) 163 // volumeSpecMap maps from a volume spec to its unique volumeName which will be used 164 // when calling MarkVolumeAsDetached 165 volumeSpecMap := make(map[*volume.Spec]v1.UniqueVolumeName) 166 167 // Iterate each volume spec and put them into a map index by the pluginName 168 for _, volumeAttached := range attachedVolumes { 169 if volumeAttached.VolumeSpec == nil { 170 klog.Errorf("VerifyVolumesAreAttached.GenerateVolumesAreAttachedFunc: nil spec for volume %s", volumeAttached.VolumeName) 171 continue 172 } 173 volumePlugin, err := 174 og.volumePluginMgr.FindPluginBySpec(volumeAttached.VolumeSpec) 175 if err != nil || volumePlugin == nil { 176 klog.Errorf(volumeAttached.GenerateErrorDetailed("VolumesAreAttached.FindPluginBySpec failed", err).Error()) 177 continue 178 } 179 volumeSpecList, pluginExists := volumesPerPlugin[volumePlugin.GetPluginName()] 180 if !pluginExists { 181 volumeSpecList = []*volume.Spec{} 182 } 183 volumeSpecList = append(volumeSpecList, volumeAttached.VolumeSpec) 184 volumesPerPlugin[volumePlugin.GetPluginName()] = volumeSpecList 185 // Migration: VolumeSpecMap contains original VolumeName for use in ActualStateOfWorld 186 volumeSpecMap[volumeAttached.VolumeSpec] = volumeAttached.VolumeName 187 } 188 189 volumesAreAttachedFunc := func() volumetypes.OperationContext { 190 191 // For each volume plugin, pass the list of volume specs to VolumesAreAttached to check 192 // whether the volumes are still attached. 193 for pluginName, volumesSpecs := range volumesPerPlugin { 194 attachableVolumePlugin, err := 195 og.volumePluginMgr.FindAttachablePluginByName(pluginName) 196 if err != nil || attachableVolumePlugin == nil { 197 klog.Errorf( 198 "VolumeAreAttached.FindAttachablePluginBySpec failed for plugin %q with: %v", 199 pluginName, 200 err) 201 continue 202 } 203 204 volumeAttacher, newAttacherErr := attachableVolumePlugin.NewAttacher() 205 if newAttacherErr != nil { 206 klog.Errorf( 207 "VolumesAreAttached.NewAttacher failed for getting plugin %q with: %v", 208 pluginName, 209 newAttacherErr) 210 continue 211 } 212 213 attached, areAttachedErr := volumeAttacher.VolumesAreAttached(volumesSpecs, nodeName) 214 if areAttachedErr != nil { 215 klog.Errorf( 216 "VolumesAreAttached failed for checking on node %q with: %v", 217 nodeName, 218 areAttachedErr) 219 continue 220 } 221 222 for spec, check := range attached { 223 if !check { 224 actualStateOfWorld.MarkVolumeAsDetached(volumeSpecMap[spec], nodeName) 225 klog.V(1).Infof("VerifyVolumesAreAttached determined volume %q (spec.Name: %q) is no longer attached to node %q, therefore it was marked as detached.", 226 volumeSpecMap[spec], spec.Name(), nodeName) 227 } 228 } 229 } 230 231 // It is hard to differentiate migrated status for all volumes for verify_volumes_are_attached_per_node 232 return volumetypes.NewOperationContext(nil, nil, false) 233 } 234 235 return volumetypes.GeneratedOperations{ 236 OperationName: "verify_volumes_are_attached_per_node", 237 OperationFunc: volumesAreAttachedFunc, 238 CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume("<n/a>", nil), "verify_volumes_are_attached_per_node"), 239 EventRecorderFunc: nil, // nil because we do not want to generate event on error 240 }, nil 241} 242 243func (og *operationGenerator) GenerateBulkVolumeVerifyFunc( 244 pluginNodeVolumes map[types.NodeName][]*volume.Spec, 245 pluginName string, 246 volumeSpecMap map[*volume.Spec]v1.UniqueVolumeName, 247 actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) { 248 249 // Migration: All inputs already should be translated by caller for this 250 // function except volumeSpecMap which contains original volume names for 251 // use with actualStateOfWorld 252 253 bulkVolumeVerifyFunc := func() volumetypes.OperationContext { 254 attachableVolumePlugin, err := 255 og.volumePluginMgr.FindAttachablePluginByName(pluginName) 256 if err != nil || attachableVolumePlugin == nil { 257 klog.Errorf( 258 "BulkVerifyVolume.FindAttachablePluginBySpec failed for plugin %q with: %v", 259 pluginName, 260 err) 261 return volumetypes.NewOperationContext(nil, nil, false) 262 } 263 264 volumeAttacher, newAttacherErr := attachableVolumePlugin.NewAttacher() 265 266 if newAttacherErr != nil { 267 klog.Errorf( 268 "BulkVerifyVolume.NewAttacher failed for getting plugin %q with: %v", 269 attachableVolumePlugin, 270 newAttacherErr) 271 return volumetypes.NewOperationContext(nil, nil, false) 272 } 273 bulkVolumeVerifier, ok := volumeAttacher.(volume.BulkVolumeVerifier) 274 275 if !ok { 276 klog.Errorf("BulkVerifyVolume failed to type assert attacher %q", bulkVolumeVerifier) 277 return volumetypes.NewOperationContext(nil, nil, false) 278 } 279 280 attached, bulkAttachErr := bulkVolumeVerifier.BulkVerifyVolumes(pluginNodeVolumes) 281 if bulkAttachErr != nil { 282 klog.Errorf("BulkVerifyVolume.BulkVerifyVolumes Error checking volumes are attached with %v", bulkAttachErr) 283 return volumetypes.NewOperationContext(nil, nil, false) 284 } 285 286 for nodeName, volumeSpecs := range pluginNodeVolumes { 287 for _, volumeSpec := range volumeSpecs { 288 nodeVolumeSpecs, nodeChecked := attached[nodeName] 289 290 if !nodeChecked { 291 klog.V(2).Infof("VerifyVolumesAreAttached.BulkVerifyVolumes failed for node %q and leaving volume %q as attached", 292 nodeName, 293 volumeSpec.Name()) 294 continue 295 } 296 297 check := nodeVolumeSpecs[volumeSpec] 298 299 if !check { 300 klog.V(2).Infof("VerifyVolumesAreAttached.BulkVerifyVolumes failed for node %q and volume %q", 301 nodeName, 302 volumeSpec.Name()) 303 actualStateOfWorld.MarkVolumeAsDetached(volumeSpecMap[volumeSpec], nodeName) 304 } 305 } 306 } 307 308 // It is hard to differentiate migrated status for all volumes for verify_volumes_are_attached 309 return volumetypes.NewOperationContext(nil, nil, false) 310 } 311 312 return volumetypes.GeneratedOperations{ 313 OperationName: "verify_volumes_are_attached", 314 OperationFunc: bulkVolumeVerifyFunc, 315 CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(pluginName, nil), "verify_volumes_are_attached"), 316 EventRecorderFunc: nil, // nil because we do not want to generate event on error 317 }, nil 318 319} 320 321func (og *operationGenerator) GenerateAttachVolumeFunc( 322 volumeToAttach VolumeToAttach, 323 actualStateOfWorld ActualStateOfWorldAttacherUpdater) volumetypes.GeneratedOperations { 324 325 attachVolumeFunc := func() volumetypes.OperationContext { 326 attachableVolumePlugin, err := 327 og.volumePluginMgr.FindAttachablePluginBySpec(volumeToAttach.VolumeSpec) 328 329 migrated := getMigratedStatusBySpec(volumeToAttach.VolumeSpec) 330 331 if err != nil || attachableVolumePlugin == nil { 332 eventErr, detailedErr := volumeToAttach.GenerateError("AttachVolume.FindAttachablePluginBySpec failed", err) 333 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 334 } 335 336 volumeAttacher, newAttacherErr := attachableVolumePlugin.NewAttacher() 337 if newAttacherErr != nil { 338 eventErr, detailedErr := volumeToAttach.GenerateError("AttachVolume.NewAttacher failed", newAttacherErr) 339 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 340 } 341 342 // Execute attach 343 devicePath, attachErr := volumeAttacher.Attach( 344 volumeToAttach.VolumeSpec, volumeToAttach.NodeName) 345 346 if attachErr != nil { 347 uncertainNode := volumeToAttach.NodeName 348 if derr, ok := attachErr.(*volerr.DanglingAttachError); ok { 349 uncertainNode = derr.CurrentNode 350 } 351 addErr := actualStateOfWorld.MarkVolumeAsUncertain( 352 volumeToAttach.VolumeName, 353 volumeToAttach.VolumeSpec, 354 uncertainNode) 355 if addErr != nil { 356 klog.Errorf("AttachVolume.MarkVolumeAsUncertain fail to add the volume %q to actual state with %s", volumeToAttach.VolumeName, addErr) 357 } 358 359 // On failure, return error. Caller will log and retry. 360 eventErr, detailedErr := volumeToAttach.GenerateError("AttachVolume.Attach failed", attachErr) 361 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 362 } 363 364 // Successful attach event is useful for user debugging 365 simpleMsg, _ := volumeToAttach.GenerateMsg("AttachVolume.Attach succeeded", "") 366 for _, pod := range volumeToAttach.ScheduledPods { 367 og.recorder.Eventf(pod, v1.EventTypeNormal, kevents.SuccessfulAttachVolume, simpleMsg) 368 } 369 klog.Infof(volumeToAttach.GenerateMsgDetailed("AttachVolume.Attach succeeded", "")) 370 371 // Update actual state of world 372 addVolumeNodeErr := actualStateOfWorld.MarkVolumeAsAttached( 373 v1.UniqueVolumeName(""), volumeToAttach.VolumeSpec, volumeToAttach.NodeName, devicePath) 374 if addVolumeNodeErr != nil { 375 // On failure, return error. Caller will log and retry. 376 eventErr, detailedErr := volumeToAttach.GenerateError("AttachVolume.MarkVolumeAsAttached failed", addVolumeNodeErr) 377 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 378 } 379 380 return volumetypes.NewOperationContext(nil, nil, migrated) 381 } 382 383 eventRecorderFunc := func(err *error) { 384 if *err != nil { 385 for _, pod := range volumeToAttach.ScheduledPods { 386 og.recorder.Eventf(pod, v1.EventTypeWarning, kevents.FailedAttachVolume, (*err).Error()) 387 } 388 } 389 } 390 391 attachableVolumePluginName := unknownAttachableVolumePlugin 392 393 // Get attacher plugin 394 attachableVolumePlugin, err := 395 og.volumePluginMgr.FindAttachablePluginBySpec(volumeToAttach.VolumeSpec) 396 // It's ok to ignore the error, returning error is not expected from this function. 397 // If an error case occurred during the function generation, this error case(skipped one) will also trigger an error 398 // while the generated function is executed. And those errors will be handled during the execution of the generated 399 // function with a back off policy. 400 if err == nil && attachableVolumePlugin != nil { 401 attachableVolumePluginName = attachableVolumePlugin.GetPluginName() 402 } 403 404 return volumetypes.GeneratedOperations{ 405 OperationName: "volume_attach", 406 OperationFunc: attachVolumeFunc, 407 EventRecorderFunc: eventRecorderFunc, 408 CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(attachableVolumePluginName, volumeToAttach.VolumeSpec), "volume_attach"), 409 } 410} 411 412func (og *operationGenerator) GetVolumePluginMgr() *volume.VolumePluginMgr { 413 return og.volumePluginMgr 414} 415 416func (og *operationGenerator) GetCSITranslator() InTreeToCSITranslator { 417 return og.translator 418} 419 420func (og *operationGenerator) GenerateDetachVolumeFunc( 421 volumeToDetach AttachedVolume, 422 verifySafeToDetach bool, 423 actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) { 424 var volumeName string 425 var attachableVolumePlugin volume.AttachableVolumePlugin 426 var pluginName string 427 var err error 428 429 if volumeToDetach.VolumeSpec != nil { 430 attachableVolumePlugin, err = findDetachablePluginBySpec(volumeToDetach.VolumeSpec, og.volumePluginMgr) 431 if err != nil || attachableVolumePlugin == nil { 432 return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.findDetachablePluginBySpec failed", err) 433 } 434 435 volumeName, err = 436 attachableVolumePlugin.GetVolumeName(volumeToDetach.VolumeSpec) 437 if err != nil { 438 return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.GetVolumeName failed", err) 439 } 440 } else { 441 // Get attacher plugin and the volumeName by splitting the volume unique name in case 442 // there's no VolumeSpec: this happens only on attach/detach controller crash recovery 443 // when a pod has been deleted during the controller downtime 444 pluginName, volumeName, err = util.SplitUniqueName(volumeToDetach.VolumeName) 445 if err != nil { 446 return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.SplitUniqueName failed", err) 447 } 448 449 attachableVolumePlugin, err = og.volumePluginMgr.FindAttachablePluginByName(pluginName) 450 if err != nil || attachableVolumePlugin == nil { 451 return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.FindAttachablePluginByName failed", err) 452 } 453 454 } 455 456 if pluginName == "" { 457 pluginName = attachableVolumePlugin.GetPluginName() 458 } 459 460 volumeDetacher, err := attachableVolumePlugin.NewDetacher() 461 if err != nil { 462 return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.NewDetacher failed", err) 463 } 464 465 detachVolumeFunc := func() volumetypes.OperationContext { 466 var err error 467 if verifySafeToDetach { 468 err = og.verifyVolumeIsSafeToDetach(volumeToDetach) 469 } 470 if err == nil { 471 err = volumeDetacher.Detach(volumeName, volumeToDetach.NodeName) 472 } 473 474 migrated := getMigratedStatusBySpec(volumeToDetach.VolumeSpec) 475 476 if err != nil { 477 // On failure, add volume back to ReportAsAttached list 478 actualStateOfWorld.AddVolumeToReportAsAttached( 479 volumeToDetach.VolumeName, volumeToDetach.NodeName) 480 eventErr, detailedErr := volumeToDetach.GenerateError("DetachVolume.Detach failed", err) 481 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 482 } 483 484 klog.Infof(volumeToDetach.GenerateMsgDetailed("DetachVolume.Detach succeeded", "")) 485 486 // Update actual state of world 487 actualStateOfWorld.MarkVolumeAsDetached( 488 volumeToDetach.VolumeName, volumeToDetach.NodeName) 489 490 return volumetypes.NewOperationContext(nil, nil, migrated) 491 } 492 493 return volumetypes.GeneratedOperations{ 494 OperationName: "volume_detach", 495 OperationFunc: detachVolumeFunc, 496 CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(pluginName, volumeToDetach.VolumeSpec), "volume_detach"), 497 EventRecorderFunc: nil, // nil because we do not want to generate event on error 498 }, nil 499} 500 501func (og *operationGenerator) GenerateMountVolumeFunc( 502 waitForAttachTimeout time.Duration, 503 volumeToMount VolumeToMount, 504 actualStateOfWorld ActualStateOfWorldMounterUpdater, 505 isRemount bool) volumetypes.GeneratedOperations { 506 507 volumePluginName := unknownVolumePlugin 508 volumePlugin, err := 509 og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec) 510 if err == nil && volumePlugin != nil { 511 volumePluginName = volumePlugin.GetPluginName() 512 } 513 514 mountVolumeFunc := func() volumetypes.OperationContext { 515 // Get mounter plugin 516 volumePlugin, err := og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec) 517 518 migrated := getMigratedStatusBySpec(volumeToMount.VolumeSpec) 519 520 if err != nil || volumePlugin == nil { 521 eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.FindPluginBySpec failed", err) 522 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 523 } 524 525 affinityErr := checkNodeAffinity(og, volumeToMount) 526 if affinityErr != nil { 527 eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.NodeAffinity check failed", affinityErr) 528 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 529 } 530 531 volumeMounter, newMounterErr := volumePlugin.NewMounter( 532 volumeToMount.VolumeSpec, 533 volumeToMount.Pod, 534 volume.VolumeOptions{}) 535 if newMounterErr != nil { 536 eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.NewMounter initialization failed", newMounterErr) 537 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 538 } 539 540 mountCheckError := checkMountOptionSupport(og, volumeToMount, volumePlugin) 541 if mountCheckError != nil { 542 eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.MountOptionSupport check failed", mountCheckError) 543 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 544 } 545 546 // Enforce ReadWriteOncePod access mode if it is the only one present. This is also enforced during scheduling. 547 if utilfeature.DefaultFeatureGate.Enabled(features.ReadWriteOncePod) && 548 actualStateOfWorld.IsVolumeMountedElsewhere(volumeToMount.VolumeName, volumeToMount.PodName) && 549 // Because we do not know what access mode the pod intends to use if there are multiple. 550 len(volumeToMount.VolumeSpec.PersistentVolume.Spec.AccessModes) == 1 && 551 v1helper.ContainsAccessMode(volumeToMount.VolumeSpec.PersistentVolume.Spec.AccessModes, v1.ReadWriteOncePod) { 552 553 err = goerrors.New("volume uses the ReadWriteOncePod access mode and is already in use by another pod") 554 eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.SetUp failed", err) 555 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 556 } 557 558 // Get attacher, if possible 559 attachableVolumePlugin, _ := 560 og.volumePluginMgr.FindAttachablePluginBySpec(volumeToMount.VolumeSpec) 561 var volumeAttacher volume.Attacher 562 if attachableVolumePlugin != nil { 563 volumeAttacher, _ = attachableVolumePlugin.NewAttacher() 564 } 565 566 // get deviceMounter, if possible 567 deviceMountableVolumePlugin, _ := og.volumePluginMgr.FindDeviceMountablePluginBySpec(volumeToMount.VolumeSpec) 568 var volumeDeviceMounter volume.DeviceMounter 569 if deviceMountableVolumePlugin != nil { 570 volumeDeviceMounter, _ = deviceMountableVolumePlugin.NewDeviceMounter() 571 } 572 573 var fsGroup *int64 574 var fsGroupChangePolicy *v1.PodFSGroupChangePolicy 575 if podSc := volumeToMount.Pod.Spec.SecurityContext; podSc != nil { 576 if podSc.FSGroup != nil { 577 fsGroup = podSc.FSGroup 578 } 579 if podSc.FSGroupChangePolicy != nil { 580 fsGroupChangePolicy = podSc.FSGroupChangePolicy 581 } 582 } 583 584 devicePath := volumeToMount.DevicePath 585 if volumeAttacher != nil { 586 // Wait for attachable volumes to finish attaching 587 klog.Infof(volumeToMount.GenerateMsgDetailed("MountVolume.WaitForAttach entering", fmt.Sprintf("DevicePath %q", volumeToMount.DevicePath))) 588 589 devicePath, err = volumeAttacher.WaitForAttach( 590 volumeToMount.VolumeSpec, devicePath, volumeToMount.Pod, waitForAttachTimeout) 591 if err != nil { 592 // On failure, return error. Caller will log and retry. 593 eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.WaitForAttach failed", err) 594 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 595 } 596 597 klog.Infof(volumeToMount.GenerateMsgDetailed("MountVolume.WaitForAttach succeeded", fmt.Sprintf("DevicePath %q", devicePath))) 598 } 599 600 var resizeDone bool 601 var resizeError error 602 resizeOptions := volume.NodeResizeOptions{ 603 DevicePath: devicePath, 604 } 605 606 if volumeDeviceMounter != nil && actualStateOfWorld.GetDeviceMountState(volumeToMount.VolumeName) != DeviceGloballyMounted { 607 deviceMountPath, err := 608 volumeDeviceMounter.GetDeviceMountPath(volumeToMount.VolumeSpec) 609 if err != nil { 610 // On failure, return error. Caller will log and retry. 611 eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.GetDeviceMountPath failed", err) 612 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 613 } 614 615 // Mount device to global mount path 616 err = volumeDeviceMounter.MountDevice( 617 volumeToMount.VolumeSpec, 618 devicePath, 619 deviceMountPath, 620 volume.DeviceMounterArgs{FsGroup: fsGroup}, 621 ) 622 if err != nil { 623 og.checkForFailedMount(volumeToMount, err) 624 og.markDeviceErrorState(volumeToMount, devicePath, deviceMountPath, err, actualStateOfWorld) 625 // On failure, return error. Caller will log and retry. 626 eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.MountDevice failed", err) 627 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 628 } 629 630 klog.Infof(volumeToMount.GenerateMsgDetailed("MountVolume.MountDevice succeeded", fmt.Sprintf("device mount path %q", deviceMountPath))) 631 632 // Update actual state of world to reflect volume is globally mounted 633 markDeviceMountedErr := actualStateOfWorld.MarkDeviceAsMounted( 634 volumeToMount.VolumeName, devicePath, deviceMountPath) 635 if markDeviceMountedErr != nil { 636 // On failure, return error. Caller will log and retry. 637 eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.MarkDeviceAsMounted failed", markDeviceMountedErr) 638 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 639 } 640 641 // If volume expansion is performed after MountDevice but before SetUp then 642 // deviceMountPath and deviceStagePath is going to be the same. 643 // Deprecation: Calling NodeExpandVolume after NodeStage/MountDevice will be deprecated 644 // in a future version of k8s. 645 resizeOptions.DeviceMountPath = deviceMountPath 646 resizeOptions.DeviceStagePath = deviceMountPath 647 resizeOptions.CSIVolumePhase = volume.CSIVolumeStaged 648 649 // NodeExpandVolume will resize the file system if user has requested a resize of 650 // underlying persistent volume and is allowed to do so. 651 resizeDone, resizeError = og.nodeExpandVolume(volumeToMount, actualStateOfWorld, resizeOptions) 652 653 if resizeError != nil { 654 klog.Errorf("MountVolume.NodeExpandVolume failed with %v", resizeError) 655 656 // Resize failed. To make sure NodeExpand is re-tried again on the next attempt 657 // *before* SetUp(), mark the mounted device as uncertain. 658 markDeviceUncertainErr := actualStateOfWorld.MarkDeviceAsUncertain( 659 volumeToMount.VolumeName, devicePath, deviceMountPath) 660 if markDeviceUncertainErr != nil { 661 // just log, return the resizeError error instead 662 klog.Infof(volumeToMount.GenerateMsgDetailed( 663 "MountVolume.MountDevice failed to mark volume as uncertain", 664 markDeviceUncertainErr.Error())) 665 } 666 eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.MountDevice failed while expanding volume", resizeError) 667 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 668 } 669 } 670 671 if og.checkNodeCapabilitiesBeforeMount { 672 if canMountErr := volumeMounter.CanMount(); canMountErr != nil { 673 err = fmt.Errorf( 674 "verify that your node machine has the required components before attempting to mount this volume type. %s", 675 canMountErr) 676 eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.CanMount failed", err) 677 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 678 } 679 } 680 681 // Execute mount 682 mountErr := volumeMounter.SetUp(volume.MounterArgs{ 683 FsUser: util.FsUserFrom(volumeToMount.Pod), 684 FsGroup: fsGroup, 685 DesiredSize: volumeToMount.DesiredSizeLimit, 686 FSGroupChangePolicy: fsGroupChangePolicy, 687 }) 688 // Update actual state of world 689 markOpts := MarkVolumeOpts{ 690 PodName: volumeToMount.PodName, 691 PodUID: volumeToMount.Pod.UID, 692 VolumeName: volumeToMount.VolumeName, 693 Mounter: volumeMounter, 694 OuterVolumeSpecName: volumeToMount.OuterVolumeSpecName, 695 VolumeGidVolume: volumeToMount.VolumeGidValue, 696 VolumeSpec: volumeToMount.VolumeSpec, 697 VolumeMountState: VolumeMounted, 698 } 699 if mountErr != nil { 700 og.checkForFailedMount(volumeToMount, mountErr) 701 og.markVolumeErrorState(volumeToMount, markOpts, mountErr, actualStateOfWorld) 702 // On failure, return error. Caller will log and retry. 703 eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.SetUp failed", mountErr) 704 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 705 } 706 707 _, detailedMsg := volumeToMount.GenerateMsg("MountVolume.SetUp succeeded", "") 708 verbosity := klog.Level(1) 709 if isRemount { 710 verbosity = klog.Level(4) 711 } 712 klog.V(verbosity).Infof(detailedMsg) 713 resizeOptions.DeviceMountPath = volumeMounter.GetPath() 714 resizeOptions.CSIVolumePhase = volume.CSIVolumePublished 715 716 // We need to call resizing here again in case resizing was not done during device mount. There could be 717 // two reasons of that: 718 // - Volume does not support DeviceMounter interface. 719 // - In case of CSI the volume does not have node stage_unstage capability. 720 if !resizeDone { 721 _, resizeError = og.nodeExpandVolume(volumeToMount, actualStateOfWorld, resizeOptions) 722 if resizeError != nil { 723 klog.Errorf("MountVolume.NodeExpandVolume failed with %v", resizeError) 724 eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.Setup failed while expanding volume", resizeError) 725 // At this point, MountVolume.Setup already succeeded, we should add volume into actual state 726 // so that reconciler can clean up volume when needed. However, volume resize failed, 727 // we should not mark the volume as mounted to avoid pod starts using it. 728 // Considering the above situations, we mark volume as uncertain here so that reconciler will tigger 729 // volume tear down when pod is deleted, and also makes sure pod will not start using it. 730 if err := actualStateOfWorld.MarkVolumeMountAsUncertain(markOpts); err != nil { 731 klog.Errorf(volumeToMount.GenerateErrorDetailed("MountVolume.MarkVolumeMountAsUncertain failed", err).Error()) 732 } 733 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 734 } 735 } 736 737 markVolMountedErr := actualStateOfWorld.MarkVolumeAsMounted(markOpts) 738 if markVolMountedErr != nil { 739 // On failure, return error. Caller will log and retry. 740 eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.MarkVolumeAsMounted failed", markVolMountedErr) 741 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 742 } 743 return volumetypes.NewOperationContext(nil, nil, migrated) 744 } 745 746 eventRecorderFunc := func(err *error) { 747 if *err != nil { 748 og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMountVolume, (*err).Error()) 749 } 750 } 751 752 return volumetypes.GeneratedOperations{ 753 OperationName: "volume_mount", 754 OperationFunc: mountVolumeFunc, 755 EventRecorderFunc: eventRecorderFunc, 756 CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(volumePluginName, volumeToMount.VolumeSpec), "volume_mount"), 757 } 758} 759 760func (og *operationGenerator) checkForFailedMount(volumeToMount VolumeToMount, mountError error) { 761 pv := volumeToMount.VolumeSpec.PersistentVolume 762 if pv == nil { 763 return 764 } 765 766 if volumetypes.IsFilesystemMismatchError(mountError) { 767 simpleMsg, _ := volumeToMount.GenerateMsg("MountVolume failed", mountError.Error()) 768 og.recorder.Eventf(pv, v1.EventTypeWarning, kevents.FailedMountOnFilesystemMismatch, simpleMsg) 769 } 770} 771 772func (og *operationGenerator) markDeviceErrorState(volumeToMount VolumeToMount, devicePath, deviceMountPath string, mountError error, actualStateOfWorld ActualStateOfWorldMounterUpdater) { 773 if volumetypes.IsOperationFinishedError(mountError) && 774 actualStateOfWorld.GetDeviceMountState(volumeToMount.VolumeName) == DeviceMountUncertain { 775 // Only devices which were uncertain can be marked as unmounted 776 markDeviceUnmountError := actualStateOfWorld.MarkDeviceAsUnmounted(volumeToMount.VolumeName) 777 if markDeviceUnmountError != nil { 778 klog.Errorf(volumeToMount.GenerateErrorDetailed("MountDevice.MarkDeviceAsUnmounted failed", markDeviceUnmountError).Error()) 779 } 780 return 781 } 782 783 if volumetypes.IsUncertainProgressError(mountError) && 784 actualStateOfWorld.GetDeviceMountState(volumeToMount.VolumeName) == DeviceNotMounted { 785 // only devices which are not mounted can be marked as uncertain. We do not want to mark a device 786 // which was previously marked as mounted here as uncertain. 787 markDeviceUncertainError := actualStateOfWorld.MarkDeviceAsUncertain(volumeToMount.VolumeName, devicePath, deviceMountPath) 788 if markDeviceUncertainError != nil { 789 klog.Errorf(volumeToMount.GenerateErrorDetailed("MountDevice.MarkDeviceAsUncertain failed", markDeviceUncertainError).Error()) 790 } 791 } 792 793} 794 795func (og *operationGenerator) markVolumeErrorState(volumeToMount VolumeToMount, markOpts MarkVolumeOpts, mountError error, actualStateOfWorld ActualStateOfWorldMounterUpdater) { 796 if volumetypes.IsOperationFinishedError(mountError) && 797 actualStateOfWorld.GetVolumeMountState(volumeToMount.VolumeName, markOpts.PodName) == VolumeMountUncertain { 798 t := actualStateOfWorld.MarkVolumeAsUnmounted(volumeToMount.PodName, volumeToMount.VolumeName) 799 if t != nil { 800 klog.Errorf(volumeToMount.GenerateErrorDetailed("MountVolume.MarkVolumeAsUnmounted failed", t).Error()) 801 } 802 return 803 } 804 805 if volumetypes.IsUncertainProgressError(mountError) && 806 actualStateOfWorld.GetVolumeMountState(volumeToMount.VolumeName, markOpts.PodName) == VolumeNotMounted { 807 t := actualStateOfWorld.MarkVolumeMountAsUncertain(markOpts) 808 if t != nil { 809 klog.Errorf(volumeToMount.GenerateErrorDetailed("MountVolume.MarkVolumeMountAsUncertain failed", t).Error()) 810 } 811 } 812 813} 814 815func (og *operationGenerator) GenerateUnmountVolumeFunc( 816 volumeToUnmount MountedVolume, 817 actualStateOfWorld ActualStateOfWorldMounterUpdater, 818 podsDir string) (volumetypes.GeneratedOperations, error) { 819 // Get mountable plugin 820 volumePlugin, err := og.volumePluginMgr.FindPluginByName(volumeToUnmount.PluginName) 821 if err != nil || volumePlugin == nil { 822 return volumetypes.GeneratedOperations{}, volumeToUnmount.GenerateErrorDetailed("UnmountVolume.FindPluginByName failed", err) 823 } 824 volumeUnmounter, newUnmounterErr := volumePlugin.NewUnmounter( 825 volumeToUnmount.InnerVolumeSpecName, volumeToUnmount.PodUID) 826 if newUnmounterErr != nil { 827 return volumetypes.GeneratedOperations{}, volumeToUnmount.GenerateErrorDetailed("UnmountVolume.NewUnmounter failed", newUnmounterErr) 828 } 829 830 unmountVolumeFunc := func() volumetypes.OperationContext { 831 subpather := og.volumePluginMgr.Host.GetSubpather() 832 833 migrated := getMigratedStatusBySpec(volumeToUnmount.VolumeSpec) 834 835 // Remove all bind-mounts for subPaths 836 podDir := filepath.Join(podsDir, string(volumeToUnmount.PodUID)) 837 if err := subpather.CleanSubPaths(podDir, volumeToUnmount.InnerVolumeSpecName); err != nil { 838 eventErr, detailedErr := volumeToUnmount.GenerateError("error cleaning subPath mounts", err) 839 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 840 } 841 842 // Execute unmount 843 unmountErr := volumeUnmounter.TearDown() 844 if unmountErr != nil { 845 // Mark the volume as uncertain, so SetUp is called for new pods. Teardown may be already in progress. 846 opts := MarkVolumeOpts{ 847 PodName: volumeToUnmount.PodName, 848 PodUID: volumeToUnmount.PodUID, 849 VolumeName: volumeToUnmount.VolumeName, 850 OuterVolumeSpecName: volumeToUnmount.OuterVolumeSpecName, 851 VolumeGidVolume: volumeToUnmount.VolumeGidValue, 852 VolumeSpec: volumeToUnmount.VolumeSpec, 853 VolumeMountState: VolumeMountUncertain, 854 } 855 markMountUncertainErr := actualStateOfWorld.MarkVolumeMountAsUncertain(opts) 856 if markMountUncertainErr != nil { 857 // There is nothing else we can do. Hope that UnmountVolume will be re-tried shortly. 858 klog.Errorf(volumeToUnmount.GenerateErrorDetailed("UnmountVolume.MarkVolumeMountAsUncertain failed", markMountUncertainErr).Error()) 859 } 860 861 // On failure, return error. Caller will log and retry. 862 eventErr, detailedErr := volumeToUnmount.GenerateError("UnmountVolume.TearDown failed", unmountErr) 863 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 864 } 865 866 klog.Infof( 867 "UnmountVolume.TearDown succeeded for volume %q (OuterVolumeSpecName: %q) pod %q (UID: %q). InnerVolumeSpecName %q. PluginName %q, VolumeGidValue %q", 868 volumeToUnmount.VolumeName, 869 volumeToUnmount.OuterVolumeSpecName, 870 volumeToUnmount.PodName, 871 volumeToUnmount.PodUID, 872 volumeToUnmount.InnerVolumeSpecName, 873 volumeToUnmount.PluginName, 874 volumeToUnmount.VolumeGidValue) 875 876 // Update actual state of world 877 markVolMountedErr := actualStateOfWorld.MarkVolumeAsUnmounted( 878 volumeToUnmount.PodName, volumeToUnmount.VolumeName) 879 if markVolMountedErr != nil { 880 // On failure, just log and exit 881 klog.Errorf(volumeToUnmount.GenerateErrorDetailed("UnmountVolume.MarkVolumeAsUnmounted failed", markVolMountedErr).Error()) 882 } 883 884 return volumetypes.NewOperationContext(nil, nil, migrated) 885 } 886 887 return volumetypes.GeneratedOperations{ 888 OperationName: "volume_unmount", 889 OperationFunc: unmountVolumeFunc, 890 CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(volumePlugin.GetPluginName(), volumeToUnmount.VolumeSpec), "volume_unmount"), 891 EventRecorderFunc: nil, // nil because we do not want to generate event on error 892 }, nil 893} 894 895func (og *operationGenerator) GenerateUnmountDeviceFunc( 896 deviceToDetach AttachedVolume, 897 actualStateOfWorld ActualStateOfWorldMounterUpdater, 898 hostutil hostutil.HostUtils) (volumetypes.GeneratedOperations, error) { 899 // Get DeviceMounter plugin 900 deviceMountableVolumePlugin, err := 901 og.volumePluginMgr.FindDeviceMountablePluginByName(deviceToDetach.PluginName) 902 if err != nil || deviceMountableVolumePlugin == nil { 903 return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmountDevice.FindDeviceMountablePluginByName failed", err) 904 } 905 906 volumeDeviceUnmounter, err := deviceMountableVolumePlugin.NewDeviceUnmounter() 907 if err != nil { 908 return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmountDevice.NewDeviceUnmounter failed", err) 909 } 910 911 volumeDeviceMounter, err := deviceMountableVolumePlugin.NewDeviceMounter() 912 if err != nil { 913 return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmountDevice.NewDeviceMounter failed", err) 914 } 915 916 unmountDeviceFunc := func() volumetypes.OperationContext { 917 918 migrated := getMigratedStatusBySpec(deviceToDetach.VolumeSpec) 919 920 //deviceMountPath := deviceToDetach.DeviceMountPath 921 deviceMountPath, err := 922 volumeDeviceMounter.GetDeviceMountPath(deviceToDetach.VolumeSpec) 923 if err != nil { 924 // On failure other than "does not exist", return error. Caller will log and retry. 925 if !strings.Contains(err.Error(), "does not exist") { 926 eventErr, detailedErr := deviceToDetach.GenerateError("GetDeviceMountPath failed", err) 927 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 928 } 929 // If the mount path could not be found, don't fail the unmount, but instead log a warning and proceed, 930 // using the value from deviceToDetach.DeviceMountPath, so that the device can be marked as unmounted 931 deviceMountPath = deviceToDetach.DeviceMountPath 932 klog.Warningf(deviceToDetach.GenerateMsg(fmt.Sprintf( 933 "GetDeviceMountPath failed, but unmount operation will proceed using deviceMountPath=%s: %v", deviceMountPath, err), "")) 934 } 935 refs, err := deviceMountableVolumePlugin.GetDeviceMountRefs(deviceMountPath) 936 937 if err != nil || util.HasMountRefs(deviceMountPath, refs) { 938 if err == nil { 939 err = fmt.Errorf("the device mount path %q is still mounted by other references %v", deviceMountPath, refs) 940 } 941 eventErr, detailedErr := deviceToDetach.GenerateError("GetDeviceMountRefs check failed", err) 942 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 943 } 944 // Execute unmount 945 unmountDeviceErr := volumeDeviceUnmounter.UnmountDevice(deviceMountPath) 946 if unmountDeviceErr != nil { 947 // Mark the device as uncertain, so MountDevice is called for new pods. UnmountDevice may be already in progress. 948 markDeviceUncertainErr := actualStateOfWorld.MarkDeviceAsUncertain(deviceToDetach.VolumeName, deviceToDetach.DevicePath, deviceMountPath) 949 if markDeviceUncertainErr != nil { 950 // There is nothing else we can do. Hope that UnmountDevice will be re-tried shortly. 951 klog.Errorf(deviceToDetach.GenerateErrorDetailed("UnmountDevice.MarkDeviceAsUncertain failed", markDeviceUncertainErr).Error()) 952 } 953 954 // On failure, return error. Caller will log and retry. 955 eventErr, detailedErr := deviceToDetach.GenerateError("UnmountDevice failed", unmountDeviceErr) 956 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 957 } 958 // Before logging that UnmountDevice succeeded and moving on, 959 // use hostutil.PathIsDevice to check if the path is a device, 960 // if so use hostutil.DeviceOpened to check if the device is in use anywhere 961 // else on the system. Retry if it returns true. 962 deviceOpened, deviceOpenedErr := isDeviceOpened(deviceToDetach, hostutil) 963 if deviceOpenedErr != nil { 964 return volumetypes.NewOperationContext(nil, deviceOpenedErr, migrated) 965 } 966 // The device is still in use elsewhere. Caller will log and retry. 967 if deviceOpened { 968 eventErr, detailedErr := deviceToDetach.GenerateError( 969 "UnmountDevice failed", 970 goerrors.New("the device is in use when it was no longer expected to be in use")) 971 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 972 } 973 974 klog.Infof(deviceToDetach.GenerateMsg("UnmountDevice succeeded", "")) 975 976 // Update actual state of world 977 markDeviceUnmountedErr := actualStateOfWorld.MarkDeviceAsUnmounted( 978 deviceToDetach.VolumeName) 979 if markDeviceUnmountedErr != nil { 980 // On failure, return error. Caller will log and retry. 981 eventErr, detailedErr := deviceToDetach.GenerateError("MarkDeviceAsUnmounted failed", markDeviceUnmountedErr) 982 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 983 } 984 985 return volumetypes.NewOperationContext(nil, nil, migrated) 986 } 987 988 return volumetypes.GeneratedOperations{ 989 OperationName: "unmount_device", 990 OperationFunc: unmountDeviceFunc, 991 CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(deviceMountableVolumePlugin.GetPluginName(), deviceToDetach.VolumeSpec), "unmount_device"), 992 EventRecorderFunc: nil, // nil because we do not want to generate event on error 993 }, nil 994} 995 996// GenerateMapVolumeFunc marks volume as mounted based on following steps. 997// If plugin is attachable, call WaitForAttach() and then mark the device 998// as mounted. On next step, SetUpDevice is called without dependent of 999// plugin type, but this method mainly is targeted for none attachable plugin. 1000// After setup is done, create symbolic links on both global map path and pod 1001// device map path. Once symbolic links are created, take fd lock by 1002// loopback for the device to avoid silent volume replacement. This lock 1003// will be released once no one uses the device. 1004// If all steps are completed, the volume is marked as mounted. 1005func (og *operationGenerator) GenerateMapVolumeFunc( 1006 waitForAttachTimeout time.Duration, 1007 volumeToMount VolumeToMount, 1008 actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { 1009 1010 // Get block volume mapper plugin 1011 blockVolumePlugin, err := 1012 og.volumePluginMgr.FindMapperPluginBySpec(volumeToMount.VolumeSpec) 1013 if err != nil { 1014 return volumetypes.GeneratedOperations{}, volumeToMount.GenerateErrorDetailed("MapVolume.FindMapperPluginBySpec failed", err) 1015 } 1016 1017 if blockVolumePlugin == nil { 1018 return volumetypes.GeneratedOperations{}, volumeToMount.GenerateErrorDetailed("MapVolume.FindMapperPluginBySpec failed to find BlockVolumeMapper plugin. Volume plugin is nil.", nil) 1019 } 1020 1021 affinityErr := checkNodeAffinity(og, volumeToMount) 1022 if affinityErr != nil { 1023 eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.NodeAffinity check failed", affinityErr) 1024 og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMountVolume, eventErr.Error()) 1025 return volumetypes.GeneratedOperations{}, detailedErr 1026 } 1027 blockVolumeMapper, newMapperErr := blockVolumePlugin.NewBlockVolumeMapper( 1028 volumeToMount.VolumeSpec, 1029 volumeToMount.Pod, 1030 volume.VolumeOptions{}) 1031 if newMapperErr != nil { 1032 eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.NewBlockVolumeMapper initialization failed", newMapperErr) 1033 og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMapVolume, eventErr.Error()) 1034 return volumetypes.GeneratedOperations{}, detailedErr 1035 } 1036 1037 // Get attacher, if possible 1038 attachableVolumePlugin, _ := 1039 og.volumePluginMgr.FindAttachablePluginBySpec(volumeToMount.VolumeSpec) 1040 var volumeAttacher volume.Attacher 1041 if attachableVolumePlugin != nil { 1042 volumeAttacher, _ = attachableVolumePlugin.NewAttacher() 1043 } 1044 1045 mapVolumeFunc := func() (operationContext volumetypes.OperationContext) { 1046 var devicePath string 1047 var stagingPath string 1048 1049 migrated := getMigratedStatusBySpec(volumeToMount.VolumeSpec) 1050 1051 // Enforce ReadWriteOncePod access mode. This is also enforced during scheduling. 1052 if utilfeature.DefaultFeatureGate.Enabled(features.ReadWriteOncePod) && 1053 actualStateOfWorld.IsVolumeMountedElsewhere(volumeToMount.VolumeName, volumeToMount.PodName) && 1054 // Because we do not know what access mode the pod intends to use if there are multiple. 1055 len(volumeToMount.VolumeSpec.PersistentVolume.Spec.AccessModes) == 1 && 1056 v1helper.ContainsAccessMode(volumeToMount.VolumeSpec.PersistentVolume.Spec.AccessModes, v1.ReadWriteOncePod) { 1057 1058 err = goerrors.New("volume uses the ReadWriteOncePod access mode and is already in use by another pod") 1059 eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.SetUpDevice failed", err) 1060 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 1061 } 1062 1063 // Set up global map path under the given plugin directory using symbolic link 1064 globalMapPath, err := 1065 blockVolumeMapper.GetGlobalMapPath(volumeToMount.VolumeSpec) 1066 if err != nil { 1067 // On failure, return error. Caller will log and retry. 1068 eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.GetGlobalMapPath failed", err) 1069 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 1070 } 1071 if volumeAttacher != nil { 1072 // Wait for attachable volumes to finish attaching 1073 klog.Infof(volumeToMount.GenerateMsgDetailed("MapVolume.WaitForAttach entering", fmt.Sprintf("DevicePath %q", volumeToMount.DevicePath))) 1074 1075 devicePath, err = volumeAttacher.WaitForAttach( 1076 volumeToMount.VolumeSpec, volumeToMount.DevicePath, volumeToMount.Pod, waitForAttachTimeout) 1077 if err != nil { 1078 // On failure, return error. Caller will log and retry. 1079 eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.WaitForAttach failed", err) 1080 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 1081 } 1082 1083 klog.Infof(volumeToMount.GenerateMsgDetailed("MapVolume.WaitForAttach succeeded", fmt.Sprintf("DevicePath %q", devicePath))) 1084 1085 } 1086 // Call SetUpDevice if blockVolumeMapper implements CustomBlockVolumeMapper 1087 if customBlockVolumeMapper, ok := blockVolumeMapper.(volume.CustomBlockVolumeMapper); ok { 1088 var mapErr error 1089 stagingPath, mapErr = customBlockVolumeMapper.SetUpDevice() 1090 if mapErr != nil { 1091 og.markDeviceErrorState(volumeToMount, devicePath, globalMapPath, mapErr, actualStateOfWorld) 1092 // On failure, return error. Caller will log and retry. 1093 eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.SetUpDevice failed", mapErr) 1094 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 1095 } 1096 } 1097 1098 // Update actual state of world to reflect volume is globally mounted 1099 markedDevicePath := devicePath 1100 markDeviceMappedErr := actualStateOfWorld.MarkDeviceAsMounted( 1101 volumeToMount.VolumeName, markedDevicePath, globalMapPath) 1102 if markDeviceMappedErr != nil { 1103 // On failure, return error. Caller will log and retry. 1104 eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.MarkDeviceAsMounted failed", markDeviceMappedErr) 1105 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 1106 } 1107 1108 markVolumeOpts := MarkVolumeOpts{ 1109 PodName: volumeToMount.PodName, 1110 PodUID: volumeToMount.Pod.UID, 1111 VolumeName: volumeToMount.VolumeName, 1112 BlockVolumeMapper: blockVolumeMapper, 1113 OuterVolumeSpecName: volumeToMount.OuterVolumeSpecName, 1114 VolumeGidVolume: volumeToMount.VolumeGidValue, 1115 VolumeSpec: volumeToMount.VolumeSpec, 1116 VolumeMountState: VolumeMounted, 1117 } 1118 1119 // Call MapPodDevice if blockVolumeMapper implements CustomBlockVolumeMapper 1120 if customBlockVolumeMapper, ok := blockVolumeMapper.(volume.CustomBlockVolumeMapper); ok { 1121 // Execute driver specific map 1122 pluginDevicePath, mapErr := customBlockVolumeMapper.MapPodDevice() 1123 if mapErr != nil { 1124 // On failure, return error. Caller will log and retry. 1125 og.markVolumeErrorState(volumeToMount, markVolumeOpts, mapErr, actualStateOfWorld) 1126 eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.MapPodDevice failed", mapErr) 1127 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 1128 } 1129 1130 // From now on, the volume is mapped. Mark it as uncertain on error, 1131 // so it is is unmapped when corresponding pod is deleted. 1132 defer func() { 1133 if operationContext.EventErr != nil { 1134 errText := operationContext.EventErr.Error() 1135 og.markVolumeErrorState(volumeToMount, markVolumeOpts, volumetypes.NewUncertainProgressError(errText), actualStateOfWorld) 1136 } 1137 }() 1138 1139 // if pluginDevicePath is provided, assume attacher may not provide device 1140 // or attachment flow uses SetupDevice to get device path 1141 if len(pluginDevicePath) != 0 { 1142 devicePath = pluginDevicePath 1143 } 1144 if len(devicePath) == 0 { 1145 eventErr, detailedErr := volumeToMount.GenerateError("MapVolume failed", goerrors.New("device path of the volume is empty")) 1146 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 1147 } 1148 } 1149 1150 // When kubelet is containerized, devicePath may be a symlink at a place unavailable to 1151 // kubelet, so evaluate it on the host and expect that it links to a device in /dev, 1152 // which will be available to containerized kubelet. If still it does not exist, 1153 // AttachFileDevice will fail. If kubelet is not containerized, eval it anyway. 1154 kvh, ok := og.GetVolumePluginMgr().Host.(volume.KubeletVolumeHost) 1155 if !ok { 1156 eventErr, detailedErr := volumeToMount.GenerateError("MapVolume type assertion error", fmt.Errorf("volume host does not implement KubeletVolumeHost interface")) 1157 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 1158 } 1159 hu := kvh.GetHostUtil() 1160 devicePath, err = hu.EvalHostSymlinks(devicePath) 1161 if err != nil { 1162 eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.EvalHostSymlinks failed", err) 1163 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 1164 } 1165 1166 // Update actual state of world with the devicePath again, if devicePath has changed from markedDevicePath 1167 // TODO: This can be improved after #82492 is merged and ASW has state. 1168 if markedDevicePath != devicePath { 1169 markDeviceMappedErr := actualStateOfWorld.MarkDeviceAsMounted( 1170 volumeToMount.VolumeName, devicePath, globalMapPath) 1171 if markDeviceMappedErr != nil { 1172 // On failure, return error. Caller will log and retry. 1173 eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.MarkDeviceAsMounted failed", markDeviceMappedErr) 1174 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 1175 } 1176 } 1177 1178 // Execute common map 1179 volumeMapPath, volName := blockVolumeMapper.GetPodDeviceMapPath() 1180 mapErr := util.MapBlockVolume(og.blkUtil, devicePath, globalMapPath, volumeMapPath, volName, volumeToMount.Pod.UID) 1181 if mapErr != nil { 1182 // On failure, return error. Caller will log and retry. 1183 eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.MapBlockVolume failed", mapErr) 1184 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 1185 } 1186 1187 // Device mapping for global map path succeeded 1188 simpleMsg, detailedMsg := volumeToMount.GenerateMsg("MapVolume.MapPodDevice succeeded", fmt.Sprintf("globalMapPath %q", globalMapPath)) 1189 verbosity := klog.Level(4) 1190 og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeNormal, kevents.SuccessfulMountVolume, simpleMsg) 1191 klog.V(verbosity).Infof(detailedMsg) 1192 1193 // Device mapping for pod device map path succeeded 1194 simpleMsg, detailedMsg = volumeToMount.GenerateMsg("MapVolume.MapPodDevice succeeded", fmt.Sprintf("volumeMapPath %q", volumeMapPath)) 1195 verbosity = klog.Level(1) 1196 og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeNormal, kevents.SuccessfulMountVolume, simpleMsg) 1197 klog.V(verbosity).Infof(detailedMsg) 1198 1199 resizeOptions := volume.NodeResizeOptions{ 1200 DevicePath: devicePath, 1201 DeviceStagePath: stagingPath, 1202 CSIVolumePhase: volume.CSIVolumePublished, 1203 } 1204 _, resizeError := og.nodeExpandVolume(volumeToMount, actualStateOfWorld, resizeOptions) 1205 if resizeError != nil { 1206 klog.Errorf("MapVolume.NodeExpandVolume failed with %v", resizeError) 1207 eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.MarkVolumeAsMounted failed while expanding volume", resizeError) 1208 // At this point, MountVolume.Setup already succeeded, we should add volume into actual state 1209 // so that reconciler can clean up volume when needed. However, if nodeExpandVolume failed, 1210 // we should not mark the volume as mounted to avoid pod starts using it. 1211 // Considering the above situations, we mark volume as uncertain here so that reconciler will tigger 1212 // volume tear down when pod is deleted, and also makes sure pod will not start using it. 1213 if err := actualStateOfWorld.MarkVolumeMountAsUncertain(markVolumeOpts); err != nil { 1214 klog.Errorf(volumeToMount.GenerateErrorDetailed("MountVolume.MarkVolumeMountAsUncertain failed", err).Error()) 1215 } 1216 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 1217 } 1218 1219 markVolMountedErr := actualStateOfWorld.MarkVolumeAsMounted(markVolumeOpts) 1220 if markVolMountedErr != nil { 1221 // On failure, return error. Caller will log and retry. 1222 eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.MarkVolumeAsMounted failed", markVolMountedErr) 1223 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 1224 } 1225 1226 return volumetypes.NewOperationContext(nil, nil, migrated) 1227 } 1228 1229 eventRecorderFunc := func(err *error) { 1230 if *err != nil { 1231 og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMapVolume, (*err).Error()) 1232 } 1233 } 1234 1235 return volumetypes.GeneratedOperations{ 1236 OperationName: "map_volume", 1237 OperationFunc: mapVolumeFunc, 1238 EventRecorderFunc: eventRecorderFunc, 1239 CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(blockVolumePlugin.GetPluginName(), volumeToMount.VolumeSpec), "map_volume"), 1240 }, nil 1241} 1242 1243// GenerateUnmapVolumeFunc marks volume as unmonuted based on following steps. 1244// Remove symbolic links from pod device map path dir and global map path dir. 1245// Once those cleanups are done, remove pod device map path dir. 1246// If all steps are completed, the volume is marked as unmounted. 1247func (og *operationGenerator) GenerateUnmapVolumeFunc( 1248 volumeToUnmount MountedVolume, 1249 actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { 1250 1251 // Get block volume unmapper plugin 1252 blockVolumePlugin, err := 1253 og.volumePluginMgr.FindMapperPluginByName(volumeToUnmount.PluginName) 1254 if err != nil { 1255 return volumetypes.GeneratedOperations{}, volumeToUnmount.GenerateErrorDetailed("UnmapVolume.FindMapperPluginByName failed", err) 1256 } 1257 if blockVolumePlugin == nil { 1258 return volumetypes.GeneratedOperations{}, volumeToUnmount.GenerateErrorDetailed("UnmapVolume.FindMapperPluginByName failed to find BlockVolumeMapper plugin. Volume plugin is nil.", nil) 1259 } 1260 blockVolumeUnmapper, newUnmapperErr := blockVolumePlugin.NewBlockVolumeUnmapper( 1261 volumeToUnmount.InnerVolumeSpecName, volumeToUnmount.PodUID) 1262 if newUnmapperErr != nil { 1263 return volumetypes.GeneratedOperations{}, volumeToUnmount.GenerateErrorDetailed("UnmapVolume.NewUnmapper failed", newUnmapperErr) 1264 } 1265 1266 unmapVolumeFunc := func() volumetypes.OperationContext { 1267 1268 migrated := getMigratedStatusBySpec(volumeToUnmount.VolumeSpec) 1269 1270 // pods/{podUid}/volumeDevices/{escapeQualifiedPluginName}/{volumeName} 1271 podDeviceUnmapPath, volName := blockVolumeUnmapper.GetPodDeviceMapPath() 1272 // plugins/kubernetes.io/{PluginName}/volumeDevices/{volumePluginDependentPath}/{podUID} 1273 globalUnmapPath := volumeToUnmount.DeviceMountPath 1274 1275 // Mark the device as uncertain to make sure kubelet calls UnmapDevice again in all the "return err" 1276 // cases below. The volume is marked as fully un-mapped at the end of this function, when everything 1277 // succeeds. 1278 markVolumeOpts := MarkVolumeOpts{ 1279 PodName: volumeToUnmount.PodName, 1280 PodUID: volumeToUnmount.PodUID, 1281 VolumeName: volumeToUnmount.VolumeName, 1282 OuterVolumeSpecName: volumeToUnmount.OuterVolumeSpecName, 1283 VolumeGidVolume: volumeToUnmount.VolumeGidValue, 1284 VolumeSpec: volumeToUnmount.VolumeSpec, 1285 VolumeMountState: VolumeMountUncertain, 1286 } 1287 markVolumeUncertainErr := actualStateOfWorld.MarkVolumeMountAsUncertain(markVolumeOpts) 1288 if markVolumeUncertainErr != nil { 1289 // On failure, return error. Caller will log and retry. 1290 eventErr, detailedErr := volumeToUnmount.GenerateError("UnmapVolume.MarkDeviceAsUncertain failed", markVolumeUncertainErr) 1291 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 1292 } 1293 1294 // Execute common unmap 1295 unmapErr := util.UnmapBlockVolume(og.blkUtil, globalUnmapPath, podDeviceUnmapPath, volName, volumeToUnmount.PodUID) 1296 if unmapErr != nil { 1297 // On failure, return error. Caller will log and retry. 1298 eventErr, detailedErr := volumeToUnmount.GenerateError("UnmapVolume.UnmapBlockVolume failed", unmapErr) 1299 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 1300 } 1301 1302 // Call UnmapPodDevice if blockVolumeUnmapper implements CustomBlockVolumeUnmapper 1303 if customBlockVolumeUnmapper, ok := blockVolumeUnmapper.(volume.CustomBlockVolumeUnmapper); ok { 1304 // Execute plugin specific unmap 1305 unmapErr = customBlockVolumeUnmapper.UnmapPodDevice() 1306 if unmapErr != nil { 1307 // On failure, return error. Caller will log and retry. 1308 eventErr, detailedErr := volumeToUnmount.GenerateError("UnmapVolume.UnmapPodDevice failed", unmapErr) 1309 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 1310 } 1311 } 1312 1313 klog.Infof( 1314 "UnmapVolume succeeded for volume %q (OuterVolumeSpecName: %q) pod %q (UID: %q). InnerVolumeSpecName %q. PluginName %q, VolumeGidValue %q", 1315 volumeToUnmount.VolumeName, 1316 volumeToUnmount.OuterVolumeSpecName, 1317 volumeToUnmount.PodName, 1318 volumeToUnmount.PodUID, 1319 volumeToUnmount.InnerVolumeSpecName, 1320 volumeToUnmount.PluginName, 1321 volumeToUnmount.VolumeGidValue) 1322 1323 // Update actual state of world 1324 markVolUnmountedErr := actualStateOfWorld.MarkVolumeAsUnmounted( 1325 volumeToUnmount.PodName, volumeToUnmount.VolumeName) 1326 if markVolUnmountedErr != nil { 1327 // On failure, just log and exit 1328 klog.Errorf(volumeToUnmount.GenerateErrorDetailed("UnmapVolume.MarkVolumeAsUnmounted failed", markVolUnmountedErr).Error()) 1329 } 1330 1331 return volumetypes.NewOperationContext(nil, nil, migrated) 1332 } 1333 1334 return volumetypes.GeneratedOperations{ 1335 OperationName: "unmap_volume", 1336 OperationFunc: unmapVolumeFunc, 1337 CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(blockVolumePlugin.GetPluginName(), volumeToUnmount.VolumeSpec), "unmap_volume"), 1338 EventRecorderFunc: nil, // nil because we do not want to generate event on error 1339 }, nil 1340} 1341 1342// GenerateUnmapDeviceFunc marks device as unmounted based on following steps. 1343// Check under globalMapPath dir if there isn't pod's symbolic links in it. 1344// If symbolic link isn't there, the device isn't referenced from Pods. 1345// Call plugin TearDownDevice to clean-up device connection, stored data under 1346// globalMapPath, these operations depend on plugin implementation. 1347// Once TearDownDevice is completed, remove globalMapPath dir. 1348// After globalMapPath is removed, fd lock by loopback for the device can 1349// be released safely because no one can consume the device at this point. 1350// At last, device open status will be checked just in case. 1351// If all steps are completed, the device is marked as unmounted. 1352func (og *operationGenerator) GenerateUnmapDeviceFunc( 1353 deviceToDetach AttachedVolume, 1354 actualStateOfWorld ActualStateOfWorldMounterUpdater, 1355 hostutil hostutil.HostUtils) (volumetypes.GeneratedOperations, error) { 1356 1357 blockVolumePlugin, err := 1358 og.volumePluginMgr.FindMapperPluginByName(deviceToDetach.PluginName) 1359 if err != nil { 1360 return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmapDevice.FindMapperPluginByName failed", err) 1361 } 1362 1363 if blockVolumePlugin == nil { 1364 return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmapDevice.FindMapperPluginByName failed to find BlockVolumeMapper plugin. Volume plugin is nil.", nil) 1365 } 1366 1367 blockVolumeUnmapper, newUnmapperErr := blockVolumePlugin.NewBlockVolumeUnmapper( 1368 deviceToDetach.VolumeSpec.Name(), 1369 "" /* podUID */) 1370 if newUnmapperErr != nil { 1371 return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmapDevice.NewUnmapper failed", newUnmapperErr) 1372 } 1373 1374 unmapDeviceFunc := func() volumetypes.OperationContext { 1375 migrated := getMigratedStatusBySpec(deviceToDetach.VolumeSpec) 1376 // Search under globalMapPath dir if all symbolic links from pods have been removed already. 1377 // If symbolic links are there, pods may still refer the volume. 1378 globalMapPath := deviceToDetach.DeviceMountPath 1379 refs, err := og.blkUtil.GetDeviceBindMountRefs(deviceToDetach.DevicePath, globalMapPath) 1380 if err != nil { 1381 if os.IsNotExist(err) { 1382 // Looks like SetupDevice did not complete. Fall through to TearDownDevice and mark the device as unmounted. 1383 refs = nil 1384 } else { 1385 eventErr, detailedErr := deviceToDetach.GenerateError("UnmapDevice.GetDeviceBindMountRefs check failed", err) 1386 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 1387 } 1388 } 1389 if len(refs) > 0 { 1390 err = fmt.Errorf("the device %q is still referenced from other Pods %v", globalMapPath, refs) 1391 eventErr, detailedErr := deviceToDetach.GenerateError("UnmapDevice failed", err) 1392 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 1393 } 1394 1395 // Mark the device as uncertain to make sure kubelet calls UnmapDevice again in all the "return err" 1396 // cases below. The volume is marked as fully un-mapped at the end of this function, when everything 1397 // succeeds. 1398 markDeviceUncertainErr := actualStateOfWorld.MarkDeviceAsUncertain( 1399 deviceToDetach.VolumeName, deviceToDetach.DevicePath, globalMapPath) 1400 if markDeviceUncertainErr != nil { 1401 // On failure, return error. Caller will log and retry. 1402 eventErr, detailedErr := deviceToDetach.GenerateError("UnmapDevice.MarkDeviceAsUncertain failed", markDeviceUncertainErr) 1403 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 1404 } 1405 1406 // Call TearDownDevice if blockVolumeUnmapper implements CustomBlockVolumeUnmapper 1407 if customBlockVolumeUnmapper, ok := blockVolumeUnmapper.(volume.CustomBlockVolumeUnmapper); ok { 1408 // Execute tear down device 1409 unmapErr := customBlockVolumeUnmapper.TearDownDevice(globalMapPath, deviceToDetach.DevicePath) 1410 if unmapErr != nil { 1411 // On failure, return error. Caller will log and retry. 1412 eventErr, detailedErr := deviceToDetach.GenerateError("UnmapDevice.TearDownDevice failed", unmapErr) 1413 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 1414 } 1415 } 1416 1417 // Plugin finished TearDownDevice(). Now globalMapPath dir and plugin's stored data 1418 // on the dir are unnecessary, clean up it. 1419 removeMapPathErr := og.blkUtil.RemoveMapPath(globalMapPath) 1420 if removeMapPathErr != nil { 1421 // On failure, return error. Caller will log and retry. 1422 eventErr, detailedErr := deviceToDetach.GenerateError("UnmapDevice.RemoveMapPath failed", removeMapPathErr) 1423 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 1424 } 1425 1426 // Before logging that UnmapDevice succeeded and moving on, 1427 // use hostutil.PathIsDevice to check if the path is a device, 1428 // if so use hostutil.DeviceOpened to check if the device is in use anywhere 1429 // else on the system. Retry if it returns true. 1430 deviceOpened, deviceOpenedErr := isDeviceOpened(deviceToDetach, hostutil) 1431 if deviceOpenedErr != nil { 1432 return volumetypes.NewOperationContext(nil, deviceOpenedErr, migrated) 1433 } 1434 // The device is still in use elsewhere. Caller will log and retry. 1435 if deviceOpened { 1436 eventErr, detailedErr := deviceToDetach.GenerateError( 1437 "UnmapDevice failed", 1438 fmt.Errorf("the device is in use when it was no longer expected to be in use")) 1439 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 1440 } 1441 1442 klog.Infof(deviceToDetach.GenerateMsgDetailed("UnmapDevice succeeded", "")) 1443 1444 // Update actual state of world 1445 markDeviceUnmountedErr := actualStateOfWorld.MarkDeviceAsUnmounted( 1446 deviceToDetach.VolumeName) 1447 if markDeviceUnmountedErr != nil { 1448 // On failure, return error. Caller will log and retry. 1449 eventErr, detailedErr := deviceToDetach.GenerateError("MarkDeviceAsUnmounted failed", markDeviceUnmountedErr) 1450 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 1451 } 1452 1453 return volumetypes.NewOperationContext(nil, nil, migrated) 1454 } 1455 1456 return volumetypes.GeneratedOperations{ 1457 OperationName: "unmap_device", 1458 OperationFunc: unmapDeviceFunc, 1459 CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(blockVolumePlugin.GetPluginName(), deviceToDetach.VolumeSpec), "unmap_device"), 1460 EventRecorderFunc: nil, // nil because we do not want to generate event on error 1461 }, nil 1462} 1463 1464func (og *operationGenerator) GenerateVerifyControllerAttachedVolumeFunc( 1465 volumeToMount VolumeToMount, 1466 nodeName types.NodeName, 1467 actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) { 1468 volumePlugin, err := 1469 og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec) 1470 if err != nil || volumePlugin == nil { 1471 return volumetypes.GeneratedOperations{}, volumeToMount.GenerateErrorDetailed("VerifyControllerAttachedVolume.FindPluginBySpec failed", err) 1472 } 1473 1474 verifyControllerAttachedVolumeFunc := func() volumetypes.OperationContext { 1475 migrated := getMigratedStatusBySpec(volumeToMount.VolumeSpec) 1476 if !volumeToMount.PluginIsAttachable { 1477 // If the volume does not implement the attacher interface, it is 1478 // assumed to be attached and the actual state of the world is 1479 // updated accordingly. 1480 1481 addVolumeNodeErr := actualStateOfWorld.MarkVolumeAsAttached( 1482 volumeToMount.VolumeName, volumeToMount.VolumeSpec, nodeName, "" /* devicePath */) 1483 if addVolumeNodeErr != nil { 1484 // On failure, return error. Caller will log and retry. 1485 eventErr, detailedErr := volumeToMount.GenerateError("VerifyControllerAttachedVolume.MarkVolumeAsAttachedByUniqueVolumeName failed", addVolumeNodeErr) 1486 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 1487 } 1488 1489 return volumetypes.NewOperationContext(nil, nil, migrated) 1490 } 1491 1492 if !volumeToMount.ReportedInUse { 1493 // If the given volume has not yet been added to the list of 1494 // VolumesInUse in the node's volume status, do not proceed, return 1495 // error. Caller will log and retry. The node status is updated 1496 // periodically by kubelet, so it may take as much as 10 seconds 1497 // before this clears. 1498 // Issue #28141 to enable on demand status updates. 1499 eventErr, detailedErr := volumeToMount.GenerateError("Volume has not been added to the list of VolumesInUse in the node's volume status", nil) 1500 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 1501 } 1502 1503 // Fetch current node object 1504 node, fetchErr := og.kubeClient.CoreV1().Nodes().Get(context.TODO(), string(nodeName), metav1.GetOptions{}) 1505 if fetchErr != nil { 1506 // On failure, return error. Caller will log and retry. 1507 eventErr, detailedErr := volumeToMount.GenerateError("VerifyControllerAttachedVolume failed fetching node from API server", fetchErr) 1508 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 1509 } 1510 1511 if node == nil { 1512 // On failure, return error. Caller will log and retry. 1513 eventErr, detailedErr := volumeToMount.GenerateError( 1514 "VerifyControllerAttachedVolume failed", 1515 fmt.Errorf("node object retrieved from API server is nil")) 1516 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 1517 } 1518 1519 for _, attachedVolume := range node.Status.VolumesAttached { 1520 if attachedVolume.Name == volumeToMount.VolumeName { 1521 addVolumeNodeErr := actualStateOfWorld.MarkVolumeAsAttached( 1522 v1.UniqueVolumeName(""), volumeToMount.VolumeSpec, nodeName, attachedVolume.DevicePath) 1523 klog.Infof(volumeToMount.GenerateMsgDetailed("Controller attach succeeded", fmt.Sprintf("device path: %q", attachedVolume.DevicePath))) 1524 if addVolumeNodeErr != nil { 1525 // On failure, return error. Caller will log and retry. 1526 eventErr, detailedErr := volumeToMount.GenerateError("VerifyControllerAttachedVolume.MarkVolumeAsAttached failed", addVolumeNodeErr) 1527 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 1528 } 1529 return volumetypes.NewOperationContext(nil, nil, migrated) 1530 } 1531 } 1532 1533 // Volume not attached, return error. Caller will log and retry. 1534 eventErr, detailedErr := volumeToMount.GenerateError("Volume not attached according to node status", nil) 1535 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 1536 } 1537 1538 return volumetypes.GeneratedOperations{ 1539 OperationName: "verify_controller_attached_volume", 1540 OperationFunc: verifyControllerAttachedVolumeFunc, 1541 CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(volumePlugin.GetPluginName(), volumeToMount.VolumeSpec), "verify_controller_attached_volume"), 1542 EventRecorderFunc: nil, // nil because we do not want to generate event on error 1543 }, nil 1544 1545} 1546 1547func (og *operationGenerator) verifyVolumeIsSafeToDetach( 1548 volumeToDetach AttachedVolume) error { 1549 // Fetch current node object 1550 node, fetchErr := og.kubeClient.CoreV1().Nodes().Get(context.TODO(), string(volumeToDetach.NodeName), metav1.GetOptions{}) 1551 if fetchErr != nil { 1552 if errors.IsNotFound(fetchErr) { 1553 klog.Warningf(volumeToDetach.GenerateMsgDetailed("Node not found on API server. DetachVolume will skip safe to detach check", "")) 1554 return nil 1555 } 1556 1557 // On failure, return error. Caller will log and retry. 1558 return volumeToDetach.GenerateErrorDetailed("DetachVolume failed fetching node from API server", fetchErr) 1559 } 1560 1561 if node == nil { 1562 // On failure, return error. Caller will log and retry. 1563 return volumeToDetach.GenerateErrorDetailed( 1564 "DetachVolume failed fetching node from API server", 1565 fmt.Errorf("node object retrieved from API server is nil")) 1566 } 1567 1568 for _, inUseVolume := range node.Status.VolumesInUse { 1569 if inUseVolume == volumeToDetach.VolumeName { 1570 return volumeToDetach.GenerateErrorDetailed( 1571 "DetachVolume failed", 1572 fmt.Errorf("volume is still in use by node, according to Node status")) 1573 } 1574 } 1575 1576 // Volume is not marked as in use by node 1577 klog.Infof(volumeToDetach.GenerateMsgDetailed("Verified volume is safe to detach", "")) 1578 return nil 1579} 1580 1581func (og *operationGenerator) GenerateExpandVolumeFunc( 1582 pvc *v1.PersistentVolumeClaim, 1583 pv *v1.PersistentVolume) (volumetypes.GeneratedOperations, error) { 1584 1585 volumeSpec := volume.NewSpecFromPersistentVolume(pv, false) 1586 1587 volumePlugin, err := og.volumePluginMgr.FindExpandablePluginBySpec(volumeSpec) 1588 if err != nil { 1589 return volumetypes.GeneratedOperations{}, fmt.Errorf("error finding plugin for expanding volume: %q with error %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err) 1590 } 1591 1592 if volumePlugin == nil { 1593 return volumetypes.GeneratedOperations{}, fmt.Errorf("can not find plugin for expanding volume: %q", util.GetPersistentVolumeClaimQualifiedName(pvc)) 1594 } 1595 1596 expandVolumeFunc := func() volumetypes.OperationContext { 1597 1598 migrated := false 1599 1600 newSize := pvc.Spec.Resources.Requests[v1.ResourceStorage] 1601 statusSize := pvc.Status.Capacity[v1.ResourceStorage] 1602 pvSize := pv.Spec.Capacity[v1.ResourceStorage] 1603 if pvSize.Cmp(newSize) < 0 { 1604 updatedSize, expandErr := volumePlugin.ExpandVolumeDevice( 1605 volumeSpec, 1606 newSize, 1607 statusSize) 1608 if expandErr != nil { 1609 detailedErr := fmt.Errorf("error expanding volume %q of plugin %q: %v", util.GetPersistentVolumeClaimQualifiedName(pvc), volumePlugin.GetPluginName(), expandErr) 1610 return volumetypes.NewOperationContext(detailedErr, detailedErr, migrated) 1611 } 1612 1613 klog.Infof("ExpandVolume succeeded for volume %s", util.GetPersistentVolumeClaimQualifiedName(pvc)) 1614 1615 newSize = updatedSize 1616 // k8s doesn't have transactions, we can't guarantee that after updating PV - updating PVC will be 1617 // successful, that is why all PVCs for which pvc.Spec.Size > pvc.Status.Size must be reprocessed 1618 // until they reflect user requested size in pvc.Status.Size 1619 updateErr := util.UpdatePVSize(pv, newSize, og.kubeClient) 1620 if updateErr != nil { 1621 detailedErr := fmt.Errorf("error updating PV spec capacity for volume %q with : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), updateErr) 1622 return volumetypes.NewOperationContext(detailedErr, detailedErr, migrated) 1623 } 1624 1625 klog.Infof("ExpandVolume.UpdatePV succeeded for volume %s", util.GetPersistentVolumeClaimQualifiedName(pvc)) 1626 } 1627 1628 fsVolume, _ := util.CheckVolumeModeFilesystem(volumeSpec) 1629 // No Cloudprovider resize needed, lets mark resizing as done 1630 // Rest of the volume expand controller code will assume PVC as *not* resized until pvc.Status.Size 1631 // reflects user requested size. 1632 if !volumePlugin.RequiresFSResize() || !fsVolume { 1633 klog.V(4).Infof("Controller resizing done for PVC %s", util.GetPersistentVolumeClaimQualifiedName(pvc)) 1634 err := util.MarkResizeFinished(pvc, newSize, og.kubeClient) 1635 if err != nil { 1636 detailedErr := fmt.Errorf("error marking pvc %s as resized : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err) 1637 return volumetypes.NewOperationContext(detailedErr, detailedErr, migrated) 1638 } 1639 successMsg := fmt.Sprintf("ExpandVolume succeeded for volume %s", util.GetPersistentVolumeClaimQualifiedName(pvc)) 1640 og.recorder.Eventf(pvc, v1.EventTypeNormal, kevents.VolumeResizeSuccess, successMsg) 1641 } else { 1642 err := util.MarkForFSResize(pvc, og.kubeClient) 1643 if err != nil { 1644 detailedErr := fmt.Errorf("error updating pvc %s condition for fs resize : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err) 1645 klog.Warning(detailedErr) 1646 return volumetypes.NewOperationContext(nil, nil, migrated) 1647 } 1648 oldCapacity := pvc.Status.Capacity[v1.ResourceStorage] 1649 err = util.AddAnnPreResizeCapacity(pv, oldCapacity, og.kubeClient) 1650 if err != nil { 1651 detailedErr := fmt.Errorf("error updating pv %s annotation (%s) with pre-resize capacity %s: %v", pv.ObjectMeta.Name, util.AnnPreResizeCapacity, oldCapacity.String(), err) 1652 klog.Warning(detailedErr) 1653 return volumetypes.NewOperationContext(nil, nil, migrated) 1654 } 1655 1656 } 1657 return volumetypes.NewOperationContext(nil, nil, migrated) 1658 } 1659 1660 eventRecorderFunc := func(err *error) { 1661 if *err != nil { 1662 og.recorder.Eventf(pvc, v1.EventTypeWarning, kevents.VolumeResizeFailed, (*err).Error()) 1663 } 1664 } 1665 1666 return volumetypes.GeneratedOperations{ 1667 OperationName: "expand_volume", 1668 OperationFunc: expandVolumeFunc, 1669 EventRecorderFunc: eventRecorderFunc, 1670 CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(volumePlugin.GetPluginName(), volumeSpec), "expand_volume"), 1671 }, nil 1672} 1673 1674func (og *operationGenerator) GenerateExpandInUseVolumeFunc( 1675 volumeToMount VolumeToMount, 1676 actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { 1677 1678 volumePlugin, err := 1679 og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec) 1680 if err != nil || volumePlugin == nil { 1681 return volumetypes.GeneratedOperations{}, volumeToMount.GenerateErrorDetailed("NodeExpandVolume.FindPluginBySpec failed", err) 1682 } 1683 1684 fsResizeFunc := func() volumetypes.OperationContext { 1685 var resizeDone bool 1686 var eventErr, detailedErr error 1687 migrated := false 1688 1689 resizeOptions := volume.NodeResizeOptions{ 1690 VolumeSpec: volumeToMount.VolumeSpec, 1691 DevicePath: volumeToMount.DevicePath, 1692 } 1693 fsVolume, err := util.CheckVolumeModeFilesystem(volumeToMount.VolumeSpec) 1694 if err != nil { 1695 eventErr, detailedErr = volumeToMount.GenerateError("NodeExpandvolume.CheckVolumeModeFilesystem failed", err) 1696 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 1697 } 1698 1699 if fsVolume { 1700 volumeMounter, newMounterErr := volumePlugin.NewMounter( 1701 volumeToMount.VolumeSpec, 1702 volumeToMount.Pod, 1703 volume.VolumeOptions{}) 1704 if newMounterErr != nil { 1705 eventErr, detailedErr = volumeToMount.GenerateError("NodeExpandVolume.NewMounter initialization failed", newMounterErr) 1706 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 1707 } 1708 1709 resizeOptions.DeviceMountPath = volumeMounter.GetPath() 1710 1711 deviceMountableVolumePlugin, _ := og.volumePluginMgr.FindDeviceMountablePluginBySpec(volumeToMount.VolumeSpec) 1712 var volumeDeviceMounter volume.DeviceMounter 1713 if deviceMountableVolumePlugin != nil { 1714 volumeDeviceMounter, _ = deviceMountableVolumePlugin.NewDeviceMounter() 1715 } 1716 1717 if volumeDeviceMounter != nil { 1718 deviceStagePath, err := volumeDeviceMounter.GetDeviceMountPath(volumeToMount.VolumeSpec) 1719 if err != nil { 1720 eventErr, detailedErr = volumeToMount.GenerateError("NodeExpandVolume.GetDeviceMountPath failed", err) 1721 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 1722 } 1723 resizeOptions.DeviceStagePath = deviceStagePath 1724 } 1725 } else { 1726 // Get block volume mapper plugin 1727 blockVolumePlugin, err := 1728 og.volumePluginMgr.FindMapperPluginBySpec(volumeToMount.VolumeSpec) 1729 if err != nil { 1730 eventErr, detailedErr = volumeToMount.GenerateError("MapVolume.FindMapperPluginBySpec failed", err) 1731 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 1732 } 1733 1734 if blockVolumePlugin == nil { 1735 eventErr, detailedErr = volumeToMount.GenerateError("MapVolume.FindMapperPluginBySpec failed to find BlockVolumeMapper plugin. Volume plugin is nil.", nil) 1736 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 1737 } 1738 1739 blockVolumeMapper, newMapperErr := blockVolumePlugin.NewBlockVolumeMapper( 1740 volumeToMount.VolumeSpec, 1741 volumeToMount.Pod, 1742 volume.VolumeOptions{}) 1743 if newMapperErr != nil { 1744 eventErr, detailedErr = volumeToMount.GenerateError("MapVolume.NewBlockVolumeMapper initialization failed", newMapperErr) 1745 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 1746 } 1747 1748 // if plugin supports custom mappers lets add DeviceStagePath 1749 if customBlockVolumeMapper, ok := blockVolumeMapper.(volume.CustomBlockVolumeMapper); ok { 1750 resizeOptions.DeviceStagePath = customBlockVolumeMapper.GetStagingPath() 1751 } 1752 } 1753 1754 // if we are doing online expansion then volume is already published 1755 resizeOptions.CSIVolumePhase = volume.CSIVolumePublished 1756 resizeDone, eventErr, detailedErr = og.doOnlineExpansion(volumeToMount, actualStateOfWorld, resizeOptions) 1757 if eventErr != nil || detailedErr != nil { 1758 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 1759 } 1760 if resizeDone { 1761 return volumetypes.NewOperationContext(nil, nil, migrated) 1762 } 1763 // This is a placeholder error - we should NEVER reach here. 1764 err = fmt.Errorf("volume resizing failed for unknown reason") 1765 eventErr, detailedErr = volumeToMount.GenerateError("NodeExpandVolume.NodeExpandVolume failed to resize volume", err) 1766 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) 1767 } 1768 1769 eventRecorderFunc := func(err *error) { 1770 if *err != nil { 1771 og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.VolumeResizeFailed, (*err).Error()) 1772 } 1773 } 1774 1775 return volumetypes.GeneratedOperations{ 1776 OperationName: "volume_fs_resize", 1777 OperationFunc: fsResizeFunc, 1778 EventRecorderFunc: eventRecorderFunc, 1779 CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(volumePlugin.GetPluginName(), volumeToMount.VolumeSpec), "volume_fs_resize"), 1780 }, nil 1781} 1782 1783func (og *operationGenerator) doOnlineExpansion(volumeToMount VolumeToMount, 1784 actualStateOfWorld ActualStateOfWorldMounterUpdater, 1785 resizeOptions volume.NodeResizeOptions) (bool, error, error) { 1786 1787 resizeDone, err := og.nodeExpandVolume(volumeToMount, actualStateOfWorld, resizeOptions) 1788 if err != nil { 1789 e1, e2 := volumeToMount.GenerateError("NodeExpandVolume.NodeExpandVolume failed", err) 1790 klog.Errorf(e2.Error()) 1791 return false, e1, e2 1792 } 1793 if resizeDone { 1794 markFSResizedErr := actualStateOfWorld.MarkVolumeAsResized(volumeToMount.PodName, volumeToMount.VolumeName) 1795 if markFSResizedErr != nil { 1796 // On failure, return error. Caller will log and retry. 1797 e1, e2 := volumeToMount.GenerateError("NodeExpandVolume.MarkVolumeAsResized failed", markFSResizedErr) 1798 return false, e1, e2 1799 } 1800 return true, nil, nil 1801 } 1802 return false, nil, nil 1803} 1804 1805func (og *operationGenerator) nodeExpandVolume( 1806 volumeToMount VolumeToMount, 1807 actualStateOfWorld ActualStateOfWorldMounterUpdater, 1808 rsOpts volume.NodeResizeOptions) (bool, error) { 1809 if !utilfeature.DefaultFeatureGate.Enabled(features.ExpandPersistentVolumes) { 1810 klog.V(4).Infof("Resizing is not enabled for this volume %s", volumeToMount.VolumeName) 1811 return true, nil 1812 } 1813 1814 if volumeToMount.VolumeSpec != nil && 1815 volumeToMount.VolumeSpec.InlineVolumeSpecForCSIMigration { 1816 klog.V(4).Infof("This volume %s is a migrated inline volume and is not resizable", volumeToMount.VolumeName) 1817 return true, nil 1818 } 1819 1820 // Get expander, if possible 1821 expandableVolumePlugin, _ := 1822 og.volumePluginMgr.FindNodeExpandablePluginBySpec(volumeToMount.VolumeSpec) 1823 1824 if expandableVolumePlugin != nil && 1825 expandableVolumePlugin.RequiresFSResize() && 1826 volumeToMount.VolumeSpec.PersistentVolume != nil { 1827 pv := volumeToMount.VolumeSpec.PersistentVolume 1828 pvc, err := og.kubeClient.CoreV1().PersistentVolumeClaims(pv.Spec.ClaimRef.Namespace).Get(context.TODO(), pv.Spec.ClaimRef.Name, metav1.GetOptions{}) 1829 if err != nil { 1830 // Return error rather than leave the file system un-resized, caller will log and retry 1831 return false, fmt.Errorf("mountVolume.NodeExpandVolume get PVC failed : %v", err) 1832 } 1833 1834 pvcStatusCap := pvc.Status.Capacity[v1.ResourceStorage] 1835 pvSpecCap := pv.Spec.Capacity[v1.ResourceStorage] 1836 if pvcStatusCap.Cmp(pvSpecCap) < 0 { 1837 // File system resize was requested, proceed 1838 klog.V(4).Infof(volumeToMount.GenerateMsgDetailed("MountVolume.NodeExpandVolume entering", fmt.Sprintf("DevicePath %q", volumeToMount.DevicePath))) 1839 1840 if volumeToMount.VolumeSpec.ReadOnly { 1841 simpleMsg, detailedMsg := volumeToMount.GenerateMsg("MountVolume.NodeExpandVolume failed", "requested read-only file system") 1842 klog.Warningf(detailedMsg) 1843 og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FileSystemResizeFailed, simpleMsg) 1844 og.recorder.Eventf(pvc, v1.EventTypeWarning, kevents.FileSystemResizeFailed, simpleMsg) 1845 return true, nil 1846 } 1847 rsOpts.VolumeSpec = volumeToMount.VolumeSpec 1848 rsOpts.NewSize = pvSpecCap 1849 rsOpts.OldSize = pvcStatusCap 1850 resizeDone, resizeErr := expandableVolumePlugin.NodeExpand(rsOpts) 1851 if resizeErr != nil { 1852 // if driver returned FailedPrecondition error that means 1853 // volume expansion should not be retried on this node but 1854 // expansion operation should not block mounting 1855 if volumetypes.IsFailedPreconditionError(resizeErr) { 1856 actualStateOfWorld.MarkForInUseExpansionError(volumeToMount.VolumeName) 1857 klog.Errorf(volumeToMount.GenerateErrorDetailed("MountVolume.NodeExapndVolume failed with %v", resizeErr).Error()) 1858 return true, nil 1859 } 1860 return false, resizeErr 1861 } 1862 // Volume resizing is not done but it did not error out. This could happen if a CSI volume 1863 // does not have node stage_unstage capability but was asked to resize the volume before 1864 // node publish. In which case - we must retry resizing after node publish. 1865 if !resizeDone { 1866 return false, nil 1867 } 1868 simpleMsg, detailedMsg := volumeToMount.GenerateMsg("MountVolume.NodeExpandVolume succeeded", "") 1869 og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg) 1870 og.recorder.Eventf(pvc, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg) 1871 klog.Infof(detailedMsg) 1872 // File system resize succeeded, now update the PVC's Capacity to match the PV's 1873 err = util.MarkFSResizeFinished(pvc, pvSpecCap, og.kubeClient) 1874 if err != nil { 1875 // On retry, NodeExpandVolume will be called again but do nothing 1876 return false, fmt.Errorf("mountVolume.NodeExpandVolume update PVC status failed : %v", err) 1877 } 1878 return true, nil 1879 } 1880 } 1881 return true, nil 1882} 1883 1884func checkMountOptionSupport(og *operationGenerator, volumeToMount VolumeToMount, plugin volume.VolumePlugin) error { 1885 mountOptions := util.MountOptionFromSpec(volumeToMount.VolumeSpec) 1886 1887 if len(mountOptions) > 0 && !plugin.SupportsMountOption() { 1888 return fmt.Errorf("mount options are not supported for this volume type") 1889 } 1890 return nil 1891} 1892 1893// checkNodeAffinity looks at the PV node affinity, and checks if the node has the same corresponding labels 1894// This ensures that we don't mount a volume that doesn't belong to this node 1895func checkNodeAffinity(og *operationGenerator, volumeToMount VolumeToMount) error { 1896 pv := volumeToMount.VolumeSpec.PersistentVolume 1897 if pv != nil { 1898 nodeLabels, err := og.volumePluginMgr.Host.GetNodeLabels() 1899 if err != nil { 1900 return err 1901 } 1902 err = util.CheckNodeAffinity(pv, nodeLabels) 1903 if err != nil { 1904 return err 1905 } 1906 } 1907 return nil 1908} 1909 1910// isDeviceOpened checks the device status if the device is in use anywhere else on the system 1911func isDeviceOpened(deviceToDetach AttachedVolume, hostUtil hostutil.HostUtils) (bool, error) { 1912 isDevicePath, devicePathErr := hostUtil.PathIsDevice(deviceToDetach.DevicePath) 1913 var deviceOpened bool 1914 var deviceOpenedErr error 1915 if !isDevicePath && devicePathErr == nil || 1916 (devicePathErr != nil && strings.Contains(devicePathErr.Error(), "does not exist")) { 1917 // not a device path or path doesn't exist 1918 //TODO: refer to #36092 1919 klog.V(3).Infof("The path isn't device path or doesn't exist. Skip checking device path: %s", deviceToDetach.DevicePath) 1920 deviceOpened = false 1921 } else if devicePathErr != nil { 1922 return false, deviceToDetach.GenerateErrorDetailed("PathIsDevice failed", devicePathErr) 1923 } else { 1924 deviceOpened, deviceOpenedErr = hostUtil.DeviceOpened(deviceToDetach.DevicePath) 1925 if deviceOpenedErr != nil { 1926 return false, deviceToDetach.GenerateErrorDetailed("DeviceOpened failed", deviceOpenedErr) 1927 } 1928 } 1929 return deviceOpened, nil 1930} 1931 1932// findDetachablePluginBySpec is a variant of VolumePluginMgr.FindAttachablePluginByName() function. 1933// The difference is that it bypass the CanAttach() check for CSI plugin, i.e. it assumes all CSI plugin supports detach. 1934// The intention here is that a CSI plugin volume can end up in an Uncertain state, so that a detach 1935// operation will help it to detach no matter it actually has the ability to attach/detach. 1936func findDetachablePluginBySpec(spec *volume.Spec, pm *volume.VolumePluginMgr) (volume.AttachableVolumePlugin, error) { 1937 volumePlugin, err := pm.FindPluginBySpec(spec) 1938 if err != nil { 1939 return nil, err 1940 } 1941 if attachableVolumePlugin, ok := volumePlugin.(volume.AttachableVolumePlugin); ok { 1942 if attachableVolumePlugin.GetPluginName() == "kubernetes.io/csi" { 1943 return attachableVolumePlugin, nil 1944 } 1945 if canAttach, err := attachableVolumePlugin.CanAttach(spec); err != nil { 1946 return nil, err 1947 } else if canAttach { 1948 return attachableVolumePlugin, nil 1949 } 1950 } 1951 return nil, nil 1952} 1953 1954func getMigratedStatusBySpec(spec *volume.Spec) bool { 1955 migrated := false 1956 if spec != nil { 1957 migrated = spec.Migrated 1958 } 1959 return migrated 1960} 1961