1/* 2Copyright 2017 The Kubernetes Authors. 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package csi 18 19import ( 20 "context" 21 "errors" 22 "fmt" 23 "os" 24 "path/filepath" 25 "strings" 26 "time" 27 28 "k8s.io/klog/v2" 29 30 authenticationv1 "k8s.io/api/authentication/v1" 31 api "k8s.io/api/core/v1" 32 storage "k8s.io/api/storage/v1" 33 apierrors "k8s.io/apimachinery/pkg/api/errors" 34 meta "k8s.io/apimachinery/pkg/apis/meta/v1" 35 "k8s.io/apimachinery/pkg/types" 36 utilruntime "k8s.io/apimachinery/pkg/util/runtime" 37 utilversion "k8s.io/apimachinery/pkg/util/version" 38 "k8s.io/apimachinery/pkg/util/wait" 39 utilfeature "k8s.io/apiserver/pkg/util/feature" 40 clientset "k8s.io/client-go/kubernetes" 41 storagelisters "k8s.io/client-go/listers/storage/v1" 42 csitranslationplugins "k8s.io/csi-translation-lib/plugins" 43 "k8s.io/kubernetes/pkg/features" 44 "k8s.io/kubernetes/pkg/volume" 45 "k8s.io/kubernetes/pkg/volume/csi/nodeinfomanager" 46 volumetypes "k8s.io/kubernetes/pkg/volume/util/types" 47) 48 49const ( 50 // CSIPluginName is the name of the in-tree CSI Plugin 51 CSIPluginName = "kubernetes.io/csi" 52 53 csiTimeout = 2 * time.Minute 54 volNameSep = "^" 55 volDataFileName = "vol_data.json" 56 fsTypeBlockName = "block" 57 58 // CsiResyncPeriod is default resync period duration 59 // TODO: increase to something useful 60 CsiResyncPeriod = time.Minute 61) 62 63type csiPlugin struct { 64 host volume.VolumeHost 65 csiDriverLister storagelisters.CSIDriverLister 66 serviceAccountTokenGetter func(namespace, name string, tr *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error) 67 volumeAttachmentLister storagelisters.VolumeAttachmentLister 68} 69 70// ProbeVolumePlugins returns implemented plugins 71func ProbeVolumePlugins() []volume.VolumePlugin { 72 p := &csiPlugin{ 73 host: nil, 74 } 75 return []volume.VolumePlugin{p} 76} 77 78// volume.VolumePlugin methods 79var _ volume.VolumePlugin = &csiPlugin{} 80 81// RegistrationHandler is the handler which is fed to the pluginwatcher API. 82type RegistrationHandler struct { 83} 84 85// TODO (verult) consider using a struct instead of global variables 86// csiDrivers map keep track of all registered CSI drivers on the node and their 87// corresponding sockets 88var csiDrivers = &DriversStore{} 89 90var nim nodeinfomanager.Interface 91 92// PluginHandler is the plugin registration handler interface passed to the 93// pluginwatcher module in kubelet 94var PluginHandler = &RegistrationHandler{} 95 96// ValidatePlugin is called by kubelet's plugin watcher upon detection 97// of a new registration socket opened by CSI Driver registrar side car. 98func (h *RegistrationHandler) ValidatePlugin(pluginName string, endpoint string, versions []string) error { 99 klog.Infof(log("Trying to validate a new CSI Driver with name: %s endpoint: %s versions: %s", 100 pluginName, endpoint, strings.Join(versions, ","))) 101 102 _, err := h.validateVersions("ValidatePlugin", pluginName, endpoint, versions) 103 if err != nil { 104 return fmt.Errorf("validation failed for CSI Driver %s at endpoint %s: %v", pluginName, endpoint, err) 105 } 106 107 return err 108} 109 110// RegisterPlugin is called when a plugin can be registered 111func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string, versions []string) error { 112 klog.Infof(log("Register new plugin with name: %s at endpoint: %s", pluginName, endpoint)) 113 114 highestSupportedVersion, err := h.validateVersions("RegisterPlugin", pluginName, endpoint, versions) 115 if err != nil { 116 return err 117 } 118 119 // Storing endpoint of newly registered CSI driver into the map, where CSI driver name will be the key 120 // all other CSI components will be able to get the actual socket of CSI drivers by its name. 121 csiDrivers.Set(pluginName, Driver{ 122 endpoint: endpoint, 123 highestSupportedVersion: highestSupportedVersion, 124 }) 125 126 // Get node info from the driver. 127 csi, err := newCsiDriverClient(csiDriverName(pluginName)) 128 if err != nil { 129 return err 130 } 131 132 ctx, cancel := context.WithTimeout(context.Background(), csiTimeout) 133 defer cancel() 134 135 driverNodeID, maxVolumePerNode, accessibleTopology, err := csi.NodeGetInfo(ctx) 136 if err != nil { 137 if unregErr := unregisterDriver(pluginName); unregErr != nil { 138 klog.Error(log("registrationHandler.RegisterPlugin failed to unregister plugin due to previous error: %v", unregErr)) 139 } 140 return err 141 } 142 143 err = nim.InstallCSIDriver(pluginName, driverNodeID, maxVolumePerNode, accessibleTopology) 144 if err != nil { 145 if unregErr := unregisterDriver(pluginName); unregErr != nil { 146 klog.Error(log("registrationHandler.RegisterPlugin failed to unregister plugin due to previous error: %v", unregErr)) 147 } 148 return err 149 } 150 151 return nil 152} 153 154func (h *RegistrationHandler) validateVersions(callerName, pluginName string, endpoint string, versions []string) (*utilversion.Version, error) { 155 if len(versions) == 0 { 156 return nil, errors.New(log("%s for CSI driver %q failed. Plugin returned an empty list for supported versions", callerName, pluginName)) 157 } 158 159 // Validate version 160 newDriverHighestVersion, err := highestSupportedVersion(versions) 161 if err != nil { 162 return nil, errors.New(log("%s for CSI driver %q failed. None of the versions specified %q are supported. err=%v", callerName, pluginName, versions, err)) 163 } 164 165 existingDriver, driverExists := csiDrivers.Get(pluginName) 166 if driverExists { 167 if !existingDriver.highestSupportedVersion.LessThan(newDriverHighestVersion) { 168 return nil, errors.New(log("%s for CSI driver %q failed. Another driver with the same name is already registered with a higher supported version: %q", callerName, pluginName, existingDriver.highestSupportedVersion)) 169 } 170 } 171 172 return newDriverHighestVersion, nil 173} 174 175// DeRegisterPlugin is called when a plugin removed its socket, signaling 176// it is no longer available 177func (h *RegistrationHandler) DeRegisterPlugin(pluginName string) { 178 klog.Info(log("registrationHandler.DeRegisterPlugin request for plugin %s", pluginName)) 179 if err := unregisterDriver(pluginName); err != nil { 180 klog.Error(log("registrationHandler.DeRegisterPlugin failed: %v", err)) 181 } 182} 183 184func (p *csiPlugin) Init(host volume.VolumeHost) error { 185 p.host = host 186 187 csiClient := host.GetKubeClient() 188 if csiClient == nil { 189 klog.Warning(log("kubeclient not set, assuming standalone kubelet")) 190 } else { 191 // set CSIDriverLister and volumeAttachmentLister 192 adcHost, ok := host.(volume.AttachDetachVolumeHost) 193 if ok { 194 p.csiDriverLister = adcHost.CSIDriverLister() 195 if p.csiDriverLister == nil { 196 klog.Error(log("CSIDriverLister not found on AttachDetachVolumeHost")) 197 } 198 p.volumeAttachmentLister = adcHost.VolumeAttachmentLister() 199 if p.volumeAttachmentLister == nil { 200 klog.Error(log("VolumeAttachmentLister not found on AttachDetachVolumeHost")) 201 } 202 } 203 kletHost, ok := host.(volume.KubeletVolumeHost) 204 if ok { 205 p.csiDriverLister = kletHost.CSIDriverLister() 206 if p.csiDriverLister == nil { 207 klog.Error(log("CSIDriverLister not found on KubeletVolumeHost")) 208 } 209 p.serviceAccountTokenGetter = host.GetServiceAccountTokenFunc() 210 if p.serviceAccountTokenGetter == nil { 211 klog.Error(log("ServiceAccountTokenGetter not found on KubeletVolumeHost")) 212 } 213 // We don't run the volumeAttachmentLister in the kubelet context 214 p.volumeAttachmentLister = nil 215 } 216 } 217 218 var migratedPlugins = map[string](func() bool){ 219 csitranslationplugins.GCEPDInTreePluginName: func() bool { 220 return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) && utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationGCE) 221 }, 222 csitranslationplugins.AWSEBSInTreePluginName: func() bool { 223 return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) && utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationAWS) 224 }, 225 csitranslationplugins.CinderInTreePluginName: func() bool { 226 return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) && utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationOpenStack) 227 }, 228 csitranslationplugins.AzureDiskInTreePluginName: func() bool { 229 return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) && utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationAzureDisk) 230 }, 231 csitranslationplugins.AzureFileInTreePluginName: func() bool { 232 return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) && utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationAzureFile) 233 }, 234 csitranslationplugins.VSphereInTreePluginName: func() bool { 235 return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) && utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationvSphere) 236 }, 237 } 238 239 // Initializing the label management channels 240 nim = nodeinfomanager.NewNodeInfoManager(host.GetNodeName(), host, migratedPlugins) 241 242 if utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) { 243 // This function prevents Kubelet from posting Ready status until CSINode 244 // is both installed and initialized 245 if err := initializeCSINode(host); err != nil { 246 return errors.New(log("failed to initialize CSINode: %v", err)) 247 } 248 } 249 250 return nil 251} 252 253func initializeCSINode(host volume.VolumeHost) error { 254 kvh, ok := host.(volume.KubeletVolumeHost) 255 if !ok { 256 klog.V(4).Info("Cast from VolumeHost to KubeletVolumeHost failed. Skipping CSINode initialization, not running on kubelet") 257 return nil 258 } 259 kubeClient := host.GetKubeClient() 260 if kubeClient == nil { 261 // Kubelet running in standalone mode. Skip CSINode initialization 262 klog.Warning("Skipping CSINode initialization, kubelet running in standalone mode") 263 return nil 264 } 265 266 kvh.SetKubeletError(errors.New("CSINode is not yet initialized")) 267 268 go func() { 269 defer utilruntime.HandleCrash() 270 271 // First wait indefinitely to talk to Kube APIServer 272 nodeName := host.GetNodeName() 273 err := waitForAPIServerForever(kubeClient, nodeName) 274 if err != nil { 275 klog.Fatalf("Failed to initialize CSINode while waiting for API server to report ok: %v", err) 276 } 277 278 // Backoff parameters tuned to retry over 140 seconds. Will fail and restart the Kubelet 279 // after max retry steps. 280 initBackoff := wait.Backoff{ 281 Steps: 6, 282 Duration: 15 * time.Millisecond, 283 Factor: 6.0, 284 Jitter: 0.1, 285 } 286 err = wait.ExponentialBackoff(initBackoff, func() (bool, error) { 287 klog.V(4).Infof("Initializing migrated drivers on CSINode") 288 err := nim.InitializeCSINodeWithAnnotation() 289 if err != nil { 290 kvh.SetKubeletError(fmt.Errorf("failed to initialize CSINode: %v", err)) 291 klog.Errorf("Failed to initialize CSINode: %v", err) 292 return false, nil 293 } 294 295 // Successfully initialized drivers, allow Kubelet to post Ready 296 kvh.SetKubeletError(nil) 297 return true, nil 298 }) 299 if err != nil { 300 // 2 releases after CSIMigration and all CSIMigrationX (where X is a volume plugin) 301 // are permanently enabled the apiserver/controllers can assume that the kubelet is 302 // using CSI for all Migrated volume plugins. Then all the CSINode initialization 303 // code can be dropped from Kubelet. 304 // Kill the Kubelet process and allow it to restart to retry initialization 305 klog.Fatalf("Failed to initialize CSINode after retrying: %v", err) 306 } 307 }() 308 return nil 309} 310 311func (p *csiPlugin) GetPluginName() string { 312 return CSIPluginName 313} 314 315// GetvolumeName returns a concatenated string of CSIVolumeSource.Driver<volNameSe>CSIVolumeSource.VolumeHandle 316// That string value is used in Detach() to extract driver name and volumeName. 317func (p *csiPlugin) GetVolumeName(spec *volume.Spec) (string, error) { 318 csi, err := getPVSourceFromSpec(spec) 319 if err != nil { 320 return "", errors.New(log("plugin.GetVolumeName failed to extract volume source from spec: %v", err)) 321 } 322 323 // return driverName<separator>volumeHandle 324 return fmt.Sprintf("%s%s%s", csi.Driver, volNameSep, csi.VolumeHandle), nil 325} 326 327func (p *csiPlugin) CanSupport(spec *volume.Spec) bool { 328 // TODO (vladimirvivien) CanSupport should also take into account 329 // the availability/registration of specified Driver in the volume source 330 if spec == nil { 331 return false 332 } 333 if utilfeature.DefaultFeatureGate.Enabled(features.CSIInlineVolume) { 334 return (spec.PersistentVolume != nil && spec.PersistentVolume.Spec.CSI != nil) || 335 (spec.Volume != nil && spec.Volume.CSI != nil) 336 } 337 338 return spec.PersistentVolume != nil && spec.PersistentVolume.Spec.CSI != nil 339} 340 341func (p *csiPlugin) RequiresRemount(spec *volume.Spec) bool { 342 if p.csiDriverLister == nil { 343 return false 344 } 345 driverName, err := GetCSIDriverName(spec) 346 if err != nil { 347 klog.V(5).Info(log("Failed to mark %q as republish required, err: %v", spec.Name(), err)) 348 return false 349 } 350 csiDriver, err := p.csiDriverLister.Get(driverName) 351 if err != nil { 352 klog.V(5).Info(log("Failed to mark %q as republish required, err: %v", spec.Name(), err)) 353 return false 354 } 355 return *csiDriver.Spec.RequiresRepublish 356} 357 358func (p *csiPlugin) NewMounter( 359 spec *volume.Spec, 360 pod *api.Pod, 361 _ volume.VolumeOptions) (volume.Mounter, error) { 362 363 volSrc, pvSrc, err := getSourceFromSpec(spec) 364 if err != nil { 365 return nil, err 366 } 367 368 var ( 369 driverName string 370 volumeHandle string 371 readOnly bool 372 ) 373 374 switch { 375 case volSrc != nil && utilfeature.DefaultFeatureGate.Enabled(features.CSIInlineVolume): 376 volumeHandle = makeVolumeHandle(string(pod.UID), spec.Name()) 377 driverName = volSrc.Driver 378 if volSrc.ReadOnly != nil { 379 readOnly = *volSrc.ReadOnly 380 } 381 case pvSrc != nil: 382 driverName = pvSrc.Driver 383 volumeHandle = pvSrc.VolumeHandle 384 readOnly = spec.ReadOnly 385 default: 386 return nil, errors.New(log("volume source not found in volume.Spec")) 387 } 388 389 volumeLifecycleMode, err := p.getVolumeLifecycleMode(spec) 390 if err != nil { 391 return nil, err 392 } 393 394 // Check CSIDriver.Spec.Mode to ensure that the CSI driver 395 // supports the current volumeLifecycleMode. 396 if err := p.supportsVolumeLifecycleMode(driverName, volumeLifecycleMode); err != nil { 397 return nil, err 398 } 399 400 fsGroupPolicy, err := p.getFSGroupPolicy(driverName) 401 if err != nil { 402 return nil, err 403 } 404 405 k8s := p.host.GetKubeClient() 406 if k8s == nil { 407 return nil, errors.New(log("failed to get a kubernetes client")) 408 } 409 410 kvh, ok := p.host.(volume.KubeletVolumeHost) 411 if !ok { 412 return nil, errors.New(log("cast from VolumeHost to KubeletVolumeHost failed")) 413 } 414 415 mounter := &csiMountMgr{ 416 plugin: p, 417 k8s: k8s, 418 spec: spec, 419 pod: pod, 420 podUID: pod.UID, 421 driverName: csiDriverName(driverName), 422 volumeLifecycleMode: volumeLifecycleMode, 423 fsGroupPolicy: fsGroupPolicy, 424 volumeID: volumeHandle, 425 specVolumeID: spec.Name(), 426 readOnly: readOnly, 427 kubeVolHost: kvh, 428 } 429 mounter.csiClientGetter.driverName = csiDriverName(driverName) 430 431 // Save volume info in pod dir 432 dir := mounter.GetPath() 433 dataDir := filepath.Dir(dir) // dropoff /mount at end 434 435 if err := os.MkdirAll(dataDir, 0750); err != nil { 436 return nil, errors.New(log("failed to create dir %#v: %v", dataDir, err)) 437 } 438 klog.V(4).Info(log("created path successfully [%s]", dataDir)) 439 440 mounter.MetricsProvider = NewMetricsCsi(volumeHandle, dir, csiDriverName(driverName)) 441 442 // persist volume info data for teardown 443 node := string(p.host.GetNodeName()) 444 volData := map[string]string{ 445 volDataKey.specVolID: spec.Name(), 446 volDataKey.volHandle: volumeHandle, 447 volDataKey.driverName: driverName, 448 volDataKey.nodeName: node, 449 volDataKey.volumeLifecycleMode: string(volumeLifecycleMode), 450 } 451 452 attachID := getAttachmentName(volumeHandle, driverName, node) 453 volData[volDataKey.attachmentID] = attachID 454 455 err = saveVolumeData(dataDir, volDataFileName, volData) 456 defer func() { 457 // Only if there was an error and volume operation was considered 458 // finished, we should remove the directory. 459 if err != nil && volumetypes.IsOperationFinishedError(err) { 460 // attempt to cleanup volume mount dir. 461 if err = removeMountDir(p, dir); err != nil { 462 klog.Error(log("attacher.MountDevice failed to remove mount dir after error [%s]: %v", dir, err)) 463 } 464 } 465 }() 466 467 if err != nil { 468 errorMsg := log("csi.NewMounter failed to save volume info data: %v", err) 469 klog.Error(errorMsg) 470 471 return nil, errors.New(errorMsg) 472 } 473 474 klog.V(4).Info(log("mounter created successfully")) 475 476 return mounter, nil 477} 478 479func (p *csiPlugin) NewUnmounter(specName string, podUID types.UID) (volume.Unmounter, error) { 480 klog.V(4).Infof(log("setting up unmounter for [name=%v, podUID=%v]", specName, podUID)) 481 482 kvh, ok := p.host.(volume.KubeletVolumeHost) 483 if !ok { 484 return nil, errors.New(log("cast from VolumeHost to KubeletVolumeHost failed")) 485 } 486 487 unmounter := &csiMountMgr{ 488 plugin: p, 489 podUID: podUID, 490 specVolumeID: specName, 491 kubeVolHost: kvh, 492 } 493 494 // load volume info from file 495 dir := unmounter.GetPath() 496 dataDir := filepath.Dir(dir) // dropoff /mount at end 497 data, err := loadVolumeData(dataDir, volDataFileName) 498 if err != nil { 499 return nil, errors.New(log("unmounter failed to load volume data file [%s]: %v", dir, err)) 500 } 501 unmounter.driverName = csiDriverName(data[volDataKey.driverName]) 502 unmounter.volumeID = data[volDataKey.volHandle] 503 unmounter.csiClientGetter.driverName = unmounter.driverName 504 505 return unmounter, nil 506} 507 508func (p *csiPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) { 509 klog.V(4).Info(log("plugin.ConstructVolumeSpec [pv.Name=%v, path=%v]", volumeName, mountPath)) 510 511 volData, err := loadVolumeData(mountPath, volDataFileName) 512 if err != nil { 513 return nil, errors.New(log("plugin.ConstructVolumeSpec failed loading volume data using [%s]: %v", mountPath, err)) 514 } 515 516 klog.V(4).Info(log("plugin.ConstructVolumeSpec extracted [%#v]", volData)) 517 518 var spec *volume.Spec 519 inlineEnabled := utilfeature.DefaultFeatureGate.Enabled(features.CSIInlineVolume) 520 521 // If inlineEnabled is true and mode is VolumeLifecycleEphemeral, 522 // use constructVolSourceSpec to construct volume source spec. 523 // If inlineEnabled is false or mode is VolumeLifecyclePersistent, 524 // use constructPVSourceSpec to construct volume construct pv source spec. 525 if inlineEnabled && storage.VolumeLifecycleMode(volData[volDataKey.volumeLifecycleMode]) == storage.VolumeLifecycleEphemeral { 526 spec = p.constructVolSourceSpec(volData[volDataKey.specVolID], volData[volDataKey.driverName]) 527 return spec, nil 528 } 529 spec = p.constructPVSourceSpec(volData[volDataKey.specVolID], volData[volDataKey.driverName], volData[volDataKey.volHandle]) 530 531 return spec, nil 532} 533 534// constructVolSourceSpec constructs volume.Spec with CSIVolumeSource 535func (p *csiPlugin) constructVolSourceSpec(volSpecName, driverName string) *volume.Spec { 536 vol := &api.Volume{ 537 Name: volSpecName, 538 VolumeSource: api.VolumeSource{ 539 CSI: &api.CSIVolumeSource{ 540 Driver: driverName, 541 }, 542 }, 543 } 544 return volume.NewSpecFromVolume(vol) 545} 546 547//constructPVSourceSpec constructs volume.Spec with CSIPersistentVolumeSource 548func (p *csiPlugin) constructPVSourceSpec(volSpecName, driverName, volumeHandle string) *volume.Spec { 549 fsMode := api.PersistentVolumeFilesystem 550 pv := &api.PersistentVolume{ 551 ObjectMeta: meta.ObjectMeta{ 552 Name: volSpecName, 553 }, 554 Spec: api.PersistentVolumeSpec{ 555 PersistentVolumeSource: api.PersistentVolumeSource{ 556 CSI: &api.CSIPersistentVolumeSource{ 557 Driver: driverName, 558 VolumeHandle: volumeHandle, 559 }, 560 }, 561 VolumeMode: &fsMode, 562 }, 563 } 564 return volume.NewSpecFromPersistentVolume(pv, false) 565} 566 567func (p *csiPlugin) SupportsMountOption() bool { 568 // TODO (vladimirvivien) use CSI VolumeCapability.MountVolume.mount_flags 569 // to probe for the result for this method 570 // (bswartz) Until the CSI spec supports probing, our only option is to 571 // make plugins register their support for mount options or lack thereof 572 // directly with kubernetes. 573 return true 574} 575 576func (p *csiPlugin) SupportsBulkVolumeVerification() bool { 577 return false 578} 579 580// volume.AttachableVolumePlugin methods 581var _ volume.AttachableVolumePlugin = &csiPlugin{} 582 583var _ volume.DeviceMountableVolumePlugin = &csiPlugin{} 584 585func (p *csiPlugin) NewAttacher() (volume.Attacher, error) { 586 return p.newAttacherDetacher() 587} 588 589func (p *csiPlugin) NewDeviceMounter() (volume.DeviceMounter, error) { 590 return p.NewAttacher() 591} 592 593func (p *csiPlugin) NewDetacher() (volume.Detacher, error) { 594 return p.newAttacherDetacher() 595} 596 597func (p *csiPlugin) CanAttach(spec *volume.Spec) (bool, error) { 598 inlineEnabled := utilfeature.DefaultFeatureGate.Enabled(features.CSIInlineVolume) 599 if inlineEnabled { 600 volumeLifecycleMode, err := p.getVolumeLifecycleMode(spec) 601 if err != nil { 602 return false, err 603 } 604 605 if volumeLifecycleMode == storage.VolumeLifecycleEphemeral { 606 klog.V(5).Info(log("plugin.CanAttach = false, ephemeral mode detected for spec %v", spec.Name())) 607 return false, nil 608 } 609 } 610 611 pvSrc, err := getCSISourceFromSpec(spec) 612 if err != nil { 613 return false, err 614 } 615 616 driverName := pvSrc.Driver 617 618 skipAttach, err := p.skipAttach(driverName) 619 if err != nil { 620 return false, err 621 } 622 623 return !skipAttach, nil 624} 625 626// CanDeviceMount returns true if the spec supports device mount 627func (p *csiPlugin) CanDeviceMount(spec *volume.Spec) (bool, error) { 628 inlineEnabled := utilfeature.DefaultFeatureGate.Enabled(features.CSIInlineVolume) 629 if !inlineEnabled { 630 // No need to check anything, we assume it is a persistent volume. 631 return true, nil 632 } 633 634 volumeLifecycleMode, err := p.getVolumeLifecycleMode(spec) 635 if err != nil { 636 return false, err 637 } 638 639 if volumeLifecycleMode == storage.VolumeLifecycleEphemeral { 640 klog.V(5).Info(log("plugin.CanDeviceMount skipped ephemeral mode detected for spec %v", spec.Name())) 641 return false, nil 642 } 643 644 // Persistent volumes support device mount. 645 return true, nil 646} 647 648func (p *csiPlugin) NewDeviceUnmounter() (volume.DeviceUnmounter, error) { 649 return p.NewDetacher() 650} 651 652func (p *csiPlugin) GetDeviceMountRefs(deviceMountPath string) ([]string, error) { 653 m := p.host.GetMounter(p.GetPluginName()) 654 return m.GetMountRefs(deviceMountPath) 655} 656 657// BlockVolumePlugin methods 658var _ volume.BlockVolumePlugin = &csiPlugin{} 659 660func (p *csiPlugin) NewBlockVolumeMapper(spec *volume.Spec, podRef *api.Pod, opts volume.VolumeOptions) (volume.BlockVolumeMapper, error) { 661 pvSource, err := getCSISourceFromSpec(spec) 662 if err != nil { 663 return nil, err 664 } 665 readOnly, err := getReadOnlyFromSpec(spec) 666 if err != nil { 667 return nil, err 668 } 669 670 klog.V(4).Info(log("setting up block mapper for [volume=%v,driver=%v]", pvSource.VolumeHandle, pvSource.Driver)) 671 672 k8s := p.host.GetKubeClient() 673 if k8s == nil { 674 return nil, errors.New(log("failed to get a kubernetes client")) 675 } 676 677 mapper := &csiBlockMapper{ 678 k8s: k8s, 679 plugin: p, 680 volumeID: pvSource.VolumeHandle, 681 driverName: csiDriverName(pvSource.Driver), 682 readOnly: readOnly, 683 spec: spec, 684 specName: spec.Name(), 685 pod: podRef, 686 podUID: podRef.UID, 687 } 688 mapper.csiClientGetter.driverName = csiDriverName(pvSource.Driver) 689 690 // Save volume info in pod dir 691 dataDir := getVolumeDeviceDataDir(spec.Name(), p.host) 692 693 if err := os.MkdirAll(dataDir, 0750); err != nil { 694 return nil, errors.New(log("failed to create data dir %s: %v", dataDir, err)) 695 } 696 klog.V(4).Info(log("created path successfully [%s]", dataDir)) 697 698 blockPath, err := mapper.GetGlobalMapPath(spec) 699 if err != nil { 700 return nil, errors.New(log("failed to get device path: %v", err)) 701 } 702 703 mapper.MetricsProvider = NewMetricsCsi(pvSource.VolumeHandle, blockPath+"/"+string(podRef.UID), csiDriverName(pvSource.Driver)) 704 705 // persist volume info data for teardown 706 node := string(p.host.GetNodeName()) 707 attachID := getAttachmentName(pvSource.VolumeHandle, pvSource.Driver, node) 708 volData := map[string]string{ 709 volDataKey.specVolID: spec.Name(), 710 volDataKey.volHandle: pvSource.VolumeHandle, 711 volDataKey.driverName: pvSource.Driver, 712 volDataKey.nodeName: node, 713 volDataKey.attachmentID: attachID, 714 } 715 716 err = saveVolumeData(dataDir, volDataFileName, volData) 717 defer func() { 718 // Only if there was an error and volume operation was considered 719 // finished, we should remove the directory. 720 if err != nil && volumetypes.IsOperationFinishedError(err) { 721 // attempt to cleanup volume mount dir. 722 if err = removeMountDir(p, dataDir); err != nil { 723 klog.Error(log("attacher.MountDevice failed to remove mount dir after error [%s]: %v", dataDir, err)) 724 } 725 } 726 }() 727 if err != nil { 728 errorMsg := log("csi.NewBlockVolumeMapper failed to save volume info data: %v", err) 729 klog.Error(errorMsg) 730 return nil, errors.New(errorMsg) 731 } 732 733 return mapper, nil 734} 735 736func (p *csiPlugin) NewBlockVolumeUnmapper(volName string, podUID types.UID) (volume.BlockVolumeUnmapper, error) { 737 klog.V(4).Infof(log("setting up block unmapper for [Spec=%v, podUID=%v]", volName, podUID)) 738 unmapper := &csiBlockMapper{ 739 plugin: p, 740 podUID: podUID, 741 specName: volName, 742 } 743 744 // load volume info from file 745 dataDir := getVolumeDeviceDataDir(unmapper.specName, p.host) 746 data, err := loadVolumeData(dataDir, volDataFileName) 747 if err != nil { 748 return nil, errors.New(log("unmapper failed to load volume data file [%s]: %v", dataDir, err)) 749 } 750 unmapper.driverName = csiDriverName(data[volDataKey.driverName]) 751 unmapper.volumeID = data[volDataKey.volHandle] 752 unmapper.csiClientGetter.driverName = unmapper.driverName 753 754 return unmapper, nil 755} 756 757func (p *csiPlugin) ConstructBlockVolumeSpec(podUID types.UID, specVolName, mapPath string) (*volume.Spec, error) { 758 klog.V(4).Infof("plugin.ConstructBlockVolumeSpec [podUID=%s, specVolName=%s, path=%s]", string(podUID), specVolName, mapPath) 759 760 dataDir := getVolumeDeviceDataDir(specVolName, p.host) 761 volData, err := loadVolumeData(dataDir, volDataFileName) 762 if err != nil { 763 return nil, errors.New(log("plugin.ConstructBlockVolumeSpec failed loading volume data using [%s]: %v", mapPath, err)) 764 } 765 766 klog.V(4).Info(log("plugin.ConstructBlockVolumeSpec extracted [%#v]", volData)) 767 768 blockMode := api.PersistentVolumeBlock 769 pv := &api.PersistentVolume{ 770 ObjectMeta: meta.ObjectMeta{ 771 Name: volData[volDataKey.specVolID], 772 }, 773 Spec: api.PersistentVolumeSpec{ 774 PersistentVolumeSource: api.PersistentVolumeSource{ 775 CSI: &api.CSIPersistentVolumeSource{ 776 Driver: volData[volDataKey.driverName], 777 VolumeHandle: volData[volDataKey.volHandle], 778 }, 779 }, 780 VolumeMode: &blockMode, 781 }, 782 } 783 784 return volume.NewSpecFromPersistentVolume(pv, false), nil 785} 786 787// skipAttach looks up CSIDriver object associated with driver name 788// to determine if driver requires attachment volume operation 789func (p *csiPlugin) skipAttach(driver string) (bool, error) { 790 kletHost, ok := p.host.(volume.KubeletVolumeHost) 791 if ok { 792 if err := kletHost.WaitForCacheSync(); err != nil { 793 return false, err 794 } 795 } 796 797 if p.csiDriverLister == nil { 798 return false, errors.New("CSIDriver lister does not exist") 799 } 800 csiDriver, err := p.csiDriverLister.Get(driver) 801 if err != nil { 802 if apierrors.IsNotFound(err) { 803 // Don't skip attach if CSIDriver does not exist 804 return false, nil 805 } 806 return false, err 807 } 808 if csiDriver.Spec.AttachRequired != nil && *csiDriver.Spec.AttachRequired == false { 809 return true, nil 810 } 811 return false, nil 812} 813 814// supportsVolumeMode checks whether the CSI driver supports a volume in the given mode. 815// An error indicates that it isn't supported and explains why. 816func (p *csiPlugin) supportsVolumeLifecycleMode(driver string, volumeMode storage.VolumeLifecycleMode) error { 817 if !utilfeature.DefaultFeatureGate.Enabled(features.CSIInlineVolume) { 818 // Feature disabled, therefore only "persistent" volumes are supported. 819 if volumeMode != storage.VolumeLifecyclePersistent { 820 return fmt.Errorf("CSIInlineVolume feature not enabled, %q volumes not supported", volumeMode) 821 } 822 return nil 823 } 824 825 // Retrieve CSIDriver. It's not an error if that isn't 826 // possible (we don't have the lister if CSIDriverRegistry is 827 // disabled) or the driver isn't found (CSIDriver is 828 // optional), but then only persistent volumes are supported. 829 var csiDriver *storage.CSIDriver 830 if p.csiDriverLister != nil { 831 kletHost, ok := p.host.(volume.KubeletVolumeHost) 832 if ok { 833 if err := kletHost.WaitForCacheSync(); err != nil { 834 return err 835 } 836 } 837 838 c, err := p.csiDriverLister.Get(driver) 839 if err != nil && !apierrors.IsNotFound(err) { 840 // Some internal error. 841 return err 842 } 843 csiDriver = c 844 } 845 846 // The right response depends on whether we have information 847 // about the driver and the volume mode. 848 switch { 849 case csiDriver == nil && volumeMode == storage.VolumeLifecyclePersistent: 850 // No information, but that's okay for persistent volumes (and only those). 851 return nil 852 case csiDriver == nil: 853 return fmt.Errorf("volume mode %q not supported by driver %s (no CSIDriver object)", volumeMode, driver) 854 case containsVolumeMode(csiDriver.Spec.VolumeLifecycleModes, volumeMode): 855 // Explicitly listed. 856 return nil 857 default: 858 return fmt.Errorf("volume mode %q not supported by driver %s (only supports %q)", volumeMode, driver, csiDriver.Spec.VolumeLifecycleModes) 859 } 860} 861 862// containsVolumeMode checks whether the given volume mode is listed. 863func containsVolumeMode(modes []storage.VolumeLifecycleMode, mode storage.VolumeLifecycleMode) bool { 864 for _, m := range modes { 865 if m == mode { 866 return true 867 } 868 } 869 return false 870} 871 872// getVolumeLifecycleMode returns the mode for the specified spec: {persistent|ephemeral}. 873// 1) If mode cannot be determined, it will default to "persistent". 874// 2) If Mode cannot be resolved to either {persistent | ephemeral}, an error is returned 875// See https://github.com/kubernetes/enhancements/blob/master/keps/sig-storage/596-csi-inline-volumes/README.md 876func (p *csiPlugin) getVolumeLifecycleMode(spec *volume.Spec) (storage.VolumeLifecycleMode, error) { 877 // 1) if volume.Spec.Volume.CSI != nil -> mode is ephemeral 878 // 2) if volume.Spec.PersistentVolume.Spec.CSI != nil -> persistent 879 volSrc, _, err := getSourceFromSpec(spec) 880 if err != nil { 881 return "", err 882 } 883 884 if volSrc != nil && utilfeature.DefaultFeatureGate.Enabled(features.CSIInlineVolume) { 885 return storage.VolumeLifecycleEphemeral, nil 886 } 887 return storage.VolumeLifecyclePersistent, nil 888} 889 890// getFSGroupPolicy returns if the CSI driver supports a volume in the given mode. 891// An error indicates that it isn't supported and explains why. 892func (p *csiPlugin) getFSGroupPolicy(driver string) (storage.FSGroupPolicy, error) { 893 if !utilfeature.DefaultFeatureGate.Enabled(features.CSIVolumeFSGroupPolicy) { 894 // feature is disabled, default to ReadWriteOnceWithFSTypeFSGroupPolicy 895 return storage.ReadWriteOnceWithFSTypeFSGroupPolicy, nil 896 } 897 898 // Retrieve CSIDriver. It's not an error if that isn't 899 // possible (we don't have the lister if CSIDriverRegistry is 900 // disabled) or the driver isn't found (CSIDriver is 901 // optional) 902 var csiDriver *storage.CSIDriver 903 if p.csiDriverLister != nil { 904 kletHost, ok := p.host.(volume.KubeletVolumeHost) 905 if ok { 906 if err := kletHost.WaitForCacheSync(); err != nil { 907 return storage.ReadWriteOnceWithFSTypeFSGroupPolicy, err 908 } 909 } 910 911 c, err := p.csiDriverLister.Get(driver) 912 if err != nil && !apierrors.IsNotFound(err) { 913 // Some internal error. 914 return storage.ReadWriteOnceWithFSTypeFSGroupPolicy, err 915 } 916 csiDriver = c 917 } 918 919 // If the csiDriver isn't defined, return the default behavior 920 if csiDriver == nil { 921 return storage.ReadWriteOnceWithFSTypeFSGroupPolicy, nil 922 } 923 // If the csiDriver exists but the fsGroupPolicy isn't defined, return an error 924 if csiDriver.Spec.FSGroupPolicy == nil || *csiDriver.Spec.FSGroupPolicy == "" { 925 return storage.ReadWriteOnceWithFSTypeFSGroupPolicy, errors.New(log("expected valid fsGroupPolicy, received nil value or empty string")) 926 } 927 return *csiDriver.Spec.FSGroupPolicy, nil 928} 929 930func (p *csiPlugin) getPublishContext(client clientset.Interface, handle, driver, nodeName string) (map[string]string, error) { 931 skip, err := p.skipAttach(driver) 932 if err != nil { 933 return nil, err 934 } 935 if skip { 936 return nil, nil 937 } 938 939 attachID := getAttachmentName(handle, driver, nodeName) 940 941 // search for attachment by VolumeAttachment.Spec.Source.PersistentVolumeName 942 attachment, err := client.StorageV1().VolumeAttachments().Get(context.TODO(), attachID, meta.GetOptions{}) 943 if err != nil { 944 return nil, err // This err already has enough context ("VolumeAttachment xyz not found") 945 } 946 947 if attachment == nil { 948 err = errors.New("no existing VolumeAttachment found") 949 return nil, err 950 } 951 return attachment.Status.AttachmentMetadata, nil 952} 953 954func (p *csiPlugin) newAttacherDetacher() (*csiAttacher, error) { 955 k8s := p.host.GetKubeClient() 956 if k8s == nil { 957 return nil, errors.New(log("unable to get kubernetes client from host")) 958 } 959 960 return &csiAttacher{ 961 plugin: p, 962 k8s: k8s, 963 watchTimeout: csiTimeout, 964 }, nil 965} 966 967// podInfoEnabled check CSIDriver enabled pod info flag 968func (p *csiPlugin) podInfoEnabled(driverName string) (bool, error) { 969 kletHost, ok := p.host.(volume.KubeletVolumeHost) 970 if ok { 971 kletHost.WaitForCacheSync() 972 } 973 974 if p.csiDriverLister == nil { 975 return false, fmt.Errorf("CSIDriverLister not found") 976 } 977 978 csiDriver, err := p.csiDriverLister.Get(driverName) 979 if err != nil { 980 if apierrors.IsNotFound(err) { 981 klog.V(4).Infof(log("CSIDriver %q not found, not adding pod information", driverName)) 982 return false, nil 983 } 984 return false, err 985 } 986 987 // if PodInfoOnMount is not set or false we do not set pod attributes 988 if csiDriver.Spec.PodInfoOnMount == nil || *csiDriver.Spec.PodInfoOnMount == false { 989 klog.V(4).Infof(log("CSIDriver %q does not require pod information", driverName)) 990 return false, nil 991 } 992 return true, nil 993} 994 995func unregisterDriver(driverName string) error { 996 csiDrivers.Delete(driverName) 997 998 if err := nim.UninstallCSIDriver(driverName); err != nil { 999 return errors.New(log("Error uninstalling CSI driver: %v", err)) 1000 } 1001 1002 return nil 1003} 1004 1005// Return the highest supported version 1006func highestSupportedVersion(versions []string) (*utilversion.Version, error) { 1007 if len(versions) == 0 { 1008 return nil, errors.New(log("CSI driver reporting empty array for supported versions")) 1009 } 1010 1011 var highestSupportedVersion *utilversion.Version 1012 var theErr error 1013 for i := len(versions) - 1; i >= 0; i-- { 1014 currentHighestVer, err := utilversion.ParseGeneric(versions[i]) 1015 if err != nil { 1016 theErr = err 1017 continue 1018 } 1019 if currentHighestVer.Major() > 1 { 1020 // CSI currently only has version 0.x and 1.x (see https://github.com/container-storage-interface/spec/releases). 1021 // Therefore any driver claiming version 2.x+ is ignored as an unsupported versions. 1022 // Future 1.x versions of CSI are supposed to be backwards compatible so this version of Kubernetes will work with any 1.x driver 1023 // (or 0.x), but it may not work with 2.x drivers (because 2.x does not have to be backwards compatible with 1.x). 1024 continue 1025 } 1026 if highestSupportedVersion == nil || highestSupportedVersion.LessThan(currentHighestVer) { 1027 highestSupportedVersion = currentHighestVer 1028 } 1029 } 1030 1031 if highestSupportedVersion == nil { 1032 return nil, fmt.Errorf("could not find a highest supported version from versions (%v) reported by this driver: %v", versions, theErr) 1033 } 1034 1035 if highestSupportedVersion.Major() != 1 { 1036 // CSI v0.x is no longer supported as of Kubernetes v1.17 in 1037 // accordance with deprecation policy set out in Kubernetes v1.13 1038 return nil, fmt.Errorf("highest supported version reported by driver is %v, must be v1.x", highestSupportedVersion) 1039 } 1040 return highestSupportedVersion, nil 1041} 1042 1043// waitForAPIServerForever waits forever to get a CSINode instance as a proxy 1044// for a healthy APIServer 1045func waitForAPIServerForever(client clientset.Interface, nodeName types.NodeName) error { 1046 var lastErr error 1047 err := wait.PollImmediateInfinite(time.Second, func() (bool, error) { 1048 // Get a CSINode from API server to make sure 1) kubelet can reach API server 1049 // and 2) it has enough permissions. Kubelet may have restricted permissions 1050 // when it's bootstrapping TLS. 1051 // https://kubernetes.io/docs/reference/command-line-tools-reference/kubelet-tls-bootstrapping/ 1052 _, lastErr = client.StorageV1().CSINodes().Get(context.TODO(), string(nodeName), meta.GetOptions{}) 1053 if lastErr == nil || apierrors.IsNotFound(lastErr) { 1054 // API server contacted 1055 return true, nil 1056 } 1057 klog.V(2).Infof("Failed to contact API server when waiting for CSINode publishing: %s", lastErr) 1058 return false, nil 1059 }) 1060 if err != nil { 1061 // In theory this is unreachable, but just in case: 1062 return fmt.Errorf("%v: %v", err, lastErr) 1063 } 1064 1065 return nil 1066} 1067